/* ---------------------------------------------------------------------------
 * (c) The GHC Team, 1998-2005
 * The scheduler and thread-related functionality
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "BlockAlloc.h"
#include "OSThreads.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "Exception.h"
#include "RtsSignals.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Proftimer.h"
#if defined(GRAN) || defined(PARALLEL_HASKELL)
# include "GranSimRts.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
#endif
#include "Capability.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
// Turn off inlining when debugging - it obfuscates things
#ifdef DEBUG
# define STATIC_INLINE static
#else
# define STATIC_INLINE static inline
#endif

#ifdef THREADED_RTS
#define USED_WHEN_THREADED_RTS
#define USED_WHEN_NON_THREADED_RTS STG_UNUSED
#else
#define USED_WHEN_THREADED_RTS STG_UNUSED
#define USED_WHEN_NON_THREADED_RTS
#endif

#ifdef SMP
#define USED_WHEN_SMP
#else
#define USED_WHEN_SMP STG_UNUSED
#endif
/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

#if defined(GRAN)
StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */

/*
  In GranSim we have a runnable and a blocked queue for each processor.
  In order to minimise code changes new arrays run_queue_hds/tls
  are created. run_queue_hd is then a short cut (macro) for
  run_queue_hds[CurrentProc] (see GranSim.h).
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating). */
#endif
#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

/* Threads blocked on blackholes.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *blackhole_queue = NULL;

/* The blackhole_queue should be checked for threads to wake up.  See
 * Schedule.h for more thorough comment.
 * LOCK: none (doesn't matter if we miss an update)
 */
rtsBool blackholes_need_checking = rtsFalse;

/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *all_threads = NULL;

/* flag set by signal handler to precipitate a context switch
 * LOCK: none (just an advisory flag)
 */
int context_switch = 0;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 */
nat recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes once, from false->true)
 */
rtsBool interrupted = rtsFalse;

/* Next thread ID to allocate.
 */
static StgThreadID next_thread_id = 1;
/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the closure to enter)
 *  + 1                       (stg_ap_v_ret)
 *  + 1                       (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
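
/* (Illustrative note, not in the original source: the literal 3 in the
 * #define is just the sum of the three "+ 1" slots itemised above --
 * the closure to enter, the stg_ap_v_ret frame, and its spare slot --
 * on top of RESERVED_STACK_WORDS and the StgStopFrame.) */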
/* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 * exists - earlier gccs apparently didn't.
 */
StgTSO dummy_tso;

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS (including SMP) runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if defined(PARALLEL_HASKELL)
StgTSO *LastTSO;
rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;
#endif
/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.

static void schedulePreLoop (void);
#if defined(SMP)
static void schedulePushWork(Capability *cap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
#if defined(GRAN)
static StgTSO *scheduleProcessEvent(rtsEvent *event);
#endif
#if defined(PARALLEL_HASKELL)
static StgTSO *scheduleSendPendingMessages(void);
static rtsBool scheduleActivateSpark(void);
static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
#endif
#if defined(PAR) || defined(GRAN)
static void scheduleGranParReport(void);
#endif
static void schedulePostRunThread(void);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task,
                                         StgTSO *t);
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
                                    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                             StgTSO *t );
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);

static void unblockThread(Capability *cap, StgTSO *tso);
static rtsBool checkBlackHoles(Capability *cap);
static void AllRoots(evac_fn evac);

static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);

static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
                        rtsBool stop_at_atomically);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteRunQueue (Capability *cap);

#ifdef DEBUG
static void printThreadBlockage(StgTSO *tso);
static void printThreadStatus(StgTSO *tso);
void printThreadQueue(StgTSO *tso);
#endif

#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);
#endif

#ifdef DEBUG
static char *whatNext_strs[] = {
  "(unknown)",
  "ThreadRunGHC",
  "ThreadInterpret",
  "ThreadKilled",
  "ThreadRelocated",
  "ThreadComplete"
};
#endif
/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */

STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
#if defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.doFairScheduling) {
        // this does round-robin scheduling; good for concurrency
        appendToRunQueue(cap,t);
    } else {
        // this does unfair scheduling; good for parallelism
        pushOnRunQueue(cap,t);
    }
#else
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
#endif
}
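
/* A minimal sketch (illustrative, not part of this file) of the two
 * queue disciplines used above, on the singly-linked TSO list with
 * head/tail pointers that makes up the run queue:
 *
 *   appendToRunQueue: t goes on the tail, so every runnable thread
 *     gets a turn before t runs again (round-robin, fair):
 *
 *       t->link = END_TSO_QUEUE;
 *       if (cap->run_queue_hd == END_TSO_QUEUE)
 *           cap->run_queue_hd = t;
 *       else
 *           cap->run_queue_tl->link = t;
 *       cap->run_queue_tl = t;
 *
 *   pushOnRunQueue: t goes on the head, so the freshest thread runs
 *     next (LIFO, unfair, but keeps a parallel task working on what
 *     it just made runnable):
 *
 *       t->link = cap->run_queue_hd;
 *       cap->run_queue_hd = t;
 *       if (cap->run_queue_tl == END_TSO_QUEUE)
 *           cap->run_queue_tl = t;
 */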
/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   In a GranSim setup this loop iterates over the global event queue.
   This revolves around the global event queue, which determines what
   to do next. Therefore, it's more complicated than either the
   concurrent or the parallel (GUM) setup.

   GUM iterates over incoming messages.
   It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
   and sends out a fish whenever it has nothing to do; in-between
   doing the actual reductions (shared code below) it processes the
   incoming messages and deals with delayed operations
   (see PendingFetches).
   This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */
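
/* A condensed sketch (illustrative only; `run' is a stand-in) of one
 * iteration of the standard, non-GRAN/non-GUM loop below:
 *
 *   for (;;) {
 *       yieldCapability(&cap, task);        // maybe hand the cap away
 *       scheduleStartSignalHandlers(cap);   // housekeeping...
 *       scheduleCheckBlockedThreads(cap);
 *       scheduleDetectDeadlock(cap, task);
 *       t   = popRunQueue(cap);             // pick a thread
 *       ret = run(t);                       // StgRun or interpretBCO
 *       switch (ret) {                      // why did it stop?
 *           case HeapOverflow:   ... maybe GC ...;              break;
 *           case StackOverflow:  grow stack, requeue;           break;
 *           case ThreadYielding: requeue at the back;           break;
 *           case ThreadBlocked:  it is on some blocking queue;  break;
 *           case ThreadFinished: maybe return to the caller;    break;
 *       }
 *   }
 */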
static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
#if defined(GRAN)
  rtsEvent *event;
#elif defined(PARALLEL_HASKELL)
  StgTSO *tso;
  GlobalTaskId pe;
  rtsBool receivedFinish = rtsFalse;
# if defined(DEBUG)
  nat tp_size, sp_size; // stats only
# endif
#endif
  nat prev_what_next;
  rtsBool ready_to_gc;
#if defined(THREADED_RTS)
  rtsBool first = rtsTrue;
#endif

  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  IF_DEBUG(scheduler,
           sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
                       task, initialCapability);
      );

  schedulePreLoop();

  // -----------------------------------------------------------
  // Scheduler loop starts here:

#if defined(PARALLEL_HASKELL)
#define TERMINATION_CONDITION        (!receivedFinish)
#elif defined(GRAN)
#define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL)
#else
#define TERMINATION_CONDITION        rtsTrue
#endif
  while (TERMINATION_CONDITION) {

#if defined(GRAN)
      /* Choose the processor with the next event */
      CurrentProc = event->proc;
      CurrentTSO  = event->tso;
#endif

#if defined(THREADED_RTS)
      if (first) {
          // don't yield the first time, we want a chance to run this
          // thread for a bit, even if there are others banging at the
          // door.
          first = rtsFalse;
          ASSERT_CAPABILITY_INVARIANTS(cap,task);
      } else {
          // Yield the capability to higher-priority tasks if necessary.
          yieldCapability(&cap, task);
      }
#endif

#ifdef SMP
      schedulePushWork(cap,task);
#endif

      // Check whether we have re-entered the RTS from Haskell without
      // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
      // call).
      if (cap->in_haskell) {
          errorBelch("schedule: re-entered unsafely.\n"
                     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
          stg_exit(EXIT_FAILURE);
      }

      // Test for interruption. If interrupted==rtsTrue, then either
      // we received a keyboard interrupt (^C), or the scheduler is
      // trying to shut down all the tasks (shutting_down_scheduler) in
      // the threaded RTS.
      if (interrupted) {
          deleteRunQueue(cap);
          if (shutting_down_scheduler) {
              IF_DEBUG(scheduler, sched_belch("shutting down"));
              // If we are a worker, just exit.  If we're a bound thread
              // then we will exit below when we've removed our TSO from
              // the run queue.
              if (task->tso == NULL && emptyRunQueue(cap)) {
                  return cap;
              }
          } else {
              IF_DEBUG(scheduler, sched_belch("interrupted"));
          }
      }

#if defined(not_yet) && defined(SMP)
      //
      // Top up the run queue from our spark pool.  We try to make the
      // number of threads in the run queue equal to the number of
      // free capabilities.
      //
      {
          StgClosure *spark;
          if (emptyRunQueue()) {
              spark = findSpark(rtsFalse);
              if (spark == NULL) {
                  break; /* no more sparks in the pool */
              } else {
                  createSparkThread(spark);
                  IF_DEBUG(scheduler,
                           sched_belch("==^^ turning spark of closure %p into a thread",
                                       (StgClosure *)spark));
              }
          }
      }
#endif // SMP
      scheduleStartSignalHandlers(cap);

      // Only check the black holes here if we've nothing else to do.
      // During normal execution, the black hole list only gets checked
      // at GC time, to avoid repeatedly traversing this possibly long
      // list each time around the scheduler.
      if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

      scheduleCheckBlockedThreads(cap);

      scheduleDetectDeadlock(cap,task);

      // Normally, the only way we can get here with no threads to
      // run is if a keyboard interrupt received during
      // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
      // Additionally, it is not fatal for the
      // threaded RTS to reach here with no threads to run.
      //
      // win32: might be here due to awaitEvent() being abandoned
      // as a result of a console event having been delivered.
      if ( emptyRunQueue(cap) ) {
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
          ASSERT(interrupted);
#endif
          continue; // nothing to do
      }
#if defined(PARALLEL_HASKELL)
      scheduleSendPendingMessages();
      if (emptyRunQueue(cap) && scheduleActivateSpark())
          continue;

      ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!

      /* If we still have no work we need to send a FISH to get a spark
         from another processor */
      if (emptyRunQueue(cap)) {
          if (!scheduleGetRemoteWork(&receivedFinish)) continue;
          ASSERT(rtsFalse); // should not happen at the moment
      }
      // from here: non-empty run queue.
      //  TODO: merge above case with this, only one call processMessages() !
      if (PacketsWaiting()) {  /* process incoming messages, if
                                  any pending...  only in else
                                  because getRemoteWork waits for
                                  messages as well */
          receivedFinish = processMessages();
      }
#endif

#if defined(GRAN)
      scheduleProcessEvent(event);
#endif
      //
      // Get a thread to run
      //
      t = popRunQueue(cap);

#if defined(GRAN) || defined(PAR)
      scheduleGranParReport(); // some kind of debugging output
#else
      // Sanity check the thread we're about to run.  This can be
      // expensive if there is lots of thread switching going on...
      IF_DEBUG(sanity,checkTSO(t));
#endif

#if defined(THREADED_RTS)
      // Check whether we can run this thread in the current task.
      // If not, we have to pass our capability to the right task.
      {
          Task *bound = t->bound;

          if (bound) {
              if (bound == task) {
                  IF_DEBUG(scheduler,
                           sched_belch("### Running thread %d in bound thread",
                                       t->id));
                  // yes, the Haskell thread is bound to the current native thread
              } else {
                  IF_DEBUG(scheduler,
                           sched_belch("### thread %d bound to another OS thread",
                                       t->id));
                  // no, bound to a different Haskell thread: pass to that thread
                  pushOnRunQueue(cap,t);
                  continue;
              }
          } else {
              // The thread we want to run is unbound.
              if (task->tso) {
                  IF_DEBUG(scheduler,
                           sched_belch("### this OS thread cannot run thread %d", t->id));
                  // no, the current native thread is bound to a different
                  // Haskell thread, so pass it to any worker thread
                  pushOnRunQueue(cap,t);
                  continue;
              }
          }
      }
#endif
      cap->r.rCurrentTSO = t;

      /* context switches are initiated by the timer signal, unless
       * the user specified "context switch as often as possible", with
       * +RTS -C0
       */
      if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
          && !emptyThreadQueues(cap)) {
          context_switch = 1;
      }

run_thread:

      IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
                              (long)t->id, whatNext_strs[t->what_next]));

#if defined(PROFILING)
      startHeapProfTimer();
#endif

      // ----------------------------------------------------------------------
      // Run the current thread

      prev_what_next = t->what_next;

      errno = t->saved_errno;
      cap->in_haskell = rtsTrue;

      recent_activity = ACTIVITY_YES;

      switch (prev_what_next) {

      case ThreadKilled:
      case ThreadComplete:
          /* Thread already finished, return to scheduler. */
          ret = ThreadFinished;
          break;

      case ThreadRunGHC:
      {
          StgRegTable *r;
          r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
          cap = regTableToCapability(r);
          ret = r->rRet;
          break;
      }

      case ThreadInterpret:
          cap = interpretBCO(cap);
          ret = cap->r.rRet;
          break;

      default:
          barf("schedule: invalid what_next field");
      }
      cap->in_haskell = rtsFalse;

      // The TSO might have moved, eg. if it re-entered the RTS and a GC
      // happened. So find the new location:
      t = cap->r.rCurrentTSO;

      // If ret is ThreadBlocked, and this Task is bound to the TSO that
      // blocked, we are in limbo - the TSO is now owned by whatever it
      // is blocked on, and may in fact already have been woken up,
      // perhaps even on a different Capability.  It may be the case
      // that task->cap != cap.  We better yield this Capability
      // immediately and return to normality.
      if (ret == ThreadBlocked) {
          IF_DEBUG(scheduler,
                   debugBelch("--<< thread %d (%s) stopped: blocked\n",
                              t->id, whatNext_strs[t->what_next]));
          continue;
      }

      ASSERT_CAPABILITY_INVARIANTS(cap,task);

      // And save the current errno in this thread.
      t->saved_errno = errno;

      // ----------------------------------------------------------------------

      // Costs for the scheduler are assigned to CCS_SYSTEM
#if defined(PROFILING)
      stopHeapProfTimer();
      CCCS = CCS_SYSTEM;
#endif

      // We have run some Haskell code: there might be blackhole-blocked
      // threads to wake up now.
      // Lock-free test here should be ok, we're just setting a flag.
      if ( blackhole_queue != END_TSO_QUEUE ) {
          blackholes_need_checking = rtsTrue;
      }

#if defined(THREADED_RTS)
      IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
#elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
      IF_DEBUG(scheduler,debugBelch("sched: "););
#endif
      schedulePostRunThread();

      ready_to_gc = rtsFalse;

      switch (ret) {
      case HeapOverflow:
          ready_to_gc = scheduleHandleHeapOverflow(cap,t);
          break;

      case StackOverflow:
          scheduleHandleStackOverflow(cap,task,t);
          break;

      case ThreadYielding:
          if (scheduleHandleYield(cap, t, prev_what_next)) {
              // shortcut for switching between compiler/interpreter:
              goto run_thread;
          }
          break;

      case ThreadBlocked:
          scheduleHandleThreadBlocked(t);
          break;

      case ThreadFinished:
          if (scheduleHandleThreadFinished(cap, task, t)) return cap;
          ASSERT_CAPABILITY_INVARIANTS(cap,task);
          break;

      default:
          barf("schedule: invalid thread return code %d", (int)ret);
      }

      if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
      if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
  } /* end of while() */

  IF_PAR_DEBUG(verbose,
               debugBelch("== Leaving schedule() after having received Finish\n"));
}
/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
#if defined(GRAN)
    /* set up first event to get things going */
    /* ToDo: assign costs for system setup and init MainTSO ! */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              ContinueThread,
              CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);

    IF_DEBUG(gran,
             debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n",
                        CurrentTSO);
             G_TSO(CurrentTSO, 5));

    if (RtsFlags.GranFlags.Light) {
        /* Save current time; GranSim Light only */
        CurrentTSO->gran.clock = CurrentTime[CurrentProc];
    }
#endif
}
/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

#ifdef SMP
static void
schedulePushWork(Capability *cap USED_WHEN_SMP,
                 Task *task      USED_WHEN_SMP)
{
    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // Check whether we have more threads on our run queue that we
    // could hand to another Capability.
    if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) {
        return;
    }

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
                // it already has some work, we just grabbed it at
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
            } else {
                free_caps[n_free_caps++] = cap0;
            }
        }
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;
        IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));

        prev = cap->run_queue_hd;
        t = prev->link;
        prev->link = END_TSO_QUEUE;
        i = 0;
        for (; t != END_TSO_QUEUE; t = next) {
            next = t->link;
            t->link = END_TSO_QUEUE;
            if (t->what_next == ThreadRelocated
                || t->bound == task) { // don't move my bound thread
                prev->link = t;
                prev = t;
            } else if (i == n_free_caps) {
                i = 0;
                // keep one for us
                prev->link = t;
                prev = t;
            } else {
                appendToRunQueue(free_caps[i],t);
                if (t->bound) { t->bound->cap = free_caps[i]; }
                i++;
            }
        }
        cap->run_queue_tl = prev;

        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            releaseCapability(free_caps[i]);
        }
    }
    task->cap = cap; // reset to point to our Capability.
}
#endif
/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

static void
scheduleStartSignalHandlers(Capability *cap)
{
#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
    if (signals_pending()) { // safe outside the lock
        startSignalHandlers(cap);
    }
#endif
}
/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
        awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
    }
#endif
}
/* ----------------------------------------------------------------------------
 * Check for threads blocked on BLACKHOLEs that can be woken up
 * ------------------------------------------------------------------------- */
static void
scheduleCheckBlackHoles (Capability *cap)
{
    if ( blackholes_need_checking ) // check without the lock first
    {
        ACQUIRE_LOCK(&sched_mutex);
        if ( blackholes_need_checking ) {
            checkBlackHoles(cap);
            blackholes_need_checking = rtsFalse;
        }
        RELEASE_LOCK(&sched_mutex);
    }
}
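
/* (Note, illustrative: the unlocked read of blackholes_need_checking
 * above is the usual double-checked pattern.  A stale rtsTrue merely
 * costs taking sched_mutex and re-testing; a stale rtsFalse only delays
 * the wakeup until the next pass around the scheduler -- acceptable
 * because the flag is advisory, per its declaration at the top of this
 * file.) */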
/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{
#if defined(PARALLEL_HASKELL)
    // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
    return;
#endif

    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
        /*
         * In the threaded RTS, we only check for deadlock if there
         * has been no activity in a complete timeslice.  This means
         * we won't eagerly start a full GC just because we don't have
         * any threads to run currently.
         */
        if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

        IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        // runnable.
        scheduleDoGC( cap, task, rtsTrue/*force major GC*/ );
        recent_activity = ACTIVITY_DONE_GC;

        if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather than bombing out with a
         * deadlock.
         */
        if ( anyUserHandlers() ) {
            IF_DEBUG(scheduler,
                     sched_belch("still deadlocked, waiting for signals..."));

            awaitUserSignals();

            if (signals_pending()) {
                startSignalHandlers(cap);
            }

            // either we have threads to run, or we were interrupted:
            ASSERT(!emptyRunQueue(cap) || interrupted);
        }
#endif

#if !defined(THREADED_RTS)
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception.
         */
        switch (task->tso->why_blocked) {
        case BlockedOnSTM:
        case BlockedOnBlackHole:
        case BlockedOnException:
        case BlockedOnMVar:
            raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
            return;
        default:
            barf("deadlock: main thread blocked in a strange way");
        }
#endif
    }
}
/* ----------------------------------------------------------------------------
 * Process an event (GRAN only)
 * ------------------------------------------------------------------------- */

#if defined(GRAN)
static StgTSO *
scheduleProcessEvent(rtsEvent *event)
{
    StgTSO *t;

    if (RtsFlags.GranFlags.Light)
      GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
      CurrentTime[CurrentProc] = event->time;

    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)
      handleIdlePEs();

    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
      /* Should just be continuing execution */
    case ContinueThread:
      IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
      /* ToDo: check assertion
      ASSERT(run_queue_hd != (StgTSO*)NULL &&
             run_queue_hd != END_TSO_QUEUE);
      */
      /* Ignore ContinueThreads for fetching threads (if synchr comm) */
      if (!RtsFlags.GranFlags.DoAsyncFetch &&
          procStatus[CurrentProc]==Fetching) {
        debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
              CurrentTSO->id, CurrentTSO, CurrentProc);
        goto next_thread;
      }
      /* Ignore ContinueThreads for completed threads */
      if (CurrentTSO->what_next == ThreadComplete) {
        debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
              CurrentTSO->id, CurrentTSO, CurrentProc);
        goto next_thread;
      }
      /* Ignore ContinueThreads for threads that are being migrated */
      if (PROCS(CurrentTSO)==Nowhere) {
        debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
              CurrentTSO->id, CurrentTSO, CurrentProc);
        goto next_thread;
      }
      /* The thread should be at the beginning of the run queue */
      if (CurrentTSO!=run_queue_hds[CurrentProc]) {
        debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
              CurrentTSO->id, CurrentTSO, CurrentProc);
        break; // run the thread anyway
      }
      /*
      new_event(proc, proc, CurrentTime[proc],
                FindWork,
                (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
      goto next_thread;
      */ /* Catches superfluous CONTINUEs -- should be unnecessary */
      break; // now actually run the thread; DaH Qu'vam yImuHbej

    case FetchNode:
      do_the_fetchnode(event);
      goto next_thread;             /* handle next event in event queue  */

    case GlobalBlock:
      do_the_globalblock(event);
      goto next_thread;             /* handle next event in event queue  */

    case FetchReply:
      do_the_fetchreply(event);
      goto next_thread;             /* handle next event in event queue  */

    case UnblockThread:   /* Move from the blocked queue to the tail of */
      do_the_unblock(event);
      goto next_thread;             /* handle next event in event queue  */

    case ResumeThread:  /* Move from the blocked queue to the tail of */
      /* the runnable queue ( i.e. Qu' SImqa'lu') */
      event->tso->gran.blocktime +=
        CurrentTime[CurrentProc] - event->tso->gran.blockedat;
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */

    case StartThread:
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */

    case MoveThread:
      do_the_movethread(event);
      goto next_thread;             /* handle next event in event queue  */

    case MoveSpark:
      do_the_movespark(event);
      goto next_thread;             /* handle next event in event queue  */

    case FindWork:
      do_the_findwork(event);
      goto next_thread;             /* handle next event in event queue  */

    default:
      barf("Illegal event type %u\n", event->evttype);
    }  /* switch */

    /* This point was scheduler_loop in the old RTS */

    IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
                              TimeOfNextEvent));

    if (RtsFlags.GranFlags.Light)
      GranSimLight_leave_system(event, &ActiveTSO);

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    IF_DEBUG(gran,
             debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */
    t = CurrentTSO;
    /* Take a thread from the run queue. */
    POP_RUN_QUEUE(t); // take_off_run_queue(t);

    IF_DEBUG(gran,
             debugBelch("GRAN: About to run current thread, which is\n");
             G_TSO(t,5));

    context_switch = 0; // turned on via GranYield, checking events and time slice

    IF_DEBUG(gran,
             DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;

    return t;
}
#endif // GRAN
/* ----------------------------------------------------------------------------
 * Send pending messages (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static StgTSO *
scheduleSendPendingMessages(void)
{
# if defined(PAR) // global Mem.Mgmt., omit for now
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }
# endif

    if (RtsFlags.ParFlags.BufferTime) {
        // if we use message buffering, we must send away all message
        // packets which have become too old...
        sendOldBuffers();
    }
}
#endif
/* ----------------------------------------------------------------------------
 * Activate spark threads (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static rtsBool
scheduleActivateSpark(void)
{
#if defined(SPARKS)
  ASSERT(emptyRunQueue());
/* We get here if the run queue is empty and want some work.
   We try to turn a spark into a thread, and add it to the run queue,
   from where it will be picked up in the next iteration of the scheduler
   loop.
*/

  /* :-[  no local threads => look out for local sparks */
  /* the spark pool for the current PE */
  pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
  if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
      pool->hd < pool->tl) {
    /*
     * ToDo: add GC code check that we really have enough heap afterwards!!
     * If we're here (no runnable threads) and we have pending
     * sparks, we must have a space problem.  Get enough space
     * to turn one of those pending sparks into a
     * thread...
     */

    spark = findSpark(rtsFalse);            /* get a spark */
    if (spark != (rtsSpark) NULL) {
      tso = createThreadFromSpark(spark);   /* turn the spark into a thread */
      IF_PAR_DEBUG(fish, // schedule,
                   debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
                              tso->id, tso, advisory_thread_count));

      if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
        IF_PAR_DEBUG(fish, // schedule,
                     debugBelch("==^^ failed to create thread from spark @ %lx\n",
                                spark));
        return rtsFalse; /* failed to generate a thread */
      }               /* otherwise fall through & pick-up new tso */
    } else {
      IF_PAR_DEBUG(fish, // schedule,
                   debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
                              spark_queue_len(pool)));
      return rtsFalse; /* failed to generate a thread */
    }
    return rtsTrue; /* success in generating a thread */
  } else { /* no more threads permitted or pool empty */
    return rtsFalse; /* failed to generateThread */
  }
#else
  tso = NULL; // avoid compiler warning only
  return rtsFalse; /* dummy in non-PAR setup */
#endif // SPARKS
}
#endif // PARALLEL_HASKELL
/* ----------------------------------------------------------------------------
 * Get work from a remote node (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static rtsBool
scheduleGetRemoteWork(rtsBool *receivedFinish)
{
  ASSERT(emptyRunQueue());

  if (RtsFlags.ParFlags.BufferTime) {
        IF_PAR_DEBUG(verbose,
                debugBelch("...send all pending data,"));
        {
          nat i;
          for (i=1; i<=nPEs; i++)
            sendImmediately(i); // send all messages away immediately
        }
  }

# ifndef SPARKS
        //++EDEN++ idle() , i.e. send all buffers, wait for work
        // suppress fishing in EDEN... just look for incoming messages
        // (blocking receive)
        IF_PAR_DEBUG(verbose,
                     debugBelch("...wait for incoming messages...\n"));
        *receivedFinish = processMessages(); // blocking receive...

        // and reenter scheduling loop after having received something
        // (return rtsFalse below)

# else /* activate SPARKS machinery */
/* We get here, if we have no work, tried to activate a local spark, but still
   have no work. We try to get a remote spark, by sending a FISH message.
   Thread migration should be added here, and triggered when a sequence of
   fishes returns without work. */
        delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);

        /* =8-[  no local sparks => look for work on other PEs */
        /*
         * We really have absolutely no work.  Send out a fish
         * (there may be some out there already), and wait for
         * something to arrive.  We clearly can't run any threads
         * until a SCHEDULE or RESUME arrives, and so that's what
         * we're hoping to see.  (Of course, we still have to
         * respond to other types of messages.)
         */
        rtsTime now = msTime() /*CURRENT_TIME*/;
        IF_PAR_DEBUG(verbose,
                     debugBelch("--    now=%ld\n", now));
        IF_PAR_DEBUG(fish, // verbose,
             if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
                 (last_fish_arrived_at!=0 &&
                  last_fish_arrived_at+delay > now)) {
               debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
                          now, last_fish_arrived_at+delay,
                          last_fish_arrived_at,
                          delay);
             });

        if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
            advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
          if (last_fish_arrived_at==0 ||
              (last_fish_arrived_at+delay <= now)) {           // send FISH now!
            /* outstandingFishes is set in sendFish, processFish;
               avoid flooding system with fishes via delay */
            next_fish_to_send_at = 0;
          } else {
            /* ToDo: this should be done in the main scheduling loop to avoid the
                     busy wait here; not so bad if fish delay is very small  */
            int iq = 0; // DEBUGGING -- HWL
            next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
            /* send a fish when ready, but process messages that arrive in the meantime */
            do {
              if (PacketsWaiting()) {
                iq++; // DEBUGGING
                *receivedFinish = processMessages();
              }
              now = msTime();
            } while (!*receivedFinish || now<next_fish_to_send_at);
            // JB: This means the fish could become obsolete, if we receive
            // work. Better check for work again?
            // last line: while (!receivedFinish || !haveWork || now<...)
            // next line: if (receivedFinish || haveWork )

            if (*receivedFinish) // no need to send a FISH if we are finishing anyway
              return rtsFalse;   // NB: this will leave scheduler loop
                                 // immediately after return!

            IF_PAR_DEBUG(fish, // verbose,
               debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));

          }

          // JB: IMHO, this should all be hidden inside sendFish(...)
          sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
                   NEW_FISH_HUNGER);

          // Global statistics: count no. of fishes
          if (RtsFlags.ParFlags.ParStats.Global &&
              RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
            globalParStats.tot_fish_mess++;
          }

          /* delayed fishes must have been sent by now! */
          next_fish_to_send_at = 0;
        }

        *receivedFinish = processMessages();
# endif /* SPARKS */

        return rtsFalse;
        /* NB: this function always returns rtsFalse, meaning the scheduler
           loop continues with the next iteration;
           rationale:
             return code means success in finding work; we enter this function
             if there is no local work, thus have to send a fish which takes
             time until it arrives with work; in the meantime we should process
             messages in the main loop;
        */
}
#endif // PARALLEL_HASKELL
/* ----------------------------------------------------------------------------
 * PAR/GRAN: Report stats & debugging info(?)
 * ------------------------------------------------------------------------- */

#if defined(PAR) || defined(GRAN)
static void
scheduleGranParReport(void)
{
  ASSERT(run_queue_hd != END_TSO_QUEUE);

  /* Take a thread from the run queue, if we have work */
  POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);

    /* If this TSO has got its outport closed in the meantime,
     *   it mustn't be run. Instead, we have to clean it up as if it was finished.
     * It has to be marked as TH_DEAD for this purpose.
     * If it is TH_TERM instead, it is supposed to have finished in the normal way.

JB: TODO: investigate whether state change field could be nuked
     entirely and replaced by the normal tso state (whatnext
     field). All we want to do is to kill tsos from outside.
     */

    /* ToDo: write something to the log-file
    if (RTSflags.ParFlags.granSimStats && !sameThread)
        DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);

    CurrentTSO = t;
    */
    /* the spark pool for the current PE */
    pool = &(cap.r.rSparks); //  cap = (old) MainCap

    IF_DEBUG(scheduler,
             debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
                        run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

    IF_DEBUG(scheduler,
             debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
                        run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

    if (RtsFlags.ParFlags.ParStats.Full &&
        (t->par.sparkname != (StgInt)0) && // only log spark generated threads
        (emitSchedule || // forced emit
         (t && LastTSO && t->id != LastTSO->id))) {
      /*
         we are running a different TSO, so write a schedule event to log file
         NB: If we use fair scheduling we also have to write a deschedule
             event for LastTSO; with unfair scheduling we know that the
             previous tso has blocked whenever we switch to another tso, so
             we don't need it in GUM for now
      */
      IF_PAR_DEBUG(fish, // schedule,
                   debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));

      DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                       GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
      emitSchedule = rtsFalse;
    }
}
#endif
/* ----------------------------------------------------------------------------
 * After running a thread...
 * ------------------------------------------------------------------------- */

static void
schedulePostRunThread(void)
{
#if defined(PAR)
    /* HACK 675: if the last thread didn't yield, make sure to print a
       SCHEDULE event to the log file when StgRunning the next thread, even
       if it is the same one as before */
    LastTSO = t;
    TimeOfLastYield = CURRENT_TIME;
#endif

  /* some statistics gathering in the parallel case */

#if defined(GRAN) || defined(PAR) || defined(EDEN)
  switch (ret) {
    case HeapOverflow:
# if defined(GRAN)
      IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
      globalGranStats.tot_heapover++;
# elif defined(PAR)
      globalParStats.tot_heapover++;
# endif
      break;

    case StackOverflow:
# if defined(GRAN)
      IF_DEBUG(gran,
               DumpGranEvent(GR_DESCHEDULE, t));
      globalGranStats.tot_stackover++;
# elif defined(PAR)
      // IF_DEBUG(par,
      // DumpGranEvent(GR_DESCHEDULE, t);
      globalParStats.tot_stackover++;
# endif
      break;

    case ThreadYielding:
# if defined(GRAN)
      IF_DEBUG(gran,
               DumpGranEvent(GR_DESCHEDULE, t));
      globalGranStats.tot_yields++;
# elif defined(PAR)
      // IF_DEBUG(par,
      // DumpGranEvent(GR_DESCHEDULE, t);
      globalParStats.tot_yields++;
# endif
      break;

    case ThreadBlocked:
# if defined(GRAN)
      IF_DEBUG(scheduler,
               debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
                          t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
                          (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
               if (t->block_info.closure!=(StgClosure*)NULL)
                 print_bq(t->block_info.closure));

      // ??? needed; should emit block before
      IF_DEBUG(gran,
               DumpGranEvent(GR_DESCHEDULE, t));
      prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
      ASSERT(procStatus[CurrentProc]==Busy ||
             ((procStatus[CurrentProc]==Fetching) &&
              (t->block_info.closure!=(StgClosure*)NULL)));
      if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
          !(!RtsFlags.GranFlags.DoAsyncFetch &&
            procStatus[CurrentProc]==Fetching))
        procStatus[CurrentProc] = Idle;
# elif defined(PAR)
      //++PAR++  blockThread() writes the event (change?)
# endif
      break;

    case ThreadFinished:
      break;

    default:
      barf("parGlobalStats: unknown return code");
  }
#endif
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
{
    // did the task ask for a large block?
    if (cap->r.rHpAlloc > BLOCK_SIZE) {
        // if so, get one and push it on the front of the nursery.
        bdescr *bd;
        lnat blocks;

        blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
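        // (Illustrative arithmetic, assuming the usual 4KB BLOCK_SIZE:
        // a request of rHpAlloc = 10000 bytes rounds up via
        // BLOCK_ROUND_UP to 12288, and 12288 / 4096 = 3 blocks.)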
        IF_DEBUG(scheduler,
                 debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
                            (long)t->id, whatNext_strs[t->what_next], blocks));

        // don't do this if the nursery is (nearly) full, we'll GC first.
        if (cap->r.rCurrentNursery->link != NULL ||
            cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
                                              // if the nursery has only one block.

            bd = allocGroup( blocks );
            cap->r.rNursery->n_blocks += blocks;

            // link the new group into the list
            bd->link = cap->r.rCurrentNursery;
            bd->u.back = cap->r.rCurrentNursery->u.back;
            if (cap->r.rCurrentNursery->u.back != NULL) {
                cap->r.rCurrentNursery->u.back->link = bd;
            } else {
                ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
                       g0s0 == cap->r.rNursery);
                cap->r.rNursery->blocks = bd;
            }
            cap->r.rCurrentNursery->u.back = bd;

            // initialise it as a nursery block.  We initialise the
            // step, gen_no, and flags field of *every* sub-block in
            // this large block, because this is easier than making
            // sure that we always find the block head of a large
            // block whenever we call Bdescr() (eg. evacuate() and
            // isAlive() in the GC would both have to do this, at
            // least).
            {
                bdescr *x;
                for (x = bd; x < bd + blocks; x++) {
                    x->step = cap->r.rNursery;
                    x->gen_no = 0;
                    x->flags = 0;
                }
            }

            // This assert can be a killer if the app is doing lots
            // of large block allocations.
            IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));

            // now update the nursery to point to the new block
            cap->r.rCurrentNursery = bd;

            // we might be unlucky and have another thread get on the
            // run queue before us and steal the large block, but in that
            // case the thread will just end up requesting another large
            // block.
            pushOnRunQueue(cap,t);
            return rtsFalse;  /* not actually GC'ing */
        }
    }

    IF_DEBUG(scheduler,
             debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n",
                        (long)t->id, whatNext_strs[t->what_next]));
#if defined(GRAN)
    ASSERT(!is_on_queue(t,CurrentProc));
#elif defined(PARALLEL_HASKELL)
    /* Currently we emit a DESCHEDULE event before GC in GUM.
       ToDo: either add separate event to distinguish SYSTEM time from rest
       or just nuke this DESCHEDULE (and the following SCHEDULE) */
    if (0 && RtsFlags.ParFlags.ParStats.Full) {
        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
        emitSchedule = rtsTrue;
    }
#endif

    pushOnRunQueue(cap,t);
    return rtsTrue;
    /* actual GC is done at the end of the while loop in schedule() */
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadStackOverflow
 * -------------------------------------------------------------------------- */

static void
scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
{
    IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n",
                                  (long)t->id, whatNext_strs[t->what_next]));
    /* just adjust the stack for this thread, then pop it back
     * on the run queue.
     */
    {
        /* enlarge the stack */
        StgTSO *new_t = threadStackOverflow(cap, t);

        /* The TSO attached to this Task may have moved, so update the
         * pointer to it.
         */
        if (task->tso == t) {
            task->tso = new_t;
        }
        pushOnRunQueue(cap,new_t);
    }
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadYielding
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
{
    // Reset the context switch flag.  We don't do this just before
    // running the thread, because that would mean we would lose ticks
    // during GC, which can lead to unfair scheduling (a thread hogs
    // the CPU because the tick always arrives during GC).  This way
    // penalises threads that do a lot of allocation, but that seems
    // better than the alternative.
    context_switch = 0;

    /* put the thread back on the run queue.  Then, if we're ready to
     * GC, check whether this is the last task to stop.  If so, wake
     * up the GC thread.  getThread will block during a GC until the
     * GC is finished.
     */
    IF_DEBUG(scheduler,
             if (t->what_next != prev_what_next) {
                 debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n",
                            (long)t->id, whatNext_strs[t->what_next]);
             } else {
                 debugBelch("--<< thread %ld (%s) stopped, yielding\n",
                            (long)t->id, whatNext_strs[t->what_next]);
             }
             );

    IF_DEBUG(sanity,
             //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
             checkTSO(t));
    ASSERT(t->link == END_TSO_QUEUE);

    // Shortcut if we're just switching evaluators: don't bother
    // doing stack squeezing (which can be expensive), just run the
    // thread.
    if (t->what_next != prev_what_next) {
        return rtsTrue;
    }

#if defined(GRAN)
    ASSERT(!is_on_queue(t,CurrentProc));

    IF_DEBUG(sanity,
             //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
             checkThreadQsSanity(rtsTrue));
#endif

    addToRunQueue(cap,t);

#if defined(GRAN)
    /* add a ContinueThread event to actually process the thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              ContinueThread,
              t, (StgClosure*)NULL, (rtsSpark*)NULL);
    IF_DEBUG(gran,
             debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
             G_EVENTQ(0);
             G_CURR_THREADQ(0));
#endif
    return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadBlocked
 * -------------------------------------------------------------------------- */

static void
scheduleHandleThreadBlocked( StgTSO *t
#if !defined(GRAN) && !defined(DEBUG)
                             STG_UNUSED
#endif
                             )
{
#if defined(GRAN)
    IF_DEBUG(scheduler,
             debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
                        t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
             if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));

    // ??? needed; should emit block before
    IF_DEBUG(gran,
             DumpGranEvent(GR_DESCHEDULE, t));
    prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t

    ASSERT(procStatus[CurrentProc]==Busy ||
           ((procStatus[CurrentProc]==Fetching) &&
            (t->block_info.closure!=(StgClosure*)NULL)));
    if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
        !(!RtsFlags.GranFlags.DoAsyncFetch &&
          procStatus[CurrentProc]==Fetching))
        procStatus[CurrentProc] = Idle;
#elif defined(PAR)
    IF_DEBUG(scheduler,
             debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
                        t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
    IF_PAR_DEBUG(bq,
                 if (t->block_info.closure!=(StgClosure*)NULL)
                     print_bq(t->block_info.closure));

    /* Send a fetch (if BlockedOnGA) and dump event to log file */
    blockThread(t);

    /* whatever we schedule next, we must log that schedule */
    emitSchedule = rtsTrue;

#else /* !GRAN */

    // We don't need to do anything.  The thread is blocked, and it
    // has tidied up its stack and placed itself on whatever queue
    // it needs to be on.

#if !defined(THREADED_RTS)
    ASSERT(t->why_blocked != NotBlocked);
    // This might not be true under SMP: we don't have
    // exclusive access to this TSO, so someone might have
    // woken it up by now.  This actually happens: try
    // conc023 +RTS -N2.
#endif

    IF_DEBUG(scheduler,
             debugBelch("--<< thread %d (%s) stopped: ",
                        t->id, whatNext_strs[t->what_next]);
             printThreadBlockage(t);
             debugBelch("\n"));

    /* Only for dumping event to log file
       ToDo: do I need this in GranSim, too?
       blockThread(t);
    */
#endif
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadFinished
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
{
    /* Need to check whether this was a main thread, and if so,
     * return with the return value.
     *
     * We also end up here if the thread kills itself with an
     * uncaught exception, see Exception.cmm.
     */
    IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n",
                                  t->id, whatNext_strs[t->what_next]));

#if defined(GRAN)
      endThread(t, CurrentProc); // clean-up the thread
#elif defined(PARALLEL_HASKELL)
      /* For now all are advisory -- HWL */
      //if(t->priority==AdvisoryPriority) ??
      advisory_thread_count--; // JB: Caution with this counter, buggy!

# if defined(DIST)
      if(t->dist.priority==RevalPriority)
        FinishReval(t);
# endif

# if defined(EDENOLD)
      // the thread could still have an outport... (BUG)
      if (t->eden.outport != -1) {
        // delete the outport for the tso which has finished...
        IF_PAR_DEBUG(eden_ports,
                     debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
                                t->eden.outport, t->id));
      }
      // thread still in the process (HEAVY BUG! since outport has just been closed...)
      if (t->eden.epid != -1) {
        IF_PAR_DEBUG(eden_ports,
                     debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
                                t->id, t->eden.epid));
        removeTSOfromProcess(t);
      }
# endif

# if defined(PAR)
      if (RtsFlags.ParFlags.ParStats.Full &&
          !RtsFlags.ParFlags.ParStats.Suppressed)
        DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);

      //  t->par only contains statistics: left out for now...
      IF_PAR_DEBUG(fish,
                   debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
                              t->id,t,t->par.sparkname));
# endif
#endif // PARALLEL_HASKELL

      //
      // Check whether the thread that just completed was a bound
      // thread, and if so return with the result.
      //
      // There is an assumption here that all thread completion goes
      // through this point; we need to make sure that if a thread
      // ends up in the ThreadKilled state, that it stays on the run
      // queue so it can be dealt with here.
      //

      if (t->bound) {

          if (t->bound != task) {
#if !defined(THREADED_RTS)
              // Must be a bound thread that is not the topmost one.  Leave
              // it on the run queue until the stack has unwound to the
              // point where we can deal with this.  Leaving it on the run
              // queue also ensures that the garbage collector knows about
              // this thread and its return value (it gets dropped from the
              // all_threads list so there's no other way to find it).
              appendToRunQueue(cap,t);
              return rtsFalse;
#else
              // this cannot happen in the threaded RTS, because a
              // bound thread can only be run by the appropriate Task.
              barf("finished bound thread that isn't mine");
#endif
          }

          ASSERT(task->tso == t);

          if (t->what_next == ThreadComplete) {
              if (task->ret) {
                  // NOTE: return val is tso->sp[1] (see StgStartup.hc)
                  *(task->ret) = (StgClosure *)task->tso->sp[1];
              }
              task->stat = Success;
          } else {
              if (task->ret) {
                  *(task->ret) = NULL;
              }
              if (interrupted) {
                  task->stat = Interrupted;
              } else {
                  task->stat = Killed;
              }
          }
#ifdef DEBUG
          removeThreadLabel((StgWord)task->tso->id);
#endif
          return rtsTrue; // tells schedule() to return
      }

      return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Perform a heap census, if PROFILING
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
{
#if defined(PROFILING)
    // When we have +RTS -i0 and we're heap profiling, do a census at
    // every GC.  This lets us get repeatable runs for debugging.
    if (performHeapProfile ||
        (RtsFlags.ProfFlags.profileInterval==0 &&
         RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
        GarbageCollect(GetRoots, rtsTrue);
        heapCensus();
        performHeapProfile = rtsFalse;
        return rtsTrue;  // true <=> we already GC'd
    }
#endif
    return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Perform a garbage collection if necessary
 * -------------------------------------------------------------------------- */

static void
scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
{
    StgTSO *t;
#ifdef SMP
    static volatile StgWord waiting_for_gc;
    rtsBool was_waiting;
    nat i;
#endif

#ifdef SMP
    // In order to GC, there must be no threads running Haskell code.
    // Therefore, the GC thread needs to hold *all* the capabilities,
    // and release them after the GC has completed.
    //
    // This seems to be the simplest way: previous attempts involved
    // making all the threads with capabilities give up their
    // capabilities and sleep except for the *last* one, which
    // actually did the GC.  But it's quite hard to arrange for all
    // the other tasks to sleep and stay asleep.
    //

    was_waiting = cas(&waiting_for_gc, 0, 1);
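    // (Note, illustrative: cas() atomically writes 1 and returns the
    // previous value, so exactly one task sees 0 here and becomes the
    // GC leader; every other task takes the was_waiting branch below
    // and yields its Capability until the leader finishes.)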
    if (was_waiting) {
        do {
            IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
            yieldCapability(&cap,task);
        } while (waiting_for_gc);
        return;
    }

    for (i=0; i < n_capabilities; i++) {
        IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities));
        if (cap != &capabilities[i]) {
            Capability *pcap = &capabilities[i];
            // we better hope this task doesn't get migrated to
            // another Capability while we're waiting for this one.
            // It won't, because load balancing happens while we have
            // all the Capabilities, but even so it's a slightly
            // unsavoury invariant.
            task->cap = pcap;
            context_switch = 1;
            waitForReturnCapability(&pcap, task);
            if (pcap != &capabilities[i]) {
                barf("scheduleDoGC: got the wrong capability");
            }
        }
    }

    waiting_for_gc = rtsFalse;
#endif
    /* Kick any transactions which are invalid back to their
     * atomically frames.  When next scheduled they will try to
     * commit, this commit will fail and they will retry.
     */
    {
        StgTSO *next;

        for (t = all_threads; t != END_TSO_QUEUE; t = next) {
            if (t->what_next == ThreadRelocated) {
                next = t->link;
            } else {
                next = t->global_link;
                if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
                    if (!stmValidateNestOfTransactions (t -> trec)) {
                        IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));

                        // strip the stack back to the
                        // ATOMICALLY_FRAME, aborting the (nested)
                        // transaction, and saving the stack of any
                        // partially-evaluated thunks on the heap.
                        raiseAsync_(cap, t, NULL, rtsTrue);

                        ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
                    }
                }
            }
        }
    }

    // so this happens periodically:
    scheduleCheckBlackHoles(cap);

    IF_DEBUG(scheduler, printAllThreads());
    /* everybody back, start the GC.
     * Could do it in this thread, or signal a condition var
     * to do it in another thread.  Either way, we need to
     * broadcast on gc_pending_cond afterward.
     */
#if defined(THREADED_RTS)
    IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
    GarbageCollect(GetRoots, force_major);

#if defined(SMP)
    // release our stash of capabilities.
    for (i = 0; i < n_capabilities; i++) {
        if (cap != &capabilities[i]) {
            task->cap = &capabilities[i];
            releaseCapability(&capabilities[i]);
        }
    }
    task->cap = cap;
#endif

#if defined(GRAN)
    /* add a ContinueThread event to continue execution of current thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              ContinueThread,
              t, (StgClosure*)NULL, (rtsSpark*)NULL);
    IF_DEBUG(gran,
             debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
             G_EVENTQ(0);
             G_CURR_THREADQ(0));
#endif /* GRAN */
}
/* ---------------------------------------------------------------------------
 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
 * used by Control.Concurrent for error checking.
 * ------------------------------------------------------------------------- */

StgBool
rtsSupportsBoundThreads(void)
{
#if defined(THREADED_RTS)
  return rtsTrue;
#else
  return rtsFalse;
#endif
}

/* ---------------------------------------------------------------------------
 * isThreadBound(tso): check whether tso is bound to an OS thread.
 * ------------------------------------------------------------------------- */

StgBool
isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
{
#if defined(THREADED_RTS)
  return (tso->bound != NULL);
#endif
  return rtsFalse;
}
/* ---------------------------------------------------------------------------
 * Singleton fork(). Do not copy any running threads.
 * ------------------------------------------------------------------------- */

#if !defined(mingw32_HOST_OS) && !defined(SMP)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
deleteThreadImmediately(Capability *cap, StgTSO *tso);
#endif
StgInt
forkProcess(HsStablePtr *entry
#ifndef FORKPROCESS_PRIMOP_SUPPORTED
            STG_UNUSED
#endif
           )
{
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
    Task *task;
    pid_t pid;
    StgTSO* t,*next;
    Capability *cap;

    IF_DEBUG(scheduler,sched_belch("forking!"));

    // ToDo: for SMP, we should probably acquire *all* the capabilities
    cap = rts_lock();

    pid = fork();

    if (pid) { // parent

        // just return the pid
        rts_unlock(cap);
        return pid;

    } else { // child

        // delete all threads
        cap->run_queue_hd = END_TSO_QUEUE;
        cap->run_queue_tl = END_TSO_QUEUE;

        for (t = all_threads; t != END_TSO_QUEUE; t = next) {
            next = t->link;

            // don't allow threads to catch the ThreadKilled exception
            deleteThreadImmediately(cap,t);
        }

        // wipe the task list
        ACQUIRE_LOCK(&sched_mutex);
        for (task = all_tasks; task != NULL; task=task->all_link) {
            if (task != cap->running_task) discardTask(task);
        }
        RELEASE_LOCK(&sched_mutex);

#if defined(THREADED_RTS)
        // wipe our spare workers list.
        cap->spare_workers = NULL;
#endif

        cap = rts_evalStableIO(cap, entry, NULL);  // run the action
        rts_checkSchedStatus("forkProcess",cap);

        rts_unlock(cap);
        hs_exit();                      // clean up and exit
        stg_exit(EXIT_SUCCESS);
    }
#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
    barf("forkProcess#: primop not supported on this platform, sorry!\n");
    return -1;
#endif
}
2080 /* ---------------------------------------------------------------------------
2081 * Delete the threads on the run queue of the current capability.
2082 * ------------------------------------------------------------------------- */
2085 deleteRunQueue (Capability *cap)
2088 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2089 ASSERT(t->what_next != ThreadRelocated);
2091 deleteThread(cap, t);
2095 /* startThread and insertThread are now in GranSim.c -- HWL */
2098 /* -----------------------------------------------------------------------------
2099 Managing the suspended_ccalling_tasks list.
2100 Locks required: sched_mutex
2101 -------------------------------------------------------------------------- */
2104 suspendTask (Capability *cap, Task *task)
2106 ASSERT(task->next == NULL && task->prev == NULL);
2107 task->next = cap->suspended_ccalling_tasks;
2109 if (cap->suspended_ccalling_tasks) {
2110 cap->suspended_ccalling_tasks->prev = task;
2112 cap->suspended_ccalling_tasks = task;
2116 recoverSuspendedTask (Capability *cap, Task *task)
2119 task->prev->next = task->next;
2121 ASSERT(cap->suspended_ccalling_tasks == task);
2122 cap->suspended_ccalling_tasks = task->next;
2125 task->next->prev = task->prev;
2127 task->next = task->prev = NULL;
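
/* A debug-only sketch (ours, not part of the original scheduler) that
 * checks the doubly-linked invariants suspendTask/recoverSuspendedTask
 * maintain on cap->suspended_ccalling_tasks: the head's prev is NULL,
 * and the next/prev links are mutually consistent.
 */
#if 0 /* illustrative only */
static void
checkSuspendedTasks (Capability *cap)
{
    Task *task;
    for (task = cap->suspended_ccalling_tasks; task != NULL; task = task->next) {
        if (task == cap->suspended_ccalling_tasks) {
            ASSERT(task->prev == NULL);       // head has no predecessor
        }
        if (task->next != NULL) {
            ASSERT(task->next->prev == task); // forward/back links agree
        }
    }
}
#endif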
2130 /* ---------------------------------------------------------------------------
2131 * Suspending & resuming Haskell threads.
2133 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2134 * its capability before calling the C function. This allows another
2135 * task to pick up the capability and carry on running Haskell
2136 * threads. It also means that if the C call blocks, it won't lock the whole system.
2139 * The Haskell thread making the C call is put to sleep for the
2140 * duration of the call, on the suspended_ccalling_threads queue. We
2141 * give out a token to the task, which it can use to resume the thread
2142 * on return from the C function.
2143 * ------------------------------------------------------------------------- */
2146 suspendThread (StgRegTable *reg)
2149 int saved_errno = errno;
2153 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2155 cap = regTableToCapability(reg);
2157 task = cap->running_task;
2158 tso = cap->r.rCurrentTSO;
2161 sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2163 // XXX this might not be necessary --SDM
2164 tso->what_next = ThreadRunGHC;
2168 if(tso->blocked_exceptions == NULL) {
2169 tso->why_blocked = BlockedOnCCall;
2170 tso->blocked_exceptions = END_TSO_QUEUE;
2172 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2175 // Hand back capability
2176 task->suspended_tso = tso;
2178 ACQUIRE_LOCK(&cap->lock);
2180 suspendTask(cap,task);
2181 cap->in_haskell = rtsFalse;
2182 releaseCapability_(cap);
2184 RELEASE_LOCK(&cap->lock);
2186 #if defined(THREADED_RTS)
2187 /* Preparing to leave the RTS, so ensure there's a native thread/task
2188 waiting to take over.
2190 IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2193 errno = saved_errno;
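
/* For illustration: a safe foreign call compiles to a bracket around
 * the C call, roughly as below ('foo', 'x', 'r' and 'tok' are
 * hypothetical names; the exact types are elided in this file):
 *
 *    tok = suspendThread(&cap->r);      // give up the Capability
 *    r   = foo(x);                      // may block, or call back in
 *    ... resumeThread(tok) ...          // wait to re-enter the RTS
 *
 * Between the two calls the task holds no Capability, so it must not
 * touch the Haskell heap; saved_errno above exists because the RTS
 * may clobber errno while the thread is suspended.
 */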
2198 resumeThread (void *task_)
2202 int saved_errno = errno;
2206 // Wait for permission to re-enter the RTS with the result.
2207 waitForReturnCapability(&cap,task);
2208 // we might be on a different capability now... but if so, our
2209 // entry on the suspended_ccalling_tasks list will also have been migrated.
2212 // Remove the thread from the suspended list
2213 recoverSuspendedTask(cap,task);
2215 tso = task->suspended_tso;
2216 task->suspended_tso = NULL;
2217 tso->link = END_TSO_QUEUE;
2218 IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2220 if (tso->why_blocked == BlockedOnCCall) {
2221 awakenBlockedQueue(cap,tso->blocked_exceptions);
2222 tso->blocked_exceptions = NULL;
2225 /* Reset blocking status */
2226 tso->why_blocked = NotBlocked;
2228 cap->r.rCurrentTSO = tso;
2229 cap->in_haskell = rtsTrue;
2230 errno = saved_errno;
2235 /* ---------------------------------------------------------------------------
2236 * Comparing Thread ids.
2238 * This is used from STG land in the implementation of the
2239 * instances of Eq/Ord for ThreadIds.
2240 * ------------------------------------------------------------------------ */
2243 cmp_thread(StgPtr tso1, StgPtr tso2)
2245 StgThreadID id1 = ((StgTSO *)tso1)->id;
2246 StgThreadID id2 = ((StgTSO *)tso2)->id;
2248 if (id1 < id2) return (-1);
2249 if (id1 > id2) return 1;
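
/* Contract, by example: thread ids come from the monotonically
 * increasing counter next_thread_id, so the result is -1, 0 or 1,
 * mapping directly onto the LT/EQ/GT cases of the Haskell Ord
 * instance (the variables below are hypothetical):
 *
 *    if (cmp_thread((StgPtr)tso1, (StgPtr)tso2) < 0) {
 *        // tso1 was created before tso2 (barring id wrap-around)
 *    }
 */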
2253 /* ---------------------------------------------------------------------------
2254 * Fetching the ThreadID from an StgTSO.
2256 * This is used in the implementation of Show for ThreadIds.
2257 * ------------------------------------------------------------------------ */
2259 rts_getThreadId(StgPtr tso)
2261 return ((StgTSO *)tso)->id;
2266 labelThread(StgPtr tso, char *label)
2271 /* Caveat: Once set, you can only set the thread name to "" */
2272 len = strlen(label)+1;
2273 buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2274 strncpy(buf,label,len);
2275 /* Update will free the old memory for us */
2276 updateThreadLabel(((StgTSO *)tso)->id,buf);
2280 /* ---------------------------------------------------------------------------
2281 Create a new thread.
2283 The new thread starts with the given stack size. Before the
2284 scheduler can run, however, this thread needs to have a closure
2285 (and possibly some arguments) pushed on its stack. See
2286 pushClosure() in Schedule.h.
2288 createGenThread() and createIOThread() (in SchedAPI.h) are
2289 convenient packaged versions of this function.
2291 currently pri (priority) is only used in a GRAN setup -- HWL
2292 ------------------------------------------------------------------------ */
2294 /* currently pri (priority) is only used in a GRAN setup -- HWL */
2296 createThread(nat size, StgInt pri)
2299 createThread(Capability *cap, nat size)
2305 /* sched_mutex is *not* required */
2307 /* First check whether we should create a thread at all */
2308 #if defined(PARALLEL_HASKELL)
2309 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2310 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2312 debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2313 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2314 return END_TSO_QUEUE;
2320 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2323 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2325 /* catch ridiculously small stack sizes */
2326 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2327 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2330 stack_size = size - TSO_STRUCT_SIZEW;
2332 tso = (StgTSO *)allocateLocal(cap, size);
2333 TICK_ALLOC_TSO(stack_size, 0);
2335 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2337 SET_GRAN_HDR(tso, ThisPE);
2340 // Always start with the compiled code evaluator
2341 tso->what_next = ThreadRunGHC;
2343 tso->why_blocked = NotBlocked;
2344 tso->blocked_exceptions = NULL;
2346 tso->saved_errno = 0;
2349 tso->stack_size = stack_size;
2350 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
2352 tso->sp = (P_)&(tso->stack) + stack_size;
2354 tso->trec = NO_TREC;
2357 tso->prof.CCCS = CCS_MAIN;
2360 /* put a stop frame on the stack */
2361 tso->sp -= sizeofW(StgStopFrame);
2362 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2363 tso->link = END_TSO_QUEUE;
2367 /* uses more flexible routine in GranSim */
2368 insertThread(tso, CurrentProc);
2370 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2376 if (RtsFlags.GranFlags.GranSimStats.Full)
2377 DumpGranEvent(GR_START,tso);
2378 #elif defined(PARALLEL_HASKELL)
2379 if (RtsFlags.ParFlags.ParStats.Full)
2380 DumpGranEvent(GR_STARTQ,tso);
2381 /* HACK to avoid SCHEDULE
2385 /* Link the new thread on the global thread list.
2387 ACQUIRE_LOCK(&sched_mutex);
2388 tso->id = next_thread_id++; // while we have the mutex
2389 tso->global_link = all_threads;
2391 RELEASE_LOCK(&sched_mutex);
2394 tso->dist.priority = MandatoryPriority; //by default that is...
2398 tso->gran.pri = pri;
2400 tso->gran.magic = TSO_MAGIC; // debugging only
2402 tso->gran.sparkname = 0;
2403 tso->gran.startedat = CURRENT_TIME;
2404 tso->gran.exported = 0;
2405 tso->gran.basicblocks = 0;
2406 tso->gran.allocs = 0;
2407 tso->gran.exectime = 0;
2408 tso->gran.fetchtime = 0;
2409 tso->gran.fetchcount = 0;
2410 tso->gran.blocktime = 0;
2411 tso->gran.blockcount = 0;
2412 tso->gran.blockedat = 0;
2413 tso->gran.globalsparks = 0;
2414 tso->gran.localsparks = 0;
2415 if (RtsFlags.GranFlags.Light)
2416 tso->gran.clock = Now; /* local clock */
2418 tso->gran.clock = 0;
2420 IF_DEBUG(gran,printTSO(tso));
2421 #elif defined(PARALLEL_HASKELL)
2423 tso->par.magic = TSO_MAGIC; // debugging only
2425 tso->par.sparkname = 0;
2426 tso->par.startedat = CURRENT_TIME;
2427 tso->par.exported = 0;
2428 tso->par.basicblocks = 0;
2429 tso->par.allocs = 0;
2430 tso->par.exectime = 0;
2431 tso->par.fetchtime = 0;
2432 tso->par.fetchcount = 0;
2433 tso->par.blocktime = 0;
2434 tso->par.blockcount = 0;
2435 tso->par.blockedat = 0;
2436 tso->par.globalsparks = 0;
2437 tso->par.localsparks = 0;
2441 globalGranStats.tot_threads_created++;
2442 globalGranStats.threads_created_on_PE[CurrentProc]++;
2443 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2444 globalGranStats.tot_sq_probes++;
2445 #elif defined(PARALLEL_HASKELL)
2446 // collect parallel global statistics (currently done together with GC stats)
2447 if (RtsFlags.ParFlags.ParStats.Global &&
2448 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2449 //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
2450 globalParStats.tot_threads_created++;
2456 sched_belch("==__ schedule: Created TSO %d (%p) on PE %d;",
2457 tso->id, tso, CurrentProc));
2458 #elif defined(PARALLEL_HASKELL)
2459 IF_PAR_DEBUG(verbose,
2460 sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2461 (long)tso->id, tso, advisory_thread_count));
2463 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
2464 (long)tso->id, (long)tso->stack_size));
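
/* Worked example of the sizing above (the numbers are illustrative;
 * the real constants are platform-dependent): a request of
 * size = 100 words with MIN_STACK_WORDS + TSO_STRUCT_SIZEW = 130 is
 * bumped up to 130 words; TSO_STRUCT_SIZEW of those pay for the TSO
 * header itself, stack_size gets the rest, and sp starts at
 * tso->stack + stack_size because the stack grows downwards (note
 * the sp -= sizeofW(StgStopFrame) when the stop frame is pushed).
 */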
2471 all parallel thread creation calls should go through the following routine.
2474 createThreadFromSpark(rtsSpark spark)
2476 ASSERT(spark != (rtsSpark)NULL);
2477 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2478 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
2480 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2481 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2482 return END_TSO_QUEUE;
2486 tso = createThread(RtsFlags.GcFlags.initialStkSize);
2487 if (tso==END_TSO_QUEUE)
2488 barf("createSparkThread: Cannot create TSO");
2490 tso->priority = AdvisoryPriority;
2492 pushClosure(tso,spark);
2494 advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
2501 Turn a spark into a thread.
2502 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2506 activateSpark (rtsSpark spark)
2510 tso = createSparkThread(spark);
2511 if (RtsFlags.ParFlags.ParStats.Full) {
2512 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2513 IF_PAR_DEBUG(verbose,
2514 debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2515 (StgClosure *)spark, info_type((StgClosure *)spark)));
2517 // ToDo: fwd info on local/global spark to thread -- HWL
2518 // tso->gran.exported = spark->exported;
2519 // tso->gran.locked = !spark->global;
2520 // tso->gran.sparkname = spark->name;
2526 /* ---------------------------------------------------------------------------
2529 * scheduleThread puts a thread on the end of the runnable queue.
2530 * This will usually be done immediately after a thread is created.
2531 * The caller of scheduleThread must create the thread using e.g.
2532 * createThread and push an appropriate closure
2533 * on this thread's stack before the scheduler is invoked.
2534 * ------------------------------------------------------------------------ */
2537 scheduleThread(Capability *cap, StgTSO *tso)
2539 // The thread goes at the *end* of the run-queue, to avoid possible
2540 // starvation of any threads already on the queue.
2541 appendToRunQueue(cap,tso);
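
/* The full calling sequence, as a sketch (cap and closure are assumed
 * to be in scope; pushClosure is the stack-pushing helper from
 * Schedule.h mentioned in the createThread comment above):
 *
 *    StgTSO *tso = createThread(cap, RtsFlags.GcFlags.initialStkSize);
 *    pushClosure(tso, (W_)closure);   // what the thread will evaluate
 *    scheduleThread(cap, tso);        // now the scheduler may run it
 */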
2545 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2549 // We already created/initialised the Task
2550 task = cap->running_task;
2552 // This TSO is now a bound thread; make the Task and TSO
2553 // point to each other.
2558 task->stat = NoStatus;
2560 appendToRunQueue(cap,tso);
2562 IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2565 /* GranSim specific init */
2566 CurrentTSO = m->tso; // the TSO to run
2567 procStatus[MainProc] = Busy; // status of main PE
2568 CurrentProc = MainProc; // PE to run it on
2571 cap = schedule(cap,task);
2573 ASSERT(task->stat != NoStatus);
2574 ASSERT_CAPABILITY_INVARIANTS(cap,task);
2576 IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2580 /* ----------------------------------------------------------------------------
2582 * ------------------------------------------------------------------------- */
2584 #if defined(THREADED_RTS)
2586 workerStart(Task *task)
2590 // See startWorkerTask().
2591 ACQUIRE_LOCK(&task->lock);
2593 RELEASE_LOCK(&task->lock);
2595 // set the thread-local pointer to the Task:
2598 // schedule() runs without a lock.
2599 cap = schedule(cap,task);
2601 // On exit from schedule(), we have a Capability.
2602 releaseCapability(cap);
2607 /* ---------------------------------------------------------------------------
2610 * Initialise the scheduler. This resets all the queues - if the
2611 * queues contained any threads, they'll be garbage collected at the
2614 * ------------------------------------------------------------------------ */
2621 for (i=0; i<MAX_PROC; i++) {
2622 run_queue_hds[i] = END_TSO_QUEUE;
2623 run_queue_tls[i] = END_TSO_QUEUE;
2624 blocked_queue_hds[i] = END_TSO_QUEUE;
2625 blocked_queue_tls[i] = END_TSO_QUEUE;
2626 ccalling_threadss[i] = END_TSO_QUEUE;
2627 blackhole_queue[i] = END_TSO_QUEUE;
2628 sleeping_queue = END_TSO_QUEUE;
2630 #elif !defined(THREADED_RTS)
2631 blocked_queue_hd = END_TSO_QUEUE;
2632 blocked_queue_tl = END_TSO_QUEUE;
2633 sleeping_queue = END_TSO_QUEUE;
2636 blackhole_queue = END_TSO_QUEUE;
2637 all_threads = END_TSO_QUEUE;
2642 RtsFlags.ConcFlags.ctxtSwitchTicks =
2643 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2645 #if defined(THREADED_RTS)
2646 /* Initialise the mutex and condition variables used by
2648 initMutex(&sched_mutex);
2651 ACQUIRE_LOCK(&sched_mutex);
2653 /* A capability holds the state a native thread needs in
2654 * order to execute STG code. At least one capability is
2655 * floating around (only SMP builds have more than one).
2663 * Eagerly start one worker to run each Capability, except for
2664 * Capability 0. The idea is that we're probably going to start a
2665 * bound thread on Capability 0 pretty soon, so we don't want a
2666 * worker task hogging it.
2671 for (i = 1; i < n_capabilities; i++) {
2672 cap = &capabilities[i];
2673 ACQUIRE_LOCK(&cap->lock);
2674 startWorkerTask(cap, workerStart);
2675 RELEASE_LOCK(&cap->lock);
2680 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2684 RELEASE_LOCK(&sched_mutex);
2688 exitScheduler( void )
2690 interrupted = rtsTrue;
2691 shutting_down_scheduler = rtsTrue;
2693 #if defined(THREADED_RTS)
2698 ACQUIRE_LOCK(&sched_mutex);
2699 task = newBoundTask();
2700 RELEASE_LOCK(&sched_mutex);
2702 for (i = 0; i < n_capabilities; i++) {
2703 shutdownCapability(&capabilities[i], task);
2705 boundTaskExiting(task);
2711 /* ---------------------------------------------------------------------------
2712 Where are the roots that we know about?
2714 - all the threads on the runnable queue
2715 - all the threads on the blocked queue
2716 - all the threads on the sleeping queue
2717 - all the threads currently executing a _ccall_GC
2718 - all the "main threads"
2720 ------------------------------------------------------------------------ */
2722 /* This has to be protected either by the scheduler monitor, or by the
2723 garbage collection monitor (probably the latter).
2728 GetRoots( evac_fn evac )
2735 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2736 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2737 evac((StgClosure **)&run_queue_hds[i]);
2738 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2739 evac((StgClosure **)&run_queue_tls[i]);
2741 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2742 evac((StgClosure **)&blocked_queue_hds[i]);
2743 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2744 evac((StgClosure **)&blocked_queue_tls[i]);
2745 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2746 evac((StgClosure **)&ccalling_threadss[i]);
2753 for (i = 0; i < n_capabilities; i++) {
2754 cap = &capabilities[i];
2755 evac((StgClosure **)&cap->run_queue_hd);
2756 evac((StgClosure **)&cap->run_queue_tl);
2758 for (task = cap->suspended_ccalling_tasks; task != NULL;
2760 evac((StgClosure **)&task->suspended_tso);
2764 #if !defined(THREADED_RTS)
2765 evac((StgClosure **)&blocked_queue_hd);
2766 evac((StgClosure **)&blocked_queue_tl);
2767 evac((StgClosure **)&sleeping_queue);
2771 evac((StgClosure **)&blackhole_queue);
2773 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2774 markSparkQueue(evac);
2777 #if defined(RTS_USER_SIGNALS)
2778 // mark the signal handlers (signals should be already blocked)
2779 markSignalHandlers(evac);
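
/* GetRoots is parameterised over an evac_fn precisely so that callers
 * other than the garbage collector can reuse the traversal.  A sketch
 * (the counting function is ours, purely for illustration):
 */
#if 0 /* illustrative only */
static nat n_roots;

static void
countRoot (StgClosure **root)
{
    if (*root != NULL) n_roots++;   // just count, don't evacuate
}

    /* ... n_roots = 0; GetRoots(countRoot); ... */
#endif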
2783 /* -----------------------------------------------------------------------------
2786 This is the interface to the garbage collector from Haskell land.
2787 We provide this so that external C code can allocate and garbage
2788 collect when called from Haskell via _ccall_GC.
2790 It might be useful to provide an interface whereby the programmer
2791 can specify more roots (ToDo).
2793 This needs to be protected by the GC condition variable above. KH.
2794 -------------------------------------------------------------------------- */
2796 static void (*extra_roots)(evac_fn);
2802 // ToDo: we have to grab all the capabilities here.
2803 errorBelch("performGC not supported in threaded RTS (yet)");
2804 stg_exit(EXIT_FAILURE);
2806 /* Obligated to hold this lock upon entry */
2807 GarbageCollect(GetRoots,rtsFalse);
2811 performMajorGC(void)
2814 errorBelch("performMajorGC not supported in threaded RTS (yet)");
2815 stg_exit(EXIT_FAILURE);
2817 GarbageCollect(GetRoots,rtsTrue);
2821 AllRoots(evac_fn evac)
2823 GetRoots(evac); // the scheduler's roots
2824 extra_roots(evac); // the user's roots
2828 performGCWithRoots(void (*get_roots)(evac_fn))
2831 errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2832 stg_exit(EXIT_FAILURE);
2834 extra_roots = get_roots;
2835 GarbageCollect(AllRoots,rtsFalse);
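
/* Sketch of a performGCWithRoots client: the caller supplies a
 * function of type void (*)(evac_fn) that evacuates its own extra
 * roots, which AllRoots above calls after the scheduler's roots
 * (my_root and myExtraRoots are hypothetical names):
 */
#if 0 /* illustrative only */
static StgClosure *my_root;

static void
myExtraRoots (evac_fn evac)
{
    evac(&my_root);                 // keep my_root alive across GC
}

    /* ... performGCWithRoots(myExtraRoots); ... */
#endif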
2838 /* -----------------------------------------------------------------------------
2841 If the thread has reached its maximum stack size, then raise the
2842 StackOverflow exception in the offending thread. Otherwise
2843 relocate the TSO into a larger chunk of memory and adjust its stack
2845 -------------------------------------------------------------------------- */
2848 threadStackOverflow(Capability *cap, StgTSO *tso)
2850 nat new_stack_size, stack_words;
2855 IF_DEBUG(sanity,checkTSO(tso));
2856 if (tso->stack_size >= tso->max_stack_size) {
2859 debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2860 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2861 /* If we're debugging, just print out the top of the stack */
2862 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2865 /* Send this thread the StackOverflow exception */
2866 raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2870 /* Try to double the current stack size. If that takes us over the
2871 * maximum stack size for this thread, then use the maximum instead.
2872 * Finally round up so the TSO ends up as a whole number of blocks.
2874 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2875 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2876 TSO_STRUCT_SIZE)/sizeof(W_);
2877 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2878 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2880 IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2882 dest = (StgTSO *)allocate(new_tso_size);
2883 TICK_ALLOC_TSO(new_stack_size,0);
2885 /* copy the TSO block and the old stack into the new area */
2886 memcpy(dest,tso,TSO_STRUCT_SIZE);
2887 stack_words = tso->stack + tso->stack_size - tso->sp;
2888 new_sp = (P_)dest + new_tso_size - stack_words;
2889 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2891 /* relocate the stack pointers... */
2893 dest->stack_size = new_stack_size;
2895 /* Mark the old TSO as relocated. We have to check for relocated
2896 * TSOs in the garbage collector and any primops that deal with TSOs.
2898 * It's important to set the sp value to just beyond the end
2899 * of the stack, so we don't attempt to scavenge any part of the
2902 tso->what_next = ThreadRelocated;
2904 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2905 tso->why_blocked = NotBlocked;
2907 IF_PAR_DEBUG(verbose,
2908 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2909 tso->id, tso, tso->stack_size);
2910 /* If we're debugging, just print out the top of the stack */
2911 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2914 IF_DEBUG(sanity,checkTSO(tso));
2916 IF_DEBUG(scheduler,printTSO(dest));
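
/* Worked example of the growth policy above (illustrative numbers):
 * with stack_size = 4096 words and max_stack_size = 65536, the first
 * overflow tries 2*4096 = 8192 words; TSO_STRUCT_SIZE is added, the
 * total is rounded up to whole blocks (and made mblock-friendly), and
 * whatever remains after the TSO header becomes new_stack_size.  Only
 * when doubling would exceed max_stack_size do we clamp to the
 * maximum, and a thread already at the maximum gets the StackOverflow
 * exception instead of more space.
 */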
2922 /* ---------------------------------------------------------------------------
2923 Wake up a queue that was blocked on some resource.
2924 ------------------------------------------------------------------------ */
2928 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2931 #elif defined(PARALLEL_HASKELL)
2933 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2935 /* write RESUME events to log file and
2936 update blocked and fetch time (depending on type of the orig closure) */
2937 if (RtsFlags.ParFlags.ParStats.Full) {
2938 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2939 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2940 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2941 if (emptyRunQueue())
2942 emitSchedule = rtsTrue;
2944 switch (get_itbl(node)->type) {
2946 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2951 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2958 barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2965 StgBlockingQueueElement *
2966 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2969 PEs node_loc, tso_loc;
2971 node_loc = where_is(node); // should be lifted out of loop
2972 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2973 tso_loc = where_is((StgClosure *)tso);
2974 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2975 /* !fake_fetch => the TSO being on CurrentProc is the same as IS_LOCAL_TO */
2976 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2977 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2978 // insertThread(tso, node_loc);
2979 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2981 tso, node, (rtsSpark*)NULL);
2982 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2985 } else { // TSO is remote (actually should be FMBQ)
2986 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2987 RtsFlags.GranFlags.Costs.gunblocktime +
2988 RtsFlags.GranFlags.Costs.latency;
2989 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2991 tso, node, (rtsSpark*)NULL);
2992 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2995 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2997 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2998 (node_loc==tso_loc ? "Local" : "Global"),
2999 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3000 tso->block_info.closure = NULL;
3001 IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n",
3004 #elif defined(PARALLEL_HASKELL)
3005 StgBlockingQueueElement *
3006 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3008 StgBlockingQueueElement *next;
3010 switch (get_itbl(bqe)->type) {
3012 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3013 /* if it's a TSO just push it onto the run_queue */
3015 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3016 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
3018 unblockCount(bqe, node);
3019 /* reset blocking status after dumping event */
3020 ((StgTSO *)bqe)->why_blocked = NotBlocked;
3024 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3026 bqe->link = (StgBlockingQueueElement *)PendingFetches;
3027 PendingFetches = (StgBlockedFetch *)bqe;
3031 /* can ignore this case in a non-debugging setup;
3032 see comments on RBHSave closures above */
3034 /* check that the closure is an RBHSave closure */
3035 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3036 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3037 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3041 barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3042 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
3046 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3052 unblockOne(Capability *cap, StgTSO *tso)
3056 ASSERT(get_itbl(tso)->type == TSO);
3057 ASSERT(tso->why_blocked != NotBlocked);
3058 tso->why_blocked = NotBlocked;
3060 tso->link = END_TSO_QUEUE;
3062 // We might have just migrated this TSO to our Capability:
3064 tso->bound->cap = cap;
3067 appendToRunQueue(cap,tso);
3069 // we're holding a newly woken thread, make sure we context switch
3070 // quickly so we can migrate it if necessary.
3072 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3079 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3081 StgBlockingQueueElement *bqe;
3086 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3087 node, CurrentProc, CurrentTime[CurrentProc],
3088 CurrentTSO->id, CurrentTSO));
3090 node_loc = where_is(node);
3092 ASSERT(q == END_BQ_QUEUE ||
3093 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
3094 get_itbl(q)->type == CONSTR); // closure (type constructor)
3095 ASSERT(is_unique(node));
3097 /* FAKE FETCH: magically copy the node to the tso's proc;
3098 no Fetch necessary because in reality the node should not have been
3099 moved to the other PE in the first place
3101 if (CurrentProc!=node_loc) {
3103 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3104 node, node_loc, CurrentProc, CurrentTSO->id,
3105 // CurrentTSO, where_is(CurrentTSO),
3106 node->header.gran.procs));
3107 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3109 debugBelch("## new bitmask of node %p is %#x\n",
3110 node, node->header.gran.procs));
3111 if (RtsFlags.GranFlags.GranSimStats.Global) {
3112 globalGranStats.tot_fake_fetches++;
3117 // ToDo: check: ASSERT(CurrentProc==node_loc);
3118 while (get_itbl(bqe)->type==TSO) { // was: q != END_TSO_QUEUE
3121 bqe points to the current element in the queue
3122 next points to the next element in the queue
3124 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
3125 //tso_loc = where_is(tso);
3127 bqe = unblockOne(bqe, node);
3130 /* if this is the BQ of an RBH, we have to put back the info ripped out of
3131 the closure to make room for the anchor of the BQ */
3132 if (bqe!=END_BQ_QUEUE) {
3133 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3135 ASSERT((info_ptr==&RBH_Save_0_info) ||
3136 (info_ptr==&RBH_Save_1_info) ||
3137 (info_ptr==&RBH_Save_2_info));
3139 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3140 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3141 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3144 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3145 node, info_type(node)));
3148 /* statistics gathering */
3149 if (RtsFlags.GranFlags.GranSimStats.Global) {
3150 // globalGranStats.tot_bq_processing_time += bq_processing_time;
3151 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
3152 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
3153 globalGranStats.tot_awbq++; // total no. of bqs awakened
3156 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3157 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3159 #elif defined(PARALLEL_HASKELL)
3161 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3163 StgBlockingQueueElement *bqe;
3165 IF_PAR_DEBUG(verbose,
3166 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3170 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3171 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so let's just return. RFP (BUG?)\n"));
3176 ASSERT(q == END_BQ_QUEUE ||
3177 get_itbl(q)->type == TSO ||
3178 get_itbl(q)->type == BLOCKED_FETCH ||
3179 get_itbl(q)->type == CONSTR);
3182 while (get_itbl(bqe)->type==TSO ||
3183 get_itbl(bqe)->type==BLOCKED_FETCH) {
3184 bqe = unblockOne(bqe, node);
3188 #else /* !GRAN && !PARALLEL_HASKELL */
3191 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3193 if (tso == NULL) return; // hack; see bug #1235728, and comments in
3195 while (tso != END_TSO_QUEUE) {
3196 tso = unblockOne(cap,tso);
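
/* unblockOne saves tso->link before clearing it and returns that
 * saved value, so the loop above is equivalent to this explicit form
 * (a sketch):
 *
 *    while (tso != END_TSO_QUEUE) {
 *        StgTSO *next = tso->link;
 *        unblockOne(cap, tso);     // wake tso, append to run queue
 *        tso = next;
 *    }
 */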
3201 /* ---------------------------------------------------------------------------
3203 - usually called inside a signal handler so it mustn't do anything fancy.
3204 ------------------------------------------------------------------------ */
3207 interruptStgRts(void)
3211 #if defined(THREADED_RTS)
3212 prodAllCapabilities();
3216 /* -----------------------------------------------------------------------------
3219 This is for use when we raise an exception in another thread, which
3221 This has nothing to do with the UnblockThread event in GranSim. -- HWL
3222 -------------------------------------------------------------------------- */
3224 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3226 NB: only the type of the blocking queue is different in GranSim and GUM
3227 the operations on the queue-elements are the same
3228 long live polymorphism!
3230 Locks: sched_mutex is held upon entry and exit.
3234 unblockThread(Capability *cap, StgTSO *tso)
3236 StgBlockingQueueElement *t, **last;
3238 switch (tso->why_blocked) {
3241 return; /* not blocked */
3244 // Be careful: nothing to do here! We tell the scheduler that the thread
3245 // is runnable and we leave it to the stack-walking code to abort the
3246 // transaction while unwinding the stack. We should perhaps have a debugging
3247 // test to make sure that this really happens and that the 'zombie' transaction
3248 // does not get committed.
3252 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3254 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3255 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3257 last = (StgBlockingQueueElement **)&mvar->head;
3258 for (t = (StgBlockingQueueElement *)mvar->head;
3260 last = &t->link, last_tso = t, t = t->link) {
3261 if (t == (StgBlockingQueueElement *)tso) {
3262 *last = (StgBlockingQueueElement *)tso->link;
3263 if (mvar->tail == tso) {
3264 mvar->tail = (StgTSO *)last_tso;
3269 barf("unblockThread (MVAR): TSO not found");
3272 case BlockedOnBlackHole:
3273 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3275 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3277 last = &bq->blocking_queue;
3278 for (t = bq->blocking_queue;
3280 last = &t->link, t = t->link) {
3281 if (t == (StgBlockingQueueElement *)tso) {
3282 *last = (StgBlockingQueueElement *)tso->link;
3286 barf("unblockThread (BLACKHOLE): TSO not found");
3289 case BlockedOnException:
3291 StgTSO *target = tso->block_info.tso;
3293 ASSERT(get_itbl(target)->type == TSO);
3295 if (target->what_next == ThreadRelocated) {
3296 target = target->link;
3297 ASSERT(get_itbl(target)->type == TSO);
3300 ASSERT(target->blocked_exceptions != NULL);
3302 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3303 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
3305 last = &t->link, t = t->link) {
3306 ASSERT(get_itbl(t)->type == TSO);
3307 if (t == (StgBlockingQueueElement *)tso) {
3308 *last = (StgBlockingQueueElement *)tso->link;
3312 barf("unblockThread (Exception): TSO not found");
3316 case BlockedOnWrite:
3317 #if defined(mingw32_HOST_OS)
3318 case BlockedOnDoProc:
3321 /* take TSO off blocked_queue */
3322 StgBlockingQueueElement *prev = NULL;
3323 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
3324 prev = t, t = t->link) {
3325 if (t == (StgBlockingQueueElement *)tso) {
3327 blocked_queue_hd = (StgTSO *)t->link;
3328 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3329 blocked_queue_tl = END_TSO_QUEUE;
3332 prev->link = t->link;
3333 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3334 blocked_queue_tl = (StgTSO *)prev;
3337 #if defined(mingw32_HOST_OS)
3338 /* (Cooperatively) signal that the worker thread should abort
3341 abandonWorkRequest(tso->block_info.async_result->reqID);
3346 barf("unblockThread (I/O): TSO not found");
3349 case BlockedOnDelay:
3351 /* take TSO off sleeping_queue */
3352 StgBlockingQueueElement *prev = NULL;
3353 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
3354 prev = t, t = t->link) {
3355 if (t == (StgBlockingQueueElement *)tso) {
3357 sleeping_queue = (StgTSO *)t->link;
3359 prev->link = t->link;
3364 barf("unblockThread (delay): TSO not found");
3368 barf("unblockThread");
3372 tso->link = END_TSO_QUEUE;
3373 tso->why_blocked = NotBlocked;
3374 tso->block_info.closure = NULL;
3375 pushOnRunQueue(cap,tso);
3379 unblockThread(Capability *cap, StgTSO *tso)
3383 /* To avoid locking unnecessarily. */
3384 if (tso->why_blocked == NotBlocked) {
3388 switch (tso->why_blocked) {
3391 // Be careful: nothing to do here! We tell the scheduler that the thread
3392 // is runnable and we leave it to the stack-walking code to abort the
3393 // transaction while unwinding the stack. We should perhaps have a debugging
3394 // test to make sure that this really happens and that the 'zombie' transaction
3395 // does not get committed.
3399 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3401 StgTSO *last_tso = END_TSO_QUEUE;
3402 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3405 for (t = mvar->head; t != END_TSO_QUEUE;
3406 last = &t->link, last_tso = t, t = t->link) {
3409 if (mvar->tail == tso) {
3410 mvar->tail = last_tso;
3415 barf("unblockThread (MVAR): TSO not found");
3418 case BlockedOnBlackHole:
3420 last = &blackhole_queue;
3421 for (t = blackhole_queue; t != END_TSO_QUEUE;
3422 last = &t->link, t = t->link) {
3428 barf("unblockThread (BLACKHOLE): TSO not found");
3431 case BlockedOnException:
3433 StgTSO *target = tso->block_info.tso;
3435 ASSERT(get_itbl(target)->type == TSO);
3437 while (target->what_next == ThreadRelocated) {
3438 target = target->link;
3439 ASSERT(get_itbl(target)->type == TSO);
3442 ASSERT(target->blocked_exceptions != NULL);
3444 last = &target->blocked_exceptions;
3445 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
3446 last = &t->link, t = t->link) {
3447 ASSERT(get_itbl(t)->type == TSO);
3453 barf("unblockThread (Exception): TSO not found");
3456 #if !defined(THREADED_RTS)
3458 case BlockedOnWrite:
3459 #if defined(mingw32_HOST_OS)
3460 case BlockedOnDoProc:
3463 StgTSO *prev = NULL;
3464 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
3465 prev = t, t = t->link) {
3468 blocked_queue_hd = t->link;
3469 if (blocked_queue_tl == t) {
3470 blocked_queue_tl = END_TSO_QUEUE;
3473 prev->link = t->link;
3474 if (blocked_queue_tl == t) {
3475 blocked_queue_tl = prev;
3478 #if defined(mingw32_HOST_OS)
3479 /* (Cooperatively) signal that the worker thread should abort
3482 abandonWorkRequest(tso->block_info.async_result->reqID);
3487 barf("unblockThread (I/O): TSO not found");
3490 case BlockedOnDelay:
3492 StgTSO *prev = NULL;
3493 for (t = sleeping_queue; t != END_TSO_QUEUE;
3494 prev = t, t = t->link) {
3497 sleeping_queue = t->link;
3499 prev->link = t->link;
3504 barf("unblockThread (delay): TSO not found");
3509 barf("unblockThread");
3513 tso->link = END_TSO_QUEUE;
3514 tso->why_blocked = NotBlocked;
3515 tso->block_info.closure = NULL;
3516 appendToRunQueue(cap,tso);
3520 /* -----------------------------------------------------------------------------
3523 * Check the blackhole_queue for threads that can be woken up. We do
3524 * this periodically: before every GC, and whenever the run queue is
3527 * An elegant solution might be to just wake up all the blocked
3528 * threads with awakenBlockedQueue occasionally: they'll go back to
3529 * sleep again if the object is still a BLACKHOLE. Unfortunately this
3530 * doesn't give us a way to tell whether we've actually managed to
3531 * wake up any threads, so we would be busy-waiting.
3533 * -------------------------------------------------------------------------- */
3536 checkBlackHoles (Capability *cap)
3539 rtsBool any_woke_up = rtsFalse;
3542 // blackhole_queue is global:
3543 ASSERT_LOCK_HELD(&sched_mutex);
3545 IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3547 // ASSUMES: sched_mutex
3548 prev = &blackhole_queue;
3549 t = blackhole_queue;
3550 while (t != END_TSO_QUEUE) {
3551 ASSERT(t->why_blocked == BlockedOnBlackHole);
3552 type = get_itbl(t->block_info.closure)->type;
3553 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3554 IF_DEBUG(sanity,checkTSO(t));
3555 t = unblockOne(cap, t);
3556 // urk, the threads migrate to the current capability
3557 // here, but we'd like to keep them on the original one.
3559 any_woke_up = rtsTrue;
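
/* The loop above uses the pointer-to-pointer unlinking idiom: prev
 * always refers to the link field that points at t, so splicing a
 * woken thread out of blackhole_queue is a single store.
 * Schematically (a sketch of the shape, not the exact elided code):
 *
 *    if (the closure is no longer a BLACKHOLE) {
 *        t = unblockOne(cap, t);   // wake it; returns the next TSO
 *        *prev = t;                // splice the woken TSO out
 *    } else {
 *        prev = &t->link;          // keep it; advance the cursor
 *        t = t->link;
 *    }
 */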
3569 /* -----------------------------------------------------------------------------
3572 * The following function implements the magic for raising an
3573 * asynchronous exception in an existing thread.
3575 * We first remove the thread from any queue on which it might be
3576 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
3578 * We strip the stack down to the innermost CATCH_FRAME, building
3579 * thunks in the heap for all the active computations, so they can
3580 * be restarted if necessary. When we reach a CATCH_FRAME, we build
3581 * an application of the handler to the exception, and push it on
3582 * the top of the stack.
3584 * How exactly do we save all the active computations? We create an
3585 * AP_STACK for every UpdateFrame on the stack. Entering one of these
3586 * AP_STACKs pushes everything from the corresponding update frame
3587 * upwards onto the stack. (Actually, it pushes everything up to the
3588 * next update frame plus a pointer to the next AP_STACK object.
3589 * Entering the next AP_STACK object pushes more onto the stack until we
3590 * reach the last AP_STACK object - at which point the stack should look
3591 * exactly as it did when we killed the TSO and we can continue
3592 * execution by entering the closure on top of the stack.
3594 * We can also kill a thread entirely - this happens if either (a) the
3595 * exception passed to raiseAsync is NULL, or (b) there's no
3596 * CATCH_FRAME on the stack. In either case, we strip the entire
3597 * stack and replace the thread with a zombie.
3599 * ToDo: in SMP mode, this function is only safe if either (a) we hold
3600 * all the Capabilities (eg. in GC), or (b) we own the Capability that
3601 * the TSO is currently blocked on or on the run queue of.
3603 * -------------------------------------------------------------------------- */
3606 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3608 raiseAsync_(cap, tso, exception, rtsFalse);
3612 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
3613 rtsBool stop_at_atomically)
3615 StgRetInfoTable *info;
3618 // Thread already dead?
3619 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3624 sched_belch("raising exception in thread %ld.", (long)tso->id));
3626 // Remove it from any blocking queues
3627 unblockThread(cap,tso);
3631 // The stack freezing code assumes there's a closure pointer on
3632 // the top of the stack, so we have to arrange that this is the case...
3634 if (sp[0] == (W_)&stg_enter_info) {
3638 sp[0] = (W_)&stg_dummy_ret_closure;
3644 // 1. Let the top of the stack be the "current closure"
3646 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3649 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3650 // current closure applied to the chunk of stack up to (but not
3651 // including) the update frame. This closure becomes the "current
3652 // closure". Go back to step 2.
3654 // 4. If it's a CATCH_FRAME, then leave the exception handler on
3655 // top of the stack applied to the exception.
3657 // 5. If it's a STOP_FRAME, then kill the thread.
3659 // NB: if we pass an ATOMICALLY_FRAME then abort the associated
3666 info = get_ret_itbl((StgClosure *)frame);
3668 while (info->i.type != UPDATE_FRAME
3669 && (info->i.type != CATCH_FRAME || exception == NULL)
3670 && info->i.type != STOP_FRAME
3671 && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3673 if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3674 // IF we find an ATOMICALLY_FRAME then we abort the
3675 // current transaction and propagate the exception. In
3676 // this case (unlike ordinary exceptions) we do not care
3677 // whether the transaction is valid or not because its
3678 // possible validity cannot have caused the exception
3679 // and will not be visible after the abort.
3681 debugBelch("Found atomically block delivering async exception\n"));
3682 stmAbortTransaction(tso -> trec);
3683 tso -> trec = stmGetEnclosingTRec(tso -> trec);
3685 frame += stack_frame_sizeW((StgClosure *)frame);
3686 info = get_ret_itbl((StgClosure *)frame);
3689 switch (info->i.type) {
3691 case ATOMICALLY_FRAME:
3692 ASSERT(stop_at_atomically);
3693 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3694 stmCondemnTransaction(tso -> trec);
3698 // R1 is not a register: the return convention for IO in
3699 // this case puts the return value on the stack, so we
3700 // need to set up the stack to return to the atomically
3701 // frame properly...
3702 tso->sp = frame - 2;
3703 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3704 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3706 tso->what_next = ThreadRunGHC;
3710 // If we find a CATCH_FRAME, and we've got an exception to raise,
3711 // then build the THUNK raise(exception), and leave it on
3712 // top of the CATCH_FRAME ready to enter.
3716 StgCatchFrame *cf = (StgCatchFrame *)frame;
3720 // we've got an exception to raise, so let's pass it to the
3721 // handler in this frame.
3723 raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3724 TICK_ALLOC_SE_THK(1,0);
3725 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3726 raise->payload[0] = exception;
3728 // throw away the stack from Sp up to the CATCH_FRAME.
3732 /* Ensure that async exceptions are blocked now, so we don't get
3733 * a surprise exception before we get around to executing the
3736 if (tso->blocked_exceptions == NULL) {
3737 tso->blocked_exceptions = END_TSO_QUEUE;
3740 /* Put the newly-built THUNK on top of the stack, ready to execute
3741 * when the thread restarts.
3744 sp[-1] = (W_)&stg_enter_info;
3746 tso->what_next = ThreadRunGHC;
3747 IF_DEBUG(sanity, checkTSO(tso));
3756 // First build an AP_STACK consisting of the stack chunk above the
3757 // current update frame, with the top word on the stack as the
3760 words = frame - sp - 1;
3761 ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3764 ap->fun = (StgClosure *)sp[0];
3766 for(i=0; i < (nat)words; ++i) {
3767 ap->payload[i] = (StgClosure *)*sp++;
3770 SET_HDR(ap,&stg_AP_STACK_info,
3771 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
3772 TICK_ALLOC_UP_THK(words+1,0);
3775 debugBelch("sched: Updating ");
3776 printPtr((P_)((StgUpdateFrame *)frame)->updatee);
3777 debugBelch(" with ");
3778 printObj((StgClosure *)ap);
3781 // Replace the updatee with an indirection - happily
3782 // this will also wake up any threads currently
3783 // waiting on the result.
3785 // Warning: if we're in a loop, more than one update frame on
3786 // the stack may point to the same object. Be careful not to
3787 // overwrite an IND_OLDGEN in this case, because we'll screw
3788 // up the mutable lists. To be on the safe side, don't
3789 // overwrite any kind of indirection at all. See also
3790 // threadSqueezeStack in GC.c, where we have to make a similar
3793 if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3794 // revert the black hole
3795 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3798 sp += sizeofW(StgUpdateFrame) - 1;
3799 sp[0] = (W_)ap; // push onto stack
3804 // We've stripped the entire stack, the thread is now dead.
3805 tso->what_next = ThreadKilled;
3806 tso->sp = frame + sizeofW(StgStopFrame);
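
/* Schematic summary of the main cases above (a sketch):
 *
 *    UPDATE_FRAME:  freeze the stack chunk above the frame into an
 *                   AP_STACK, overwrite the updatee with an
 *                   indirection to it (waking any blocked threads),
 *                   push the AP_STACK, keep walking;
 *    CATCH_FRAME:   allocate raise = (raise# exception), truncate the
 *                   stack to the frame, push raise, resume the thread;
 *    STOP_FRAME:    no handler anywhere -- the thread dies.
 *
 * Entering the AP_STACKs later re-pushes the frozen stack chunks, so
 * a computation interrupted mid-evaluation can be restarted exactly
 * where it stopped.
 */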
3816 /* -----------------------------------------------------------------------------
3819 This is used for interruption (^C) and forking, and corresponds to
3820 raising an exception but without letting the thread catch the
3822 -------------------------------------------------------------------------- */
3825 deleteThread (Capability *cap, StgTSO *tso)
3827 if (tso->why_blocked != BlockedOnCCall &&
3828 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3829 raiseAsync(cap,tso,NULL);
3833 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3835 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3836 { // for forkProcess only:
3837 // delete thread without giving it a chance to catch the KillThread exception
3839 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3843 if (tso->why_blocked != BlockedOnCCall &&
3844 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3845 unblockThread(cap,tso);
3848 tso->what_next = ThreadKilled;
3852 /* -----------------------------------------------------------------------------
3853 raiseExceptionHelper
3855 This function is called by the raise# primitive, just so that we can
3856 move some of the tricky bits of raising an exception from C-- into
3857 C. Who knows, it might be a useful reusable thing here too.
3858 -------------------------------------------------------------------------- */
3861 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3863 Capability *cap = regTableToCapability(reg);
3864 StgThunk *raise_closure = NULL;
3866 StgRetInfoTable *info;
3868 // This closure represents the expression 'raise# E' where E
3869 // is the exception raised. It is used to overwrite all the
3870 // thunks which are currently under evaluation.
3874 // LDV profiling: stg_raise_info has THUNK as its closure
3875 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3876 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
3877 // 1 does not cause any problem unless profiling is performed.
3878 // However, when LDV profiling goes on, we need to linearly scan
3879 // the small object pool, where raise_closure is stored, so we should
3880 // use MIN_UPD_SIZE.
3882 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3883 // sizeofW(StgClosure)+1);
3887 // Walk up the stack, looking for the catch frame. On the way,
3888 // we update any closures pointed to from update frames with the
3889 // raise closure that we just built.
3893 info = get_ret_itbl((StgClosure *)p);
3894 next = p + stack_frame_sizeW((StgClosure *)p);
3895 switch (info->i.type) {
3898 // Only create raise_closure if we need to.
3899 if (raise_closure == NULL) {
3901 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3902 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3903 raise_closure->payload[0] = exception;
3905 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3909 case ATOMICALLY_FRAME:
3910 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3912 return ATOMICALLY_FRAME;
3918 case CATCH_STM_FRAME:
3919 IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3921 return CATCH_STM_FRAME;
3927 case CATCH_RETRY_FRAME:
3936 /* -----------------------------------------------------------------------------
3937 findRetryFrameHelper
3939 This function is called by the retry# primitive. It traverses the stack
3940 leaving tso->sp referring to the frame which should handle the retry.
3942 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3943 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
3945 We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3946 despite the similar implementation.
3948 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3949 not be created within memory transactions.
3950 -------------------------------------------------------------------------- */
3953 findRetryFrameHelper (StgTSO *tso)
3956 StgRetInfoTable *info;
3960 info = get_ret_itbl((StgClosure *)p);
3961 next = p + stack_frame_sizeW((StgClosure *)p);
3962 switch (info->i.type) {
3964 case ATOMICALLY_FRAME:
3965 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retry\n", p));
3967 return ATOMICALLY_FRAME;
3969 case CATCH_RETRY_FRAME:
3970 IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retry\n", p));
3972 return CATCH_RETRY_FRAME;
3974 case CATCH_STM_FRAME:
3976 ASSERT(info->i.type != CATCH_FRAME);
3977 ASSERT(info->i.type != STOP_FRAME);
3984 /* -----------------------------------------------------------------------------
3985 resurrectThreads is called after garbage collection on the list of
3986 threads found to be garbage. Each of these threads will be woken
3987 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3988 on an MVar, or NonTermination if the thread was blocked on a Black
3991 Locks: assumes we hold *all* the capabilities.
3992 -------------------------------------------------------------------------- */
3995 resurrectThreads (StgTSO *threads)
4000 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4001 next = tso->global_link;
4002 tso->global_link = all_threads;
4004 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4006 // For a bound thread, wake it up on the Capability it was last
4007 // on; otherwise use last_free_capability.
4009 cap = tso->bound->cap;
4011 cap = last_free_capability;
4014 switch (tso->why_blocked) {
4016 case BlockedOnException:
4017 /* Called by GC - sched_mutex lock is currently held. */
4018 raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4020 case BlockedOnBlackHole:
4021 raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4024 raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4027 /* This might happen if the thread was blocked on a black hole
4028 * belonging to a thread that we've just woken up (raiseAsync
4029 * can wake up threads, remember...).
4033 barf("resurrectThreads: thread blocked in a strange way");
4038 /* ----------------------------------------------------------------------------
4039 * Debugging: why is a thread blocked
4040 * [Also provides useful information when debugging threaded programs
4041 * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4042 ------------------------------------------------------------------------- */
4046 printThreadBlockage(StgTSO *tso)
4048 switch (tso->why_blocked) {
4050 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4052 case BlockedOnWrite:
4053 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4055 #if defined(mingw32_HOST_OS)
4056 case BlockedOnDoProc:
4057 debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4060 case BlockedOnDelay:
4061 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4064 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4066 case BlockedOnException:
4067 debugBelch("is blocked on delivering an exception to thread %d",
4068 tso->block_info.tso->id);
4070 case BlockedOnBlackHole:
4071 debugBelch("is blocked on a black hole");
4074 debugBelch("is not blocked");
4076 #if defined(PARALLEL_HASKELL)
4078 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4079 tso->block_info.closure, info_type(tso->block_info.closure));
4081 case BlockedOnGA_NoSend:
4082 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4083 tso->block_info.closure, info_type(tso->block_info.closure));
4086 case BlockedOnCCall:
4087 debugBelch("is blocked on an external call");
4089 case BlockedOnCCall_NoUnblockExc:
4090 debugBelch("is blocked on an external call (exceptions were already blocked)");
4093 debugBelch("is blocked on an STM operation");
4096 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
4097 tso->why_blocked, tso->id, tso);
4102 printThreadStatus(StgTSO *t)
4104 debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4106 void *label = lookupThreadLabel(t->id);
4107 if (label) debugBelch("[\"%s\"] ",(char *)label);
4109 if (t->what_next == ThreadRelocated) {
4110 debugBelch("has been relocated...\n");
4112 switch (t->what_next) {
4114 debugBelch("has been killed");
4116 case ThreadComplete:
4117 debugBelch("has completed");
4120 printThreadBlockage(t);
4127 printAllThreads(void)
4134 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4135 ullong_format_string(TIME_ON_PROC(CurrentProc),
4136 time_string, rtsFalse/*no commas!*/);
4138 debugBelch("all threads at [%s]:\n", time_string);
4139 # elif defined(PARALLEL_HASKELL)
4140 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4141 ullong_format_string(CURRENT_TIME,
4142 time_string, rtsFalse/*no commas!*/);
4144 debugBelch("all threads at [%s]:\n", time_string);
4146 debugBelch("all threads:\n");
4149 for (i = 0; i < n_capabilities; i++) {
4150 cap = &capabilities[i];
4151 debugBelch("threads on capability %d:\n", cap->no);
4152 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4153 printThreadStatus(t);
4157 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4158 if (t->why_blocked != NotBlocked) {
4159 printThreadStatus(t);
4161 if (t->what_next == ThreadRelocated) {
4164 next = t->global_link;
4171 printThreadQueue(StgTSO *t)
4174 for (; t != END_TSO_QUEUE; t = t->link) {
4175 printThreadStatus(t);
4178 debugBelch("%d threads on queue\n", i);
4182 Print a whole blocking queue attached to node (debugging only).
4184 # if defined(PARALLEL_HASKELL)
4186 print_bq (StgClosure *node)
4188 StgBlockingQueueElement *bqe;
4192 debugBelch("## BQ of closure %p (%s): ",
4193 node, info_type(node));
4195 /* should cover all closures that may have a blocking queue */
4196 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4197 get_itbl(node)->type == FETCH_ME_BQ ||
4198 get_itbl(node)->type == RBH ||
4199 get_itbl(node)->type == MVAR);
4201 ASSERT(node!=(StgClosure*)NULL); // sanity check
4203 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4207 Print a whole blocking queue starting with the element bqe.
4210 print_bqe (StgBlockingQueueElement *bqe)
4215 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4217 for (end = (bqe==END_BQ_QUEUE);
4218 !end; // iterate until bqe points to a CONSTR
4219 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
4220 bqe = end ? END_BQ_QUEUE : bqe->link) {
4221 ASSERT(bqe != END_BQ_QUEUE); // sanity check
4222 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
4223 /* types of closures that may appear in a blocking queue */
4224 ASSERT(get_itbl(bqe)->type == TSO ||
4225 get_itbl(bqe)->type == BLOCKED_FETCH ||
4226 get_itbl(bqe)->type == CONSTR);
4227 /* only BQs of an RBH end with an RBH_Save closure */
4228 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4230 switch (get_itbl(bqe)->type) {
4232 debugBelch(" TSO %u (%x),",
4233 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4236 debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4237 ((StgBlockedFetch *)bqe)->node,
4238 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4239 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4240 ((StgBlockedFetch *)bqe)->ga.weight);
4243 debugBelch(" %s (IP %p),",
4244 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4245 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4246 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4247 "RBH_Save_?"), get_itbl(bqe));
4250 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4251 info_type((StgClosure *)bqe)); // , node, info_type(node));
4257 # elif defined(GRAN)
4259 print_bq (StgClosure *node)
4261 StgBlockingQueueElement *bqe;
4262 PEs node_loc, tso_loc;
4265 /* should cover all closures that may have a blocking queue */
4266 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4267 get_itbl(node)->type == FETCH_ME_BQ ||
4268 get_itbl(node)->type == RBH);
4270 ASSERT(node!=(StgClosure*)NULL); // sanity check
4271 node_loc = where_is(node);
4273 debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4274 node, info_type(node), node_loc);
4277 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4279 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4280 !end; // iterate until bqe points to a CONSTR
4281 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4282 ASSERT(bqe != END_BQ_QUEUE); // sanity check
4283 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
4284 /* types of closures that may appear in a blocking queue */
4285 ASSERT(get_itbl(bqe)->type == TSO ||
4286 get_itbl(bqe)->type == CONSTR);
4287 /* only BQs of an RBH end with an RBH_Save closure */
4288 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4290 tso_loc = where_is((StgClosure *)bqe);
4291 switch (get_itbl(bqe)->type) {
4293 debugBelch(" TSO %d (%p) on [PE %d],",
4294 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4297 debugBelch(" %s (IP %p),",
4298 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4299 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4300 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4301 "RBH_Save_?"), get_itbl(bqe));
4304 barf("Unexpected closure type %s in blocking queue of %p (%s)",
4305 info_type((StgClosure *)bqe), node, info_type(node));
4313 #if defined(PARALLEL_HASKELL)
4320 for (i=0, tso=run_queue_hd;
4321 tso != END_TSO_QUEUE;
4322 i++, tso=tso->link) {
4331 sched_belch(char *s, ...)
4336 debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4337 #elif defined(PARALLEL_HASKELL)
4340 debugBelch("sched: ");