/* ---------------------------------------------------------------------------
 * (c) The GHC Team, 1998-2005
 * The scheduler and thread-related functionality
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "BlockAlloc.h"
#include "OSThreads.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "Exception.h"
#include "RtsSignals.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Proftimer.h"
#if defined(GRAN) || defined(PARALLEL_HASKELL)
# include "GranSimRts.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
#endif
#include "Capability.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
// Turn off inlining when debugging - it obfuscates things
#ifdef DEBUG
# define STATIC_INLINE static
#else
# define STATIC_INLINE static inline
#endif

#if defined(THREADED_RTS)
#define USED_WHEN_THREADED_RTS
#define USED_WHEN_NON_THREADED_RTS STG_UNUSED
#else
#define USED_WHEN_THREADED_RTS STG_UNUSED
#define USED_WHEN_NON_THREADED_RTS
#endif

#ifdef SMP
#define USED_WHEN_SMP
#else
#define USED_WHEN_SMP STG_UNUSED
#endif
/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

#if defined(GRAN)
StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */

/*
   In GranSim we have a runnable and a blocked queue for each processor.
   In order to minimise code changes new arrays run_queue_hds/tls
   are created. run_queue_hd is then a short cut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h).
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating). */
#endif
#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
#endif

/* Threads blocked on blackholes.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *blackhole_queue = NULL;

/* The blackhole_queue should be checked for threads to wake up. See
 * Schedule.h for a more thorough comment.
 * LOCK: none (doesn't matter if we miss an update)
 */
rtsBool blackholes_need_checking = rtsFalse;
/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *all_threads = NULL;

/* flag set by signal handler to precipitate a context switch
 * LOCK: none (just an advisory flag)
 */
int context_switch = 0;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 */
nat recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes once, from false->true)
 */
rtsBool interrupted = rtsFalse;

/* Next thread ID to allocate.
 * LOCK: sched_mutex
 */
static StgThreadID next_thread_id = 1;
/* The smallest stack size that makes any sense is:
 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
 * + 1 (the closure to enter)
 * + 1 (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
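
/* A worked example of the arithmetic above, with hypothetical values
 * (the real constants are platform- and build-dependent): if
 * RESERVED_STACK_WORDS were 21 and sizeofW(StgStopFrame) were 2, then
 * MIN_STACK_WORDS would come to 21 + 2 + 3 = 26 words, the trailing
 * "+ 3" covering the one-word items enumerated in the comment above.
 */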
/* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 * exists - earlier gccs apparently didn't.
 */
StgTSO dummy_tso;

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS (and SMP) runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if defined(PARALLEL_HASKELL)
StgTSO *LastTSO;
rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;
#endif

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */
static Capability *schedule (Capability *initialCapability, Task *task);

// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.

static void schedulePreLoop (void);
static void schedulePushWork(Capability *cap, Task *task);
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
#if defined(GRAN)
static StgTSO *scheduleProcessEvent(rtsEvent *event);
#endif
#if defined(PARALLEL_HASKELL)
static StgTSO *scheduleSendPendingMessages(void);
static void scheduleActivateSpark(void);
static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
#endif
#if defined(PAR) || defined(GRAN)
static void scheduleGranParReport(void);
#endif
static void schedulePostRunThread(void);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task,
                                         StgTSO *t );
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
                                    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                             StgTSO *t );
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);

static void unblockThread(Capability *cap, StgTSO *tso);
static rtsBool checkBlackHoles(Capability *cap);
static void AllRoots(evac_fn evac);

static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);

static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
                        rtsBool stop_at_atomically, StgPtr stop_here);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteRunQueue (Capability *cap);

static void printThreadBlockage(StgTSO *tso);
static void printThreadStatus(StgTSO *tso);
void printThreadQueue(StgTSO *tso);

#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);
#endif

static char *whatNext_strs[] = {
/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */

STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
#if defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.doFairScheduling) {
        // this does round-robin scheduling; good for concurrency
        appendToRunQueue(cap,t);
    } else {
        // this does unfair scheduling; good for parallelism
        pushOnRunQueue(cap,t);
    }
#else
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
#endif
}
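
/* The two disciplines above differ only in which end of the run queue
 * the thread lands on. A minimal, self-contained sketch of the idea
 * follows; all names and types here are hypothetical, not the RTS's own.
 */
#if 0
typedef struct ExTSO_ {
    struct ExTSO_ *link;
} ExTSO;

typedef struct {
    ExTSO *hd;   /* head: next thread to run */
    ExTSO *tl;   /* tail: last thread in the queue */
} ExQueue;

/* FIFO append: round-robin, fair -- good for concurrency */
static void ex_append (ExQueue *q, ExTSO *t)
{
    t->link = NULL;
    if (q->hd == NULL) { q->hd = t; } else { q->tl->link = t; }
    q->tl = t;
}

/* LIFO push: unfair, but the freshly woken or sparked thread runs
 * next, keeping its working set hot in the cache -- good for
 * parallelism */
static void ex_push (ExQueue *q, ExTSO *t)
{
    t->link = q->hd;
    q->hd = t;
    if (q->tl == NULL) { q->tl = t; }
}
#endif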
/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   In a GranSim setup this loop iterates over the global event queue,
   which determines what to do next. It is therefore more complicated
   than either the concurrent or the parallel (GUM) setup.

   GUM iterates over incoming messages.
   It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
   and sends out a fish whenever it has nothing to do; in-between
   doing the actual reductions (shared code below) it processes the
   incoming messages and deals with delayed operations
   (see PendingFetches).
   This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */
static Capability *
schedule (Capability *initialCapability, Task *task)
{
    StgTSO *t;
    Capability *cap;
    StgThreadReturnCode ret;
#if defined(GRAN)
    rtsEvent *event;
#elif defined(PARALLEL_HASKELL)
    StgTSO *tso;
    GlobalTaskId pe;
    rtsBool receivedFinish = rtsFalse;
# if defined(DEBUG)
    nat tp_size, sp_size; // stats only
# endif
#endif
    nat prev_what_next;
    rtsBool ready_to_gc;
#if defined(THREADED_RTS)
    rtsBool first = rtsTrue;
#endif

    cap = initialCapability;

    // Pre-condition: this task owns initialCapability.
    // The sched_mutex is *NOT* held
    // NB. on return, we still hold a capability.

    IF_DEBUG(scheduler,
             sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
                         task, initialCapability));

    // -----------------------------------------------------------
    // Scheduler loop starts here:

#if defined(PARALLEL_HASKELL)
#define TERMINATION_CONDITION (!receivedFinish)
#elif defined(GRAN)
#define TERMINATION_CONDITION ((event = get_next_event()) != (rtsEvent*)NULL)
#else
#define TERMINATION_CONDITION rtsTrue
#endif
    while (TERMINATION_CONDITION) {

#if defined(GRAN)
        /* Choose the processor with the next event */
        CurrentProc = event->proc;
        CurrentTSO = event->tso;
#endif

#if defined(THREADED_RTS)
        if (first) {
            // don't yield the first time, we want a chance to run this
            // thread for a bit, even if there are others banging at the
            // door.
            first = rtsFalse;
            ASSERT_CAPABILITY_INVARIANTS(cap,task);
        } else {
            // Yield the capability to higher-priority tasks if necessary.
            yieldCapability(&cap, task);
        }
#endif

#ifdef SMP
        schedulePushWork(cap,task);
#endif

        // Check whether we have re-entered the RTS from Haskell without
        // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
        // call).
        if (cap->in_haskell) {
            errorBelch("schedule: re-entered unsafely.\n"
                       "   Perhaps a 'foreign import unsafe' should be 'safe'?");
            stg_exit(EXIT_FAILURE);
        }

        //
        // Test for interruption. If interrupted==rtsTrue, then either
        // we received a keyboard interrupt (^C), or the scheduler is
        // trying to shut down all the tasks (shutting_down_scheduler) in
        // the system.
        //
        if (interrupted) {
            discardSparksCap(cap);
            if (shutting_down_scheduler) {
                IF_DEBUG(scheduler, sched_belch("shutting down"));
                // If we are a worker, just exit. If we're a bound thread
                // then we will exit below when we've removed our TSO from
                // the run queue.
                if (task->tso == NULL && emptyRunQueue(cap)) {
                    return cap;
                }
            } else {
                IF_DEBUG(scheduler, sched_belch("interrupted"));
            }
        }
        // If the run queue is empty, take a spark and turn it into a thread.
#if defined(THREADED_RTS)
        if (emptyRunQueue(cap)) {
            StgClosure *spark;
            spark = findSpark(cap);
            if (spark != NULL) {
                IF_DEBUG(scheduler,
                         sched_belch("turning spark of closure %p into a thread",
                                     (StgClosure *)spark));
                createSparkThread(cap,spark);
            }
        }
#endif // THREADED_RTS

        scheduleStartSignalHandlers(cap);

        // Only check the black holes here if we've nothing else to do.
        // During normal execution, the black hole list only gets checked
        // at GC time, to avoid repeatedly traversing this possibly long
        // list each time around the scheduler.
        if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

        scheduleCheckBlockedThreads(cap);

        scheduleDetectDeadlock(cap,task);

        // Normally, the only way we can get here with no threads to
        // run is if a keyboard interrupt was received during
        // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
        // Additionally, it is not fatal for the
        // threaded RTS to reach here with no threads to run.
        //
        // win32: might be here due to awaitEvent() being abandoned
        // as a result of a console event having been delivered.
        if ( emptyRunQueue(cap) ) {
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
            ASSERT(interrupted);
#endif
            continue; // nothing to do
        }
#if defined(PARALLEL_HASKELL)
        scheduleSendPendingMessages();

        if (emptyRunQueue(cap) && scheduleActivateSpark())

        ASSERT(next_fish_to_send_at==0); // i.e. no delayed fishes left!

        /* If we still have no work we need to send a FISH to get a spark
           from another processor */
        if (emptyRunQueue(cap)) {
            if (!scheduleGetRemoteWork(&receivedFinish)) continue;
            ASSERT(rtsFalse); // should not happen at the moment
        }

        // from here: non-empty run queue.
        // TODO: merge above case with this, only one call processMessages() !
        if (PacketsWaiting()) { /* process incoming messages, if
                                   any pending... only in else
                                   because getRemoteWork waits for
                                   messages as well */
            receivedFinish = processMessages();
        }
#endif // PARALLEL_HASKELL

#if defined(GRAN)
        scheduleProcessEvent(event);
#endif
        // Get a thread to run
        t = popRunQueue(cap);

#if defined(GRAN) || defined(PAR)
        scheduleGranParReport(); // some kind of debugging output
#else
        // Sanity check the thread we're about to run. This can be
        // expensive if there is lots of thread switching going on...
        IF_DEBUG(sanity,checkTSO(t));
#endif

#if defined(THREADED_RTS)
        // Check whether we can run this thread in the current task.
        // If not, we have to pass our capability to the right task.
        {
            Task *bound = t->bound;

            if (bound) {
                if (bound == task) {
                    IF_DEBUG(scheduler,
                             sched_belch("### Running thread %d in bound thread",
                                         t->id));
                    // yes, the Haskell thread is bound to the current native thread
                } else {
                    IF_DEBUG(scheduler,
                             sched_belch("### thread %d bound to another OS thread",
                                         t->id));
                    // no, bound to a different Haskell thread: pass to that thread
                    pushOnRunQueue(cap,t);
                    continue;
                }
            } else {
                // The thread we want to run is unbound.
                if (task->tso) {
                    IF_DEBUG(scheduler,
                             sched_belch("### this OS thread cannot run thread %d", t->id));
                    // no, the current native thread is bound to a different
                    // Haskell thread, so pass it to any worker thread
                    pushOnRunQueue(cap,t);
                    continue;
                }
            }
        }
#endif

        cap->r.rCurrentTSO = t;

        /* context switches are initiated by the timer signal, unless
         * the user specified "context switch as often as possible", with
         * +RTS -C0
         */
        if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
            && !emptyThreadQueues(cap)) {
            context_switch = 1;
        }

run_thread:

        IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
                                        (long)t->id, whatNext_strs[t->what_next]));

#if defined(PROFILING)
        startHeapProfTimer();
#endif
        // ----------------------------------------------------------------------
        // Run the current thread

        prev_what_next = t->what_next;

        errno = t->saved_errno;
        cap->in_haskell = rtsTrue;

        recent_activity = ACTIVITY_YES;

        switch (prev_what_next) {

        case ThreadKilled:
        case ThreadComplete:
            /* Thread already finished, return to scheduler. */
            ret = ThreadFinished;
            break;

        case ThreadRunGHC:
        {
            StgRegTable *r;
            r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
            cap = regTableToCapability(r);
            ret = r->rRet;
            break;
        }

        case ThreadInterpret:
            cap = interpretBCO(cap);
            ret = cap->r.rRet;
            break;

        default:
            barf("schedule: invalid what_next field");
        }

        cap->in_haskell = rtsFalse;

        // The TSO might have moved, eg. if it re-entered the RTS and a GC
        // happened. So find the new location:
        t = cap->r.rCurrentTSO;

        // We have run some Haskell code: there might be blackhole-blocked
        // threads to wake up now.
        // Lock-free test here should be ok, we're just setting a flag.
        if ( blackhole_queue != END_TSO_QUEUE ) {
            blackholes_need_checking = rtsTrue;
        }

        // And save the current errno in this thread.
        // XXX: possibly bogus for SMP because this thread might already
        // be running again, see code below.
        t->saved_errno = errno;

        // If ret is ThreadBlocked, and this Task is bound to the TSO that
        // blocked, we are in limbo - the TSO is now owned by whatever it
        // is blocked on, and may in fact already have been woken up,
        // perhaps even on a different Capability. It may be the case
        // that task->cap != cap. We better yield this Capability
        // immediately and return to normality.
        if (ret == ThreadBlocked) {
            IF_DEBUG(scheduler,
                     debugBelch("--<< thread %d (%s) stopped: blocked\n",
                                t->id, whatNext_strs[t->what_next]));
            continue;
        }
        ASSERT_CAPABILITY_INVARIANTS(cap,task);

        // ----------------------------------------------------------------------

        // Costs for the scheduler are assigned to CCS_SYSTEM
#if defined(PROFILING)
        stopHeapProfTimer();
        CCCS = CCS_SYSTEM;
#endif

#if defined(THREADED_RTS)
        IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
#elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
        IF_DEBUG(scheduler,debugBelch("sched: "););
#endif

        schedulePostRunThread();

        ready_to_gc = rtsFalse;

        switch (ret) {
        case HeapOverflow:
            ready_to_gc = scheduleHandleHeapOverflow(cap,t);
            break;

        case StackOverflow:
            scheduleHandleStackOverflow(cap,task,t);
            break;

        case ThreadYielding:
            if (scheduleHandleYield(cap, t, prev_what_next)) {
                // shortcut for switching between compiler/interpreter:
                goto run_thread;
            }
            break;

        case ThreadBlocked:
            scheduleHandleThreadBlocked(t);
            break;

        case ThreadFinished:
            if (scheduleHandleThreadFinished(cap, task, t)) return cap;
            ASSERT_CAPABILITY_INVARIANTS(cap,task);
            break;

        default:
            barf("schedule: invalid thread return code %d", (int)ret);
        }

        if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
        if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
    } /* end of while() */

    IF_PAR_DEBUG(verbose,
                 debugBelch("== Leaving schedule() after having received Finish\n"));
}
/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
#if defined(GRAN)
    /* set up first event to get things going */
    /* ToDo: assign costs for system setup and init MainTSO ! */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              ContinueThread,
              CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);

    IF_DEBUG(gran,
             debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n",
                        CurrentTSO);
             G_TSO(CurrentTSO, 5));

    if (RtsFlags.GranFlags.Light) {
        /* Save current time; GranSim Light only */
        CurrentTSO->gran.clock = CurrentTime[CurrentProc];
    }
#endif
}
/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

#ifdef SMP
static void
schedulePushWork(Capability *cap USED_WHEN_SMP,
                 Task *task USED_WHEN_SMP)
{
    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
        && sparkPoolSizeCap(cap) < 2) {
        return;
    }
    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0) || cap0->returning_tasks_hd != NULL) {
                // it already has some work, we just grabbed it at
                // the wrong moment. Or maybe it's deadlocked!
                releaseCapability(cap0);
            } else {
                free_caps[n_free_caps++] = cap0;
            }
        }
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[]. Share our run queue equally with them. This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    // - giving high priority to moving relatively new threads, on
    //   the grounds that they haven't had time to build up a
    //   working set in the cache on this CPU/Capability.
    //
    // - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;
        rtsBool pushed_to_all;

        IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));

        i = 0;
        pushed_to_all = rtsFalse;

        if (cap->run_queue_hd != END_TSO_QUEUE) {
            prev = cap->run_queue_hd;
            t = prev->link;
            prev->link = END_TSO_QUEUE;
            for (; t != END_TSO_QUEUE; t = next) {
                next = t->link;
                t->link = END_TSO_QUEUE;
                if (t->what_next == ThreadRelocated
                    || t->bound == task) { // don't move my bound thread
                    prev->link = t;
                    prev = t;
                } else if (i == n_free_caps) {
                    pushed_to_all = rtsTrue;
                    i = 0;
                    // keep one for us
                    prev->link = t;
                    prev = t;
                } else {
                    appendToRunQueue(free_caps[i],t);
                    if (t->bound) { t->bound->cap = free_caps[i]; }
                    i++;
                }
            }
            cap->run_queue_tl = prev;
        }

        // If there are some free capabilities that we didn't push any
        // threads to, then try to push a spark to each one.
        if (!pushed_to_all) {
            StgClosure *spark;
            // i is the next free capability to push to
            for (; i < n_free_caps; i++) {
                if (emptySparkPoolCap(free_caps[i])) {
                    spark = findSpark(cap);
                    if (spark != NULL) {
                        IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
                        newSpark(&(free_caps[i]->r), spark);
                    }
                }
            }
        }

        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            releaseCapability(free_caps[i]);
        }
    }
    task->cap = cap; // reset to point to our Capability.
}
#endif /* SMP */
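
/* The sharing loop above deals threads out round-robin: keep the head
 * of our own queue, hand each movable thread to the next free
 * capability in turn, and after a full round keep one for ourselves.
 * A self-contained sketch of that dealing pattern follows (hypothetical
 * names, not RTS code; it assumes a non-empty source list, as the
 * caller above guarantees).
 */
#if 0
typedef struct ExNode_ { struct ExNode_ *link; } ExNode;

static void ex_deal (ExNode *local_hd, ExNode *bucket_hd[], int n_buckets)
{
    ExNode *prev = local_hd;          /* always keep the first element */
    ExNode *t = prev->link, *next;
    int i = 0;
    prev->link = NULL;
    for (; t != NULL; t = next) {
        next = t->link;
        t->link = NULL;
        if (i == n_buckets) {         /* dealt one to every bucket: */
            i = 0;                    /* keep this one for ourselves */
            prev->link = t;
            prev = t;
        } else {                      /* hand it to bucket i */
            t->link = bucket_hd[i];
            bucket_hd[i] = t;
            i++;
        }
    }
}
#endif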
/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (signals_pending()) { // safe outside the lock
        startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up. If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
        awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
    }
#endif
}
/* ----------------------------------------------------------------------------
 * Check for threads blocked on BLACKHOLEs that can be woken up
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlackHoles (Capability *cap)
{
    if ( blackholes_need_checking ) // check without the lock first
    {
        ACQUIRE_LOCK(&sched_mutex);
        if ( blackholes_need_checking ) {
            checkBlackHoles(cap);
            blackholes_need_checking = rtsFalse;
        }
        RELEASE_LOCK(&sched_mutex);
    }
}
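
/* scheduleCheckBlackHoles() above is an instance of the double-checked
 * flag pattern: test the advisory flag without the lock (cheap, and it
 * doesn't matter if we miss an update), then re-test under the lock so
 * the expensive traversal happens at most once per setting. A generic
 * pthreads sketch of the same pattern (hypothetical, not RTS code):
 */
#if 0
#include <pthread.h>

static volatile int work_pending = 0;
static pthread_mutex_t work_mutex = PTHREAD_MUTEX_INITIALIZER;

static void check_work (void)
{
    if (work_pending) {                 /* racy pre-check: ok to be stale */
        pthread_mutex_lock(&work_mutex);
        if (work_pending) {             /* authoritative re-check */
            /* ... do the expensive scan here ... */
            work_pending = 0;
        }
        pthread_mutex_unlock(&work_mutex);
    }
}
#endif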
/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{
#if defined(PARALLEL_HASKELL)
    // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
    return;
#endif

    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
        /*
         * In the threaded RTS, we only check for deadlock if there
         * has been no activity in a complete timeslice. This means
         * we won't eagerly start a full GC just because we don't have
         * any threads to run currently.
         */
        if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

        IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception. Any threads thus released will be immediately
        // runnable.
        scheduleDoGC( cap, task, rtsTrue/*force major GC*/ );
        recent_activity = ACTIVITY_DONE_GC;

        if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather than bombing out with a
         * deadlock.
         */
        if ( anyUserHandlers() ) {
            IF_DEBUG(scheduler,
                     sched_belch("still deadlocked, waiting for signals..."));

            awaitUserSignals();

            if (signals_pending()) {
                startSignalHandlers(cap);
            }

            // either we have threads to run, or we were interrupted:
            ASSERT(!emptyRunQueue(cap) || interrupted);
        }
#endif

#if !defined(THREADED_RTS)
        /* Probably a real deadlock. Send the current main thread the
         * Deadlock exception.
         */
        if (task->tso) {
            switch (task->tso->why_blocked) {
            case BlockedOnBlackHole:
            case BlockedOnException:
            case BlockedOnMVar:
                raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
                return;
            default:
                barf("deadlock: main thread blocked in a strange way");
            }
        }
#endif
    }
}
/* ----------------------------------------------------------------------------
 * Process an event (GRAN only)
 * ------------------------------------------------------------------------- */

#if defined(GRAN)
static StgTSO *
scheduleProcessEvent(rtsEvent *event)
{
    StgTSO *t;

    if (RtsFlags.GranFlags.Light)
        GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
        CurrentTime[CurrentProc] = event->time;

    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)
        handleIdlePEs();

    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
    /* Should just be continuing execution */
    case ContinueThread:
        IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
        /* ToDo: check assertion
           ASSERT(run_queue_hd != (StgTSO*)NULL &&
                  run_queue_hd != END_TSO_QUEUE);
        */
        /* Ignore ContinueThreads for fetching threads (if synchr comm) */
        if (!RtsFlags.GranFlags.DoAsyncFetch &&
            procStatus[CurrentProc]==Fetching) {
            debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            goto next_thread;
        }
        /* Ignore ContinueThreads for completed threads */
        if (CurrentTSO->what_next == ThreadComplete) {
            debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            goto next_thread;
        }
        /* Ignore ContinueThreads for threads that are being migrated */
        if (PROCS(CurrentTSO)==Nowhere) {
            debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            goto next_thread;
        }
        /* The thread should be at the beginning of the run queue */
        if (CurrentTSO!=run_queue_hds[CurrentProc]) {
            debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            break; // run the thread anyway
        }
        /*
          new_event(proc, proc, CurrentTime[proc],
                    (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
        */ /* Catches superfluous CONTINUEs -- should be unnecessary */
        break; // now actually run the thread; DaH Qu'vam yImuHbej

    case FetchNode:
        do_the_fetchnode(event);
        goto next_thread; /* handle next event in event queue */

    case GlobalBlock:
        do_the_globalblock(event);
        goto next_thread; /* handle next event in event queue */

    case FetchReply:
        do_the_fetchreply(event);
        goto next_thread; /* handle next event in event queue */

    case UnblockThread: /* Move from the blocked queue to the tail of */
        do_the_unblock(event);
        goto next_thread; /* handle next event in event queue */

    case ResumeThread: /* Move from the blocked queue to the tail of */
        /* the runnable queue ( i.e. Qu' SImqa'lu') */
        event->tso->gran.blocktime +=
            CurrentTime[CurrentProc] - event->tso->gran.blockedat;
        do_the_startthread(event);
        goto next_thread; /* handle next event in event queue */

    case StartThread:
        do_the_startthread(event);
        goto next_thread; /* handle next event in event queue */

    case MoveThread:
        do_the_movethread(event);
        goto next_thread; /* handle next event in event queue */

    case MoveSpark:
        do_the_movespark(event);
        goto next_thread; /* handle next event in event queue */

    case FindWork:
        do_the_findwork(event);
        goto next_thread; /* handle next event in event queue */

    default:
        barf("Illegal event type %u\n", event->evttype);
    } /* switch */

    /* This point was scheduler_loop in the old RTS */

    IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
                              TimeOfNextEvent));

    if (RtsFlags.GranFlags.Light)
        GranSimLight_leave_system(event, &ActiveTSO);

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    IF_DEBUG(gran,
             debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */
    t = CurrentTSO;
    /* Take a thread from the run queue. */
    POP_RUN_QUEUE(t); // take_off_run_queue(t);

    IF_DEBUG(gran,
             debugBelch("GRAN: About to run current thread, which is\n");
             G_TSO(t,5));

    context_switch = 0; // turned on via GranYield, checking events and time slice

    IF_DEBUG(gran,
             DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;

    return t;
}
#endif // GRAN
/* ----------------------------------------------------------------------------
 * Send pending messages (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static StgTSO *
scheduleSendPendingMessages(void)
{

# if defined(PAR) // global Mem.Mgmt., omit for now
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }
# endif

    if (RtsFlags.ParFlags.BufferTime) {
        // if we use message buffering, we must send away all message
        // packets which have become too old...
        sendOldBuffers();
    }
}
#endif
/* ----------------------------------------------------------------------------
 * Activate spark threads (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static rtsBool
scheduleActivateSpark(void)
{
#if defined(SPARKS)
    ASSERT(emptyRunQueue());
    /* We get here if the run queue is empty and want some work.
       We try to turn a spark into a thread, and add it to the run queue,
       from where it will be picked up in the next iteration of the scheduler
       loop.
    */

    /* :-[ no local threads => look out for local sparks */
    /* the spark pool for the current PE */
    pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
    if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
        pool->hd < pool->tl) {
        /*
         * ToDo: add GC code check that we really have enough heap afterwards!!
         * If we're here (no runnable threads) and we have pending
         * sparks, we must have a space problem. Get enough space
         * to turn one of those pending sparks into a
         * thread...
         */

        spark = findSpark(rtsFalse); /* get a spark */
        if (spark != (rtsSpark) NULL) {
            tso = createThreadFromSpark(spark); /* turn the spark into a thread */
            IF_PAR_DEBUG(fish, // schedule,
                         debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
                                    tso->id, tso, advisory_thread_count));

            if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
                IF_PAR_DEBUG(fish, // schedule,
                             debugBelch("==^^ failed to create thread from spark @ %lx\n",
                                        spark));
                return rtsFalse; /* failed to generate a thread */
            } /* otherwise fall through & pick-up new tso */
        } else {
            IF_PAR_DEBUG(fish, // schedule,
                         debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
                                    spark_queue_len(pool)));
            return rtsFalse; /* failed to generate a thread */
        }
        return rtsTrue; /* success in generating a thread */
    } else { /* no more threads permitted or pool empty */
        return rtsFalse; /* failed to generateThread */
    }
#else
    tso = NULL; // avoid compiler warning only
    return rtsFalse; /* dummy in non-PAR setup */
#endif // SPARKS
}
#endif // PARALLEL_HASKELL
/* ----------------------------------------------------------------------------
 * Get work from a remote node (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static rtsBool
scheduleGetRemoteWork(rtsBool *receivedFinish)
{
    ASSERT(emptyRunQueue());

    if (RtsFlags.ParFlags.BufferTime) {
        IF_PAR_DEBUG(verbose,
                     debugBelch("...send all pending data,"));
        {
            nat i;
            for (i=1; i<=nPEs; i++)
                sendImmediately(i); // send all messages away immediately
        }
    }

# ifndef SPARKS
    //++EDEN++ idle() , i.e. send all buffers, wait for work
    // suppress fishing in EDEN... just look for incoming messages
    // (blocking receive)
    IF_PAR_DEBUG(verbose,
                 debugBelch("...wait for incoming messages...\n"));
    *receivedFinish = processMessages(); // blocking receive...

    // and reenter scheduling loop after having received something
    // (return rtsFalse below)

# else /* activate SPARKS machinery */
    /* We get here if we have no work: we tried to activate a local spark,
       but still have no work. We try to get a remote spark, by sending a
       FISH message. Thread migration should be added here, and triggered
       when a sequence of fishes returns without work. */
    delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);

    /* =8-[ no local sparks => look for work on other PEs */
    /*
     * We really have absolutely no work. Send out a fish
     * (there may be some out there already), and wait for
     * something to arrive. We clearly can't run any threads
     * until a SCHEDULE or RESUME arrives, and so that's what
     * we're hoping to see. (Of course, we still have to
     * respond to other types of messages.)
     */
    rtsTime now = msTime() /*CURRENT_TIME*/;
    IF_PAR_DEBUG(verbose,
                 debugBelch("-- now=%ld\n", now));
    IF_PAR_DEBUG(fish, // verbose,
                 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
                     (last_fish_arrived_at!=0 &&
                      last_fish_arrived_at+delay > now)) {
                     debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
                                now, last_fish_arrived_at+delay,
                                last_fish_arrived_at,
                                delay);
                 });

    if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
        advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
        if (last_fish_arrived_at==0 ||
            (last_fish_arrived_at+delay <= now)) { // send FISH now!
            /* outstandingFishes is set in sendFish, processFish;
               avoid flooding system with fishes via delay */
            next_fish_to_send_at = 0;
        } else {
            /* ToDo: this should be done in the main scheduling loop to avoid the
               busy wait here; not so bad if fish delay is very small */
            int iq = 0; // DEBUGGING -- HWL
            next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
            /* send a fish when ready, but process messages that arrive in the meantime */
            do {
                if (PacketsWaiting()) {
                    iq++;
                    *receivedFinish = processMessages();
                }
                now = msTime();
            } while (!*receivedFinish || now<next_fish_to_send_at);
            // JB: This means the fish could become obsolete, if we receive
            // work. Better check for work again?
            // last line: while (!receivedFinish || !haveWork || now<...)
            // next line: if (receivedFinish || haveWork )

            if (*receivedFinish) // no need to send a FISH if we are finishing anyway
                return rtsFalse; // NB: this will leave scheduler loop
                                 // immediately after return!

            IF_PAR_DEBUG(fish, // verbose,
                         debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
        }

        // JB: IMHO, this should all be hidden inside sendFish(...)
        pe = choosePE();
        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
                 NEW_FISH_HUNGER);

        // Global statistics: count no. of fishes
        if (RtsFlags.ParFlags.ParStats.Global &&
            RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
            globalParStats.tot_fish_mess++;
        }
    }

    /* delayed fishes must have been sent by now! */
    next_fish_to_send_at = 0;

    *receivedFinish = processMessages();
# endif /* SPARKS */

    return rtsFalse;
    /* NB: this function always returns rtsFalse, meaning the scheduler
       loop continues with the next iteration;
       the return code means success in finding work; we enter this
       function if there is no local work, and thus have to send a fish
       which takes time until it arrives with work; in the meantime we
       should process messages in the main loop;
    */
}
#endif // PARALLEL_HASKELL
/* ----------------------------------------------------------------------------
 * PAR/GRAN: Report stats & debugging info(?)
 * ------------------------------------------------------------------------- */

#if defined(PAR) || defined(GRAN)
static void
scheduleGranParReport(void)
{
    ASSERT(run_queue_hd != END_TSO_QUEUE);

    /* Take a thread from the run queue, if we have work */
    POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);

    /* If this TSO has got its outport closed in the meantime,
     *   it mustn't be run. Instead, we have to clean it up as if it was finished.
     * It has to be marked as TH_DEAD for this purpose.
     * If it is TH_TERM instead, it is supposed to have finished in the normal way.

       JB: TODO: investigate whether state change field could be nuked
           entirely and replaced by the normal tso state (whatnext
           field). All we want to do is to kill tsos from outside.
     */

    /* ToDo: write something to the log-file
       if (RTSflags.ParFlags.granSimStats && !sameThread)
           DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
    */

    /* the spark pool for the current PE */
    pool = &(cap.r.rSparks); // cap = (old) MainCap

#if defined(GRAN)
    IF_DEBUG(gran,
             debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
                        run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
#elif defined(PAR)
    IF_DEBUG(par,
             debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
                        run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

    if (RtsFlags.ParFlags.ParStats.Full &&
        (t->par.sparkname != (StgInt)0) && // only log spark generated threads
        (emitSchedule || // forced emit
         (t && LastTSO && t->id != LastTSO->id))) {
        /*
          we are running a different TSO, so write a schedule event to log file
          NB: If we use fair scheduling we also have to write a deschedule
              event for LastTSO; with unfair scheduling we know that the
              previous tso has blocked whenever we switch to another tso, so
              we don't need it in GUM for now
        */
        IF_PAR_DEBUG(fish, // schedule,
                     debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));

        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
        emitSchedule = rtsFalse;
    }
#endif
}
#endif // PAR || GRAN
/* ----------------------------------------------------------------------------
 * After running a thread...
 * ------------------------------------------------------------------------- */

static void
schedulePostRunThread(void)
{
#if defined(PAR)
    /* HACK 675: if the last thread didn't yield, make sure to print a
       SCHEDULE event to the log file when StgRunning the next thread, even
       if it is the same one as before */
    LastTSO = t;
    TimeOfLastYield = CURRENT_TIME;
#endif

    /* some statistics gathering in the parallel case */

#if defined(GRAN) || defined(PAR) || defined(EDEN)
    switch (ret) {
    case HeapOverflow:
# if defined(GRAN)
        IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_heapover++;
# elif defined(PAR)
        globalParStats.tot_heapover++;
# endif
        break;

    case StackOverflow:
# if defined(GRAN)
        IF_DEBUG(gran,
                 DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_stackover++;
# elif defined(PAR)
        // IF_DEBUG(par,
        // DumpGranEvent(GR_DESCHEDULE, t);
        globalParStats.tot_stackover++;
# endif
        break;

    case ThreadYielding:
# if defined(GRAN)
        IF_DEBUG(gran,
                 DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_yields++;
# elif defined(PAR)
        // IF_DEBUG(par,
        // DumpGranEvent(GR_DESCHEDULE, t);
        globalParStats.tot_yields++;
# endif
        break;

    case ThreadBlocked:
# if defined(GRAN)
        IF_DEBUG(scheduler,
                 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
                            t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
                            (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
                 if (t->block_info.closure!=(StgClosure*)NULL)
                     print_bq(t->block_info.closure));

        // ??? needed; should emit block before
        IF_DEBUG(gran,
                 DumpGranEvent(GR_DESCHEDULE, t));
        prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t

        ASSERT(procStatus[CurrentProc]==Busy ||
               ((procStatus[CurrentProc]==Fetching) &&
                (t->block_info.closure!=(StgClosure*)NULL)));
        if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
            !(!RtsFlags.GranFlags.DoAsyncFetch &&
              procStatus[CurrentProc]==Fetching))
            procStatus[CurrentProc] = Idle;
# elif defined(PAR)
        //++PAR++ blockThread() writes the event (change?)
# endif
        break;

    case ThreadFinished:
        break;

    default:
        barf("parGlobalStats: unknown return code");
    }
#endif
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
{
    // did the task ask for a large block?
    if (cap->r.rHpAlloc > BLOCK_SIZE) {
        // if so, get one and push it on the front of the nursery.
        bdescr *bd;
        lnat blocks;

        blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;

        IF_DEBUG(scheduler,
                 debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
                            (long)t->id, whatNext_strs[t->what_next], blocks));

        // don't do this if the nursery is (nearly) full, we'll GC first.
        if (cap->r.rCurrentNursery->link != NULL ||
            cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
                                              // if the nursery has only one block.

            bd = allocGroup( blocks );
            cap->r.rNursery->n_blocks += blocks;

            // link the new group into the list
            bd->link = cap->r.rCurrentNursery;
            bd->u.back = cap->r.rCurrentNursery->u.back;
            if (cap->r.rCurrentNursery->u.back != NULL) {
                cap->r.rCurrentNursery->u.back->link = bd;
            } else {
                ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
                       g0s0 == cap->r.rNursery);
                cap->r.rNursery->blocks = bd;
            }
            cap->r.rCurrentNursery->u.back = bd;

            // initialise it as a nursery block. We initialise the
            // step, gen_no, and flags field of *every* sub-block in
            // this large block, because this is easier than making
            // sure that we always find the block head of a large
            // block whenever we call Bdescr() (eg. evacuate() and
            // isAlive() in the GC would both have to do this, at
            {
                bdescr *x;
                for (x = bd; x < bd + blocks; x++) {
                    x->step = cap->r.rNursery;
                    x->gen_no = 0;
                    x->flags = 0;
                }
            }

            // This assert can be a killer if the app is doing lots
            // of large block allocations.
            IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));

            // now update the nursery to point to the new block
            cap->r.rCurrentNursery = bd;

            // we might be unlucky and have another thread get on the
            // run queue before us and steal the large block, but in that
            // case the thread will just end up requesting another large
            // block.
            pushOnRunQueue(cap,t);
            return rtsFalse; /* not actually GC'ing */
        }
    }

    IF_DEBUG(scheduler,
             debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n",
                        (long)t->id, whatNext_strs[t->what_next]));

#if defined(GRAN)
    ASSERT(!is_on_queue(t,CurrentProc));
#elif defined(PARALLEL_HASKELL)
    /* Currently we emit a DESCHEDULE event before GC in GUM.
       ToDo: either add separate event to distinguish SYSTEM time from rest
             or just nuke this DESCHEDULE (and the following SCHEDULE) */
    if (0 && RtsFlags.ParFlags.ParStats.Full) {
        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
        emitSchedule = rtsTrue;
    }
#endif

    pushOnRunQueue(cap,t);
    return rtsTrue;
    /* actual GC is done at the end of the while loop in schedule() */
}
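
/* The large-block path above computes the number of blocks with a
 * round-up: blocks = BLOCK_ROUND_UP(rHpAlloc) / BLOCK_SIZE. A generic
 * sketch of the same ceiling-division idiom (hypothetical constant,
 * not the RTS's BLOCK_SIZE):
 */
#if 0
#define EX_BLOCK_SIZE 4096ul

static unsigned long ex_blocks_needed (unsigned long request_bytes)
{
    /* round up to the next multiple of EX_BLOCK_SIZE, then divide:
     * e.g. a 5000-byte request needs (5000 + 4095) / 4096 = 2 blocks */
    return (request_bytes + EX_BLOCK_SIZE - 1) / EX_BLOCK_SIZE;
}
#endif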
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadStackOverflow
 * -------------------------------------------------------------------------- */

static void
scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
{
    IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n",
                                  (long)t->id, whatNext_strs[t->what_next]));

    /* just adjust the stack for this thread, then pop it back
     * on the run queue.
     */
    {
        /* enlarge the stack */
        StgTSO *new_t = threadStackOverflow(cap, t);

        /* The TSO attached to this Task may have moved, so update the
         * pointer to it.
         */
        if (task->tso == t) {
            task->tso = new_t;
        }
        pushOnRunQueue(cap,new_t);
    }
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadYielding
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
{
    // Reset the context switch flag. We don't do this just before
    // running the thread, because that would mean we would lose ticks
    // during GC, which can lead to unfair scheduling (a thread hogs
    // the CPU because the tick always arrives during GC). This approach
    // penalises threads that do a lot of allocation, but that seems
    // better than the alternative.
    context_switch = 0;

    /* put the thread back on the run queue. Then, if we're ready to
     * GC, check whether this is the last task to stop. If so, wake
     * up the GC thread. getThread will block during a GC until the
     * GC is finished.
     */
    IF_DEBUG(scheduler,
             if (t->what_next != prev_what_next) {
                 debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n",
                            (long)t->id, whatNext_strs[t->what_next]);
             } else {
                 debugBelch("--<< thread %ld (%s) stopped, yielding\n",
                            (long)t->id, whatNext_strs[t->what_next]);
             }
        );

    IF_DEBUG(sanity,
             //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
             checkTSO(t));
    ASSERT(t->link == END_TSO_QUEUE);

    // Shortcut if we're just switching evaluators: don't bother
    // doing stack squeezing (which can be expensive), just run the
    // thread.
    if (t->what_next != prev_what_next) {
        return rtsTrue;
    }

#if defined(GRAN)
    ASSERT(!is_on_queue(t,CurrentProc));

    IF_DEBUG(sanity,
             //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
             checkThreadQsSanity(rtsTrue));
#endif

    addToRunQueue(cap,t);

#if defined(GRAN)
    /* add a ContinueThread event to actually process the thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              ContinueThread,
              t, (StgClosure*)NULL, (rtsSpark*)NULL);
    IF_DEBUG(gran,
             debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n"));
#endif

    return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadBlocked
 * -------------------------------------------------------------------------- */

static void
scheduleHandleThreadBlocked( StgTSO *t
#if !defined(GRAN) && !defined(DEBUG)
                             STG_UNUSED
#endif
                             )
{
#if defined(GRAN)
    IF_DEBUG(scheduler,
             debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
                        t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
             if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));

    // ??? needed; should emit block before
    IF_DEBUG(gran,
             DumpGranEvent(GR_DESCHEDULE, t));
    prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t

    ASSERT(procStatus[CurrentProc]==Busy ||
           ((procStatus[CurrentProc]==Fetching) &&
            (t->block_info.closure!=(StgClosure*)NULL)));
    if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
        !(!RtsFlags.GranFlags.DoAsyncFetch &&
          procStatus[CurrentProc]==Fetching))
        procStatus[CurrentProc] = Idle;
#elif defined(PAR)
    IF_DEBUG(scheduler,
             debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
                        t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
    IF_PAR_DEBUG(bq,
                 if (t->block_info.closure!=(StgClosure*)NULL)
                     print_bq(t->block_info.closure));

    /* Send a fetch (if BlockedOnGA) and dump event to log file */
    blockThread(t);

    /* whatever we schedule next, we must log that schedule */
    emitSchedule = rtsTrue;

#else /* !GRAN */

    // We don't need to do anything. The thread is blocked, and it
    // has tidied up its stack and placed itself on whatever queue
    // it needs to be on.

#if !defined(THREADED_RTS)
    ASSERT(t->why_blocked != NotBlocked);
    // This might not be true under SMP: we don't have
    // exclusive access to this TSO, so someone might have
    // woken it up by now. This actually happens: try
    // conc023 +RTS -N2.
#endif

    IF_DEBUG(scheduler,
             debugBelch("--<< thread %d (%s) stopped: ",
                        t->id, whatNext_strs[t->what_next]);
             printThreadBlockage(t);
             debugBelch("\n"));

    /* Only for dumping event to log file
       ToDo: do I need this in GranSim, too?
    */
#endif
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadFinished
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
{
    /* Need to check whether this was a main thread, and if so,
     * return with the return value.
     *
     * We also end up here if the thread kills itself with an
     * uncaught exception, see Exception.cmm.
     */
    IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n",
                                  t->id, whatNext_strs[t->what_next]));

#if defined(GRAN)
    endThread(t, CurrentProc); // clean-up the thread
#elif defined(PARALLEL_HASKELL)
    /* For now all are advisory -- HWL */
    //if(t->priority==AdvisoryPriority) ??
    advisory_thread_count--; // JB: Caution with this counter, buggy!

# if defined(DIST)
    if(t->dist.priority==RevalPriority)
        FinishReval(t);
# endif

# if defined(EDENOLD)
    // the thread could still have an outport... (BUG)
    if (t->eden.outport != -1) {
        // delete the outport for the tso which has finished...
        IF_PAR_DEBUG(eden_ports,
                     debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
                                t->eden.outport, t->id));
    }
    // thread still in the process (HEAVY BUG! since outport has just been closed...)
    if (t->eden.epid != -1) {
        IF_PAR_DEBUG(eden_ports,
                     debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
                                t->id, t->eden.epid));
        removeTSOfromProcess(t);
    }
# endif

# if defined(PAR)
    if (RtsFlags.ParFlags.ParStats.Full &&
        !RtsFlags.ParFlags.ParStats.Suppressed)
        DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);

    // t->par only contains statistics: left out for now...
    IF_PAR_DEBUG(fish,
                 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
                            t->id,t,t->par.sparkname));
# endif
#endif // PARALLEL_HASKELL

    //
    // Check whether the thread that just completed was a bound
    // thread, and if so return with the result.
    //
    // There is an assumption here that all thread completion goes
    // through this point; we need to make sure that if a thread
    // ends up in the ThreadKilled state, that it stays on the run
    // queue so it can be dealt with here.
    //

    if (t->bound) {

        if (t->bound != task) {
#if !defined(THREADED_RTS)
            // Must be a bound thread that is not the topmost one. Leave
            // it on the run queue until the stack has unwound to the
            // point where we can deal with this. Leaving it on the run
            // queue also ensures that the garbage collector knows about
            // this thread and its return value (it gets dropped from the
            // all_threads list so there's no other way to find it).
            appendToRunQueue(cap,t);
            return rtsFalse;
#else
            // this cannot happen in the threaded RTS, because a
            // bound thread can only be run by the appropriate Task.
            barf("finished bound thread that isn't mine");
#endif
        }

        ASSERT(task->tso == t);

        if (t->what_next == ThreadComplete) {
            if (task->ret) {
                // NOTE: return val is tso->sp[1] (see StgStartup.hc)
                *(task->ret) = (StgClosure *)task->tso->sp[1];
            }
            task->stat = Success;
        } else {
            if (task->ret) {
                *(task->ret) = NULL;
            }
            if (interrupted) {
                task->stat = Interrupted;
            } else {
                task->stat = Killed;
            }
        }
#ifdef DEBUG
        removeThreadLabel((StgWord)task->tso->id);
#endif
        return rtsTrue; // tells schedule() to return
    }

    return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Perform a heap census, if PROFILING
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
{
#if defined(PROFILING)
    // When we have +RTS -i0 and we're heap profiling, do a census at
    // every GC. This lets us get repeatable runs for debugging.
    if (performHeapProfile ||
        (RtsFlags.ProfFlags.profileInterval==0 &&
         RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
        GarbageCollect(GetRoots, rtsTrue);
        heapCensus();
        performHeapProfile = rtsFalse;
        return rtsTrue; // true <=> we already GC'd
    }
#endif
    return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Perform a garbage collection if necessary
 * -------------------------------------------------------------------------- */

static void
scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
{
    StgTSO *t;
#ifdef SMP
    static volatile StgWord waiting_for_gc;
    rtsBool was_waiting;
    nat i;
#endif

#ifdef SMP
    // In order to GC, there must be no threads running Haskell code.
    // Therefore, the GC thread needs to hold *all* the capabilities,
    // and release them after the GC has completed.
    //
    // This seems to be the simplest way: previous attempts involved
    // making all the threads with capabilities give up their
    // capabilities and sleep except for the *last* one, which
    // actually did the GC. But it's quite hard to arrange for all
    // the other tasks to sleep and stay asleep.
    //

    was_waiting = cas(&waiting_for_gc, 0, 1);
    if (was_waiting) {
        do {
            IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
            yieldCapability(&cap,task);
        } while (waiting_for_gc);
        return;
    }

    for (i=0; i < n_capabilities; i++) {
        IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
        if (cap != &capabilities[i]) {
            Capability *pcap = &capabilities[i];
            // we better hope this task doesn't get migrated to
            // another Capability while we're waiting for this one.
            // It won't, because load balancing happens while we have
            // all the Capabilities, but even so it's a slightly
            // unsavoury invariant.
            task->cap = pcap;
            waitForReturnCapability(&pcap, task);
            if (pcap != &capabilities[i]) {
                barf("scheduleDoGC: got the wrong capability");
            }
        }
    }

    waiting_for_gc = rtsFalse;
#endif

    /* Kick any transactions which are invalid back to their
     * atomically frames. When next scheduled they will try to
     * commit; this commit will fail and they will retry.
     */
    {
        StgTSO *next;

        for (t = all_threads; t != END_TSO_QUEUE; t = next) {
            if (t->what_next == ThreadRelocated) {
                next = t->link;
            } else {
                next = t->global_link;
                if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
                    if (!stmValidateNestOfTransactions (t -> trec)) {
                        IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));

                        // strip the stack back to the
                        // ATOMICALLY_FRAME, aborting the (nested)
                        // transaction, and saving the stack of any
                        // partially-evaluated thunks on the heap.
                        raiseAsync_(cap, t, NULL, rtsTrue, NULL);

                        ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
                    }
                }
            }
        }
    }

    // so this happens periodically:
    scheduleCheckBlackHoles(cap);

    IF_DEBUG(scheduler, printAllThreads());

    /* everybody back, start the GC.
     * Could do it in this thread, or signal a condition var
     * to do it in another thread. Either way, we need to
     * broadcast on gc_pending_cond afterward.
     */
#if defined(THREADED_RTS)
    IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
    GarbageCollect(GetRoots, force_major);

#if defined(SMP)
    // release our stash of capabilities.
    for (i = 0; i < n_capabilities; i++) {
        if (cap != &capabilities[i]) {
            task->cap = &capabilities[i];
            releaseCapability(&capabilities[i]);
        }
    }
    task->cap = cap;
#endif

#if defined(GRAN)
    /* add a ContinueThread event to continue execution of current thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              ContinueThread,
              t, (StgClosure*)NULL, (rtsSpark*)NULL);
    IF_DEBUG(gran,
             debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n"));
#endif
}
/* ---------------------------------------------------------------------------
 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
 * used by Control.Concurrent for error checking.
 * ------------------------------------------------------------------------- */

StgBool
rtsSupportsBoundThreads(void)
{
#if defined(THREADED_RTS)
    return rtsTrue;
#else
    return rtsFalse;
#endif
}

/* ---------------------------------------------------------------------------
 * isThreadBound(tso): check whether tso is bound to an OS thread.
 * ------------------------------------------------------------------------- */

StgBool
isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
{
#if defined(THREADED_RTS)
    return (tso->bound != NULL);
#else
    return rtsFalse;
#endif
}
/* ---------------------------------------------------------------------------
 * Singleton fork(). Do not copy any running threads.
 * ------------------------------------------------------------------------- */

#if !defined(mingw32_HOST_OS) && !defined(SMP)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
deleteThreadImmediately(Capability *cap, StgTSO *tso);
#endif

StgInt
forkProcess(HsStablePtr *entry
#ifndef FORKPROCESS_PRIMOP_SUPPORTED
            STG_UNUSED
#endif
    )
{
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
    pid_t pid;
    StgTSO* t,*next;
    Task *task;
    Capability *cap;

    IF_DEBUG(scheduler,sched_belch("forking!"));

    // ToDo: for SMP, we should probably acquire *all* the capabilities
    cap = rts_lock();

    pid = fork();

    if (pid) { // parent

        // just return the pid
        rts_unlock(cap);
        return pid;

    } else { // child

        // delete all threads
        cap->run_queue_hd = END_TSO_QUEUE;
        cap->run_queue_tl = END_TSO_QUEUE;

        for (t = all_threads; t != END_TSO_QUEUE; t = next) {
            next = t->link;

            // don't allow threads to catch the ThreadKilled exception
            deleteThreadImmediately(cap,t);
        }

        // wipe the task list
        ACQUIRE_LOCK(&sched_mutex);
        for (task = all_tasks; task != NULL; task=task->all_link) {
            if (task != cap->running_task) discardTask(task);
        }
        RELEASE_LOCK(&sched_mutex);

#if defined(THREADED_RTS)
        // wipe our spare workers list.
        cap->spare_workers = NULL;
#endif

        cap = rts_evalStableIO(cap, entry, NULL); // run the action
        rts_checkSchedStatus("forkProcess",cap);

        rts_unlock(cap);
        hs_exit(); // clean up and exit
        stg_exit(EXIT_SUCCESS);
    }
#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
    barf("forkProcess#: primop not supported on this platform, sorry!\n");
    return -1;
#endif
}
2108 /* ---------------------------------------------------------------------------
2109 * Delete the threads on the run queue of the current capability.
2110 * ------------------------------------------------------------------------- */
2113 deleteRunQueue (Capability *cap)
2116 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2117 ASSERT(t->what_next != ThreadRelocated);
2119 deleteThread(cap, t);
2123 /* startThread and insertThread are now in GranSim.c -- HWL */
2126 /* -----------------------------------------------------------------------------
2127 Managing the suspended_ccalling_tasks list.
2128 Locks required: sched_mutex
2129 -------------------------------------------------------------------------- */
2132 suspendTask (Capability *cap, Task *task)
2134 ASSERT(task->next == NULL && task->prev == NULL);
2135 task->next = cap->suspended_ccalling_tasks;
2137 if (cap->suspended_ccalling_tasks) {
2138 cap->suspended_ccalling_tasks->prev = task;
2140 cap->suspended_ccalling_tasks = task;
2144 recoverSuspendedTask (Capability *cap, Task *task)
2147 task->prev->next = task->next;
2149 ASSERT(cap->suspended_ccalling_tasks == task);
2150 cap->suspended_ccalling_tasks = task->next;
2153 task->next->prev = task->prev;
2155 task->next = task->prev = NULL;
2158 /* ---------------------------------------------------------------------------
2159 * Suspending & resuming Haskell threads.
2161 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2162 * its capability before calling the C function. This allows another
2163 * task to pick up the capability and carry on running Haskell
2164 * threads. It also means that if the C call blocks, it won't lock
2167 * The Haskell thread making the C call is put to sleep for the
2168  * duration of the call, on the suspended_ccalling_tasks queue.  We
2169 * give out a token to the task, which it can use to resume the thread
2170 * on return from the C function.
2171 * ------------------------------------------------------------------------- */
2174 suspendThread (StgRegTable *reg)
2177 int saved_errno = errno;
2181 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2183 cap = regTableToCapability(reg);
2185 task = cap->running_task;
2186 tso = cap->r.rCurrentTSO;
2189 sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2191 // XXX this might not be necessary --SDM
2192 tso->what_next = ThreadRunGHC;
2194 threadPaused(cap,tso);
2196 if(tso->blocked_exceptions == NULL) {
2197 tso->why_blocked = BlockedOnCCall;
2198 tso->blocked_exceptions = END_TSO_QUEUE;
2200 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2203 // Hand back capability
2204 task->suspended_tso = tso;
2206 ACQUIRE_LOCK(&cap->lock);
2208 suspendTask(cap,task);
2209 cap->in_haskell = rtsFalse;
2210 releaseCapability_(cap);
2212 RELEASE_LOCK(&cap->lock);
2214 #if defined(THREADED_RTS)
2215 /* Preparing to leave the RTS, so ensure there's a native thread/task
2216 waiting to take over.
2218 IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2221 errno = saved_errno;
2226 resumeThread (void *task_)
2230 int saved_errno = errno;
2234 // Wait for permission to re-enter the RTS with the result.
2235 waitForReturnCapability(&cap,task);
2236 // we might be on a different capability now... but if so, our
2237 // entry on the suspended_ccalling_tasks list will also have been
2240 // Remove the thread from the suspended list
2241 recoverSuspendedTask(cap,task);
2243 tso = task->suspended_tso;
2244 task->suspended_tso = NULL;
2245 tso->link = END_TSO_QUEUE;
2246 IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2248 if (tso->why_blocked == BlockedOnCCall) {
2249 awakenBlockedQueue(cap,tso->blocked_exceptions);
2250 tso->blocked_exceptions = NULL;
2253 /* Reset blocking status */
2254 tso->why_blocked = NotBlocked;
2256 cap->r.rCurrentTSO = tso;
2257 cap->in_haskell = rtsTrue;
2258 errno = saved_errno;
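/* Illustrative sketch, not part of the RTS: the bracket the compiler
 * arranges around a "safe" foreign call, using the token returned by
 * suspendThread() to get the Haskell thread back afterwards.  The
 * names r, c_fun and args are placeholders.
 */
#if 0
    void *token;
    token = suspendThread(BaseReg);   // park the TSO, give up the Capability
    r = c_fun(args);                  // may block without stalling the RTS
    BaseReg = resumeThread(token);    // reacquire a Capability, resume the TSO
#endif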
2263 /* ---------------------------------------------------------------------------
2264 * Comparing Thread ids.
2266 * This is used from STG land in the implementation of the
2267 * instances of Eq/Ord for ThreadIds.
2268 * ------------------------------------------------------------------------ */
2271 cmp_thread(StgPtr tso1, StgPtr tso2)
2273 StgThreadID id1 = ((StgTSO *)tso1)->id;
2274 StgThreadID id2 = ((StgTSO *)tso2)->id;
2276 if (id1 < id2) return (-1);
2277 if (id1 > id2) return 1;
2281 /* ---------------------------------------------------------------------------
2282 * Fetching the ThreadID from an StgTSO.
2284 * This is used in the implementation of Show for ThreadIds.
2285 * ------------------------------------------------------------------------ */
2287 rts_getThreadId(StgPtr tso)
2289 return ((StgTSO *)tso)->id;
2294 labelThread(StgPtr tso, char *label)
2299 /* Caveat: Once set, you can only set the thread name to "" */
2300 len = strlen(label)+1;
2301 buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2302 strncpy(buf,label,len);
2303 /* Update will free the old memory for us */
2304 updateThreadLabel(((StgTSO *)tso)->id,buf);
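/* Illustrative sketch, not part of the RTS: labelling a thread from C
 * ('tso' is assumed to be in scope; the label is copied as above).
 */
#if 0
    labelThread((StgPtr)tso, "my-worker");
#endif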
2308 /* ---------------------------------------------------------------------------
2309 Create a new thread.
2311 The new thread starts with the given stack size. Before the
2312 scheduler can run, however, this thread needs to have a closure
2313 (and possibly some arguments) pushed on its stack. See
2314 pushClosure() in Schedule.h.
2316 createGenThread() and createIOThread() (in SchedAPI.h) are
2317 convenient packaged versions of this function.
2319 currently pri (priority) is only used in a GRAN setup -- HWL
2320 ------------------------------------------------------------------------ */
2324 createThread(nat size, StgInt pri)
2327 createThread(Capability *cap, nat size)
2333 /* sched_mutex is *not* required */
2335 /* First check whether we should create a thread at all */
2336 #if defined(PARALLEL_HASKELL)
2337 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2338 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2340 debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2341 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2342 return END_TSO_QUEUE;
2348 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2351 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2353 /* catch ridiculously small stack sizes */
2354 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2355 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2358 stack_size = size - TSO_STRUCT_SIZEW;
2360 tso = (StgTSO *)allocateLocal(cap, size);
2361 TICK_ALLOC_TSO(stack_size, 0);
2363 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2365 SET_GRAN_HDR(tso, ThisPE);
2368 // Always start with the compiled code evaluator
2369 tso->what_next = ThreadRunGHC;
2371 tso->why_blocked = NotBlocked;
2372 tso->blocked_exceptions = NULL;
2374 tso->saved_errno = 0;
2377 tso->stack_size = stack_size;
2378 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
2380 tso->sp = (P_)&(tso->stack) + stack_size;
2382 tso->trec = NO_TREC;
2385 tso->prof.CCCS = CCS_MAIN;
2388 /* put a stop frame on the stack */
2389 tso->sp -= sizeofW(StgStopFrame);
2390 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2391 tso->link = END_TSO_QUEUE;
2395 /* uses more flexible routine in GranSim */
2396 insertThread(tso, CurrentProc);
2398 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2404 if (RtsFlags.GranFlags.GranSimStats.Full)
2405 DumpGranEvent(GR_START,tso);
2406 #elif defined(PARALLEL_HASKELL)
2407 if (RtsFlags.ParFlags.ParStats.Full)
2408 DumpGranEvent(GR_STARTQ,tso);
2409   /* HACK to avoid SCHEDULE
2413 /* Link the new thread on the global thread list.
2415 ACQUIRE_LOCK(&sched_mutex);
2416 tso->id = next_thread_id++; // while we have the mutex
2417 tso->global_link = all_threads;
2419 RELEASE_LOCK(&sched_mutex);
2422 tso->dist.priority = MandatoryPriority; //by default that is...
2426 tso->gran.pri = pri;
2428 tso->gran.magic = TSO_MAGIC; // debugging only
2430 tso->gran.sparkname = 0;
2431 tso->gran.startedat = CURRENT_TIME;
2432 tso->gran.exported = 0;
2433 tso->gran.basicblocks = 0;
2434 tso->gran.allocs = 0;
2435 tso->gran.exectime = 0;
2436 tso->gran.fetchtime = 0;
2437 tso->gran.fetchcount = 0;
2438 tso->gran.blocktime = 0;
2439 tso->gran.blockcount = 0;
2440 tso->gran.blockedat = 0;
2441 tso->gran.globalsparks = 0;
2442 tso->gran.localsparks = 0;
2443 if (RtsFlags.GranFlags.Light)
2444 tso->gran.clock = Now; /* local clock */
2446 tso->gran.clock = 0;
2448 IF_DEBUG(gran,printTSO(tso));
2449 #elif defined(PARALLEL_HASKELL)
2451 tso->par.magic = TSO_MAGIC; // debugging only
2453 tso->par.sparkname = 0;
2454 tso->par.startedat = CURRENT_TIME;
2455 tso->par.exported = 0;
2456 tso->par.basicblocks = 0;
2457 tso->par.allocs = 0;
2458 tso->par.exectime = 0;
2459 tso->par.fetchtime = 0;
2460 tso->par.fetchcount = 0;
2461 tso->par.blocktime = 0;
2462 tso->par.blockcount = 0;
2463 tso->par.blockedat = 0;
2464 tso->par.globalsparks = 0;
2465 tso->par.localsparks = 0;
2469 globalGranStats.tot_threads_created++;
2470 globalGranStats.threads_created_on_PE[CurrentProc]++;
2471 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2472 globalGranStats.tot_sq_probes++;
2473 #elif defined(PARALLEL_HASKELL)
2474 // collect parallel global statistics (currently done together with GC stats)
2475 if (RtsFlags.ParFlags.ParStats.Global &&
2476 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2477 //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
2478 globalParStats.tot_threads_created++;
2484 	      sched_belch("==__ schedule: Created TSO %d (%p) on PE %d;",
2485 		      tso->id, tso, CurrentProc));
2486 #elif defined(PARALLEL_HASKELL)
2487 IF_PAR_DEBUG(verbose,
2488 sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2489 (long)tso->id, tso, advisory_thread_count));
2491 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
2492 (long)tso->id, (long)tso->stack_size));
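/* Illustrative sketch, not part of the RTS: manual thread creation in
 * the standard setup, roughly what the createGenThread/createIOThread
 * wrappers in SchedAPI.h do ('cap' and 'closure' are assumed to be in
 * scope; error handling omitted).
 */
#if 0
    StgTSO *t = createThread(cap, RtsFlags.GcFlags.initialStkSize);
    pushClosure(t, (W_)closure);      // what the thread will evaluate
    scheduleThread(cap, t);           // put it on the run queue
#endif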
2499   all parallel thread creation calls should go through the following routine.
2502 createThreadFromSpark(rtsSpark spark)
2504 ASSERT(spark != (rtsSpark)NULL);
2505 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2506 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
2508 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2509 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2510 return END_TSO_QUEUE;
2514 tso = createThread(RtsFlags.GcFlags.initialStkSize);
2515 if (tso==END_TSO_QUEUE)
2516 barf("createSparkThread: Cannot create TSO");
2518 tso->priority = AdvisoryPriority;
2520 pushClosure(tso,spark);
2522 advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
2529 Turn a spark into a thread.
2530 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2534 activateSpark (rtsSpark spark)
2538 tso = createSparkThread(spark);
2539 if (RtsFlags.ParFlags.ParStats.Full) {
2540 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2541 IF_PAR_DEBUG(verbose,
2542 debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2543 (StgClosure *)spark, info_type((StgClosure *)spark)));
2545 // ToDo: fwd info on local/global spark to thread -- HWL
2546 // tso->gran.exported = spark->exported;
2547 // tso->gran.locked = !spark->global;
2548 // tso->gran.sparkname = spark->name;
2554 /* ---------------------------------------------------------------------------
2557 * scheduleThread puts a thread on the end of the runnable queue.
2558 * This will usually be done immediately after a thread is created.
2559 * The caller of scheduleThread must create the thread using e.g.
2560 * createThread and push an appropriate closure
2561 * on this thread's stack before the scheduler is invoked.
2562 * ------------------------------------------------------------------------ */
2565 scheduleThread(Capability *cap, StgTSO *tso)
2567 // The thread goes at the *end* of the run-queue, to avoid possible
2568 // starvation of any threads already on the queue.
2569 appendToRunQueue(cap,tso);
2573 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2577 // We already created/initialised the Task
2578 task = cap->running_task;
2580 // This TSO is now a bound thread; make the Task and TSO
2581 // point to each other.
2586 task->stat = NoStatus;
2588 appendToRunQueue(cap,tso);
2590 IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2593 /* GranSim specific init */
2594 CurrentTSO = m->tso; // the TSO to run
2595 procStatus[MainProc] = Busy; // status of main PE
2596 CurrentProc = MainProc; // PE to run it on
2599 cap = schedule(cap,task);
2601 ASSERT(task->stat != NoStatus);
2602 ASSERT_CAPABILITY_INVARIANTS(cap,task);
2604 IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
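/* Illustrative sketch, not part of the RTS: how an RtsAPI-style entry
 * point might run an IO action to completion on a bound thread
 * (compare rts_evalIO; 'cap', 'closure' and 'ret' are assumed to be
 * in scope).
 */
#if 0
    StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, closure);
    scheduleWaitThread(tso, &ret, cap);  // returns when the thread has finished
#endif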
2608 /* ----------------------------------------------------------------------------
2610 * ------------------------------------------------------------------------- */
2612 #if defined(THREADED_RTS)
2614 workerStart(Task *task)
2618 // See startWorkerTask().
2619 ACQUIRE_LOCK(&task->lock);
2621 RELEASE_LOCK(&task->lock);
2623 // set the thread-local pointer to the Task:
2626 // schedule() runs without a lock.
2627 cap = schedule(cap,task);
2629 // On exit from schedule(), we have a Capability.
2630 releaseCapability(cap);
2635 /* ---------------------------------------------------------------------------
2638 * Initialise the scheduler. This resets all the queues - if the
2639 * queues contained any threads, they'll be garbage collected at the
2642 * ------------------------------------------------------------------------ */
2649   for (i=0; i < MAX_PROC; i++) {
2650 run_queue_hds[i] = END_TSO_QUEUE;
2651 run_queue_tls[i] = END_TSO_QUEUE;
2652 blocked_queue_hds[i] = END_TSO_QUEUE;
2653 blocked_queue_tls[i] = END_TSO_QUEUE;
2654 ccalling_threadss[i] = END_TSO_QUEUE;
2655 blackhole_queue[i] = END_TSO_QUEUE;
2656 sleeping_queue = END_TSO_QUEUE;
2658 #elif !defined(THREADED_RTS)
2659 blocked_queue_hd = END_TSO_QUEUE;
2660 blocked_queue_tl = END_TSO_QUEUE;
2661 sleeping_queue = END_TSO_QUEUE;
2664 blackhole_queue = END_TSO_QUEUE;
2665 all_threads = END_TSO_QUEUE;
2670 RtsFlags.ConcFlags.ctxtSwitchTicks =
2671 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2673 #if defined(THREADED_RTS)
2674 /* Initialise the mutex and condition variables used by
2676 initMutex(&sched_mutex);
2679 ACQUIRE_LOCK(&sched_mutex);
2681 /* A capability holds the state a native thread needs in
2682 * order to execute STG code. At least one capability is
2683 * floating around (only SMP builds have more than one).
2689 #if defined(SMP) || defined(PARALLEL_HASKELL)
2695 * Eagerly start one worker to run each Capability, except for
2696 * Capability 0. The idea is that we're probably going to start a
2697 * bound thread on Capability 0 pretty soon, so we don't want a
2698 * worker task hogging it.
2703 for (i = 1; i < n_capabilities; i++) {
2704 cap = &capabilities[i];
2705 ACQUIRE_LOCK(&cap->lock);
2706 startWorkerTask(cap, workerStart);
2707 RELEASE_LOCK(&cap->lock);
2712 RELEASE_LOCK(&sched_mutex);
2716 exitScheduler( void )
2718 interrupted = rtsTrue;
2719 shutting_down_scheduler = rtsTrue;
2721 #if defined(THREADED_RTS)
2726 ACQUIRE_LOCK(&sched_mutex);
2727 task = newBoundTask();
2728 RELEASE_LOCK(&sched_mutex);
2730 for (i = 0; i < n_capabilities; i++) {
2731 shutdownCapability(&capabilities[i], task);
2733 boundTaskExiting(task);
2739 /* ---------------------------------------------------------------------------
2740 Where are the roots that we know about?
2742 - all the threads on the runnable queue
2743 - all the threads on the blocked queue
2744 - all the threads on the sleeping queue
2745 - all the thread currently executing a _ccall_GC
2746 - all the "main threads"
2748 ------------------------------------------------------------------------ */
2750 /* This has to be protected either by the scheduler monitor, or by the
2751 garbage collection monitor (probably the latter).
2756 GetRoots( evac_fn evac )
2763 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2764 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2765 evac((StgClosure **)&run_queue_hds[i]);
2766 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2767 evac((StgClosure **)&run_queue_tls[i]);
2769 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2770 evac((StgClosure **)&blocked_queue_hds[i]);
2771 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2772 evac((StgClosure **)&blocked_queue_tls[i]);
2773 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2774 	  evac((StgClosure **)&ccalling_threadss[i]);
2781 for (i = 0; i < n_capabilities; i++) {
2782 cap = &capabilities[i];
2783 evac((StgClosure **)&cap->run_queue_hd);
2784 evac((StgClosure **)&cap->run_queue_tl);
2786 for (task = cap->suspended_ccalling_tasks; task != NULL;
2788 evac((StgClosure **)&task->suspended_tso);
2792 #if !defined(THREADED_RTS)
2793 evac((StgClosure **)&blocked_queue_hd);
2794 evac((StgClosure **)&blocked_queue_tl);
2795 evac((StgClosure **)&sleeping_queue);
2799 evac((StgClosure **)&blackhole_queue);
2801 #if defined(SMP) || defined(PARALLEL_HASKELL) || defined(GRAN)
2802 markSparkQueue(evac);
2805 #if defined(RTS_USER_SIGNALS)
2806 // mark the signal handlers (signals should be already blocked)
2807 markSignalHandlers(evac);
2811 /* -----------------------------------------------------------------------------
2814 This is the interface to the garbage collector from Haskell land.
2815 We provide this so that external C code can allocate and garbage
2816 collect when called from Haskell via _ccall_GC.
2818 It might be useful to provide an interface whereby the programmer
2819 can specify more roots (ToDo).
2821 This needs to be protected by the GC condition variable above. KH.
2822 -------------------------------------------------------------------------- */
2824 static void (*extra_roots)(evac_fn);
2830 // ToDo: we have to grab all the capabilities here.
2831 errorBelch("performGC not supported in threaded RTS (yet)");
2832 stg_exit(EXIT_FAILURE);
2834 /* Obligated to hold this lock upon entry */
2835 GarbageCollect(GetRoots,rtsFalse);
2839 performMajorGC(void)
2842     errorBelch("performMajorGC not supported in threaded RTS (yet)");
2843 stg_exit(EXIT_FAILURE);
2845 GarbageCollect(GetRoots,rtsTrue);
2849 AllRoots(evac_fn evac)
2851 GetRoots(evac); // the scheduler's roots
2852 extra_roots(evac); // the user's roots
2856 performGCWithRoots(void (*get_roots)(evac_fn))
2859 errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2860 stg_exit(EXIT_FAILURE);
2862 extra_roots = get_roots;
2863 GarbageCollect(AllRoots,rtsFalse);
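/* Illustrative sketch, not part of the RTS: registering an extra root
 * with the GC (non-threaded RTS only, as the guards above enforce).
 * The names my_global and my_roots are placeholders.
 */
#if 0
static StgClosure *my_global;

static void my_roots (evac_fn evac)
{
    evac(&my_global);                 // keep my_global alive across GC
}

    /* ... later ... */
    performGCWithRoots(my_roots);
#endif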
2866 /* -----------------------------------------------------------------------------
2869 If the thread has reached its maximum stack size, then raise the
2870 StackOverflow exception in the offending thread. Otherwise
2871 relocate the TSO into a larger chunk of memory and adjust its stack
2873 -------------------------------------------------------------------------- */
2876 threadStackOverflow(Capability *cap, StgTSO *tso)
2878 nat new_stack_size, stack_words;
2883 IF_DEBUG(sanity,checkTSO(tso));
2884 if (tso->stack_size >= tso->max_stack_size) {
2887 debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2888 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2889 /* If we're debugging, just print out the top of the stack */
2890 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2893 /* Send this thread the StackOverflow exception */
2894 raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2898 /* Try to double the current stack size. If that takes us over the
2899 * maximum stack size for this thread, then use the maximum instead.
2900 * Finally round up so the TSO ends up as a whole number of blocks.
2902 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2903 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2904 TSO_STRUCT_SIZE)/sizeof(W_);
2905 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2906 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
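  /* Worked example (illustrative): with stack_size = 1000 words and
   * max_stack_size = 8192, the stack doubles to 2000 words; at
   * stack_size = 5000 the stg_min() clamps the new size to 8192 rather
   * than 10000.  BLOCK_ROUND_UP then pads the TSO out to whole blocks,
   * so the final stack is usually a little larger than requested.
   */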
2908 IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2910 dest = (StgTSO *)allocate(new_tso_size);
2911 TICK_ALLOC_TSO(new_stack_size,0);
2913 /* copy the TSO block and the old stack into the new area */
2914 memcpy(dest,tso,TSO_STRUCT_SIZE);
2915 stack_words = tso->stack + tso->stack_size - tso->sp;
2916 new_sp = (P_)dest + new_tso_size - stack_words;
2917 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2919 /* relocate the stack pointers... */
2921 dest->stack_size = new_stack_size;
2923 /* Mark the old TSO as relocated. We have to check for relocated
2924 * TSOs in the garbage collector and any primops that deal with TSOs.
2926 * It's important to set the sp value to just beyond the end
2927 * of the stack, so we don't attempt to scavenge any part of the
2930 tso->what_next = ThreadRelocated;
2932 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2933 tso->why_blocked = NotBlocked;
2935 IF_PAR_DEBUG(verbose,
2936 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2937 tso->id, tso, tso->stack_size);
2938 /* If we're debugging, just print out the top of the stack */
2939 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2942 IF_DEBUG(sanity,checkTSO(tso));
2944 IF_DEBUG(scheduler,printTSO(dest));
2950 /* ---------------------------------------------------------------------------
2951 Wake up a queue that was blocked on some resource.
2952 ------------------------------------------------------------------------ */
2956 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2959 #elif defined(PARALLEL_HASKELL)
2961 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2963 /* write RESUME events to log file and
2964 update blocked and fetch time (depending on type of the orig closure) */
2965 if (RtsFlags.ParFlags.ParStats.Full) {
2966 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2967 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2968 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2969 if (emptyRunQueue())
2970 emitSchedule = rtsTrue;
2972 switch (get_itbl(node)->type) {
2974 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2979 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2986 barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2993 StgBlockingQueueElement *
2994 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2997 PEs node_loc, tso_loc;
2999 node_loc = where_is(node); // should be lifted out of loop
3000 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
3001 tso_loc = where_is((StgClosure *)tso);
3002 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3003     /* !fake_fetch => the TSO being on CurrentProc is the same as IS_LOCAL_TO */
3004 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3005 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3006 // insertThread(tso, node_loc);
3007 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3009 tso, node, (rtsSpark*)NULL);
3010 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
3013 } else { // TSO is remote (actually should be FMBQ)
3014 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3015 RtsFlags.GranFlags.Costs.gunblocktime +
3016 RtsFlags.GranFlags.Costs.latency;
3017 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3019 tso, node, (rtsSpark*)NULL);
3020 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
3023 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3025 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3026 (node_loc==tso_loc ? "Local" : "Global"),
3027 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3028 tso->block_info.closure = NULL;
3029 IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n",
3032 #elif defined(PARALLEL_HASKELL)
3033 StgBlockingQueueElement *
3034 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3036 StgBlockingQueueElement *next;
3038 switch (get_itbl(bqe)->type) {
3040 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3041 /* if it's a TSO just push it onto the run_queue */
3043 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3044 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
3046 unblockCount(bqe, node);
3047 /* reset blocking status after dumping event */
3048 ((StgTSO *)bqe)->why_blocked = NotBlocked;
3052 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3054 bqe->link = (StgBlockingQueueElement *)PendingFetches;
3055 PendingFetches = (StgBlockedFetch *)bqe;
3059 /* can ignore this case in a non-debugging setup;
3060 see comments on RBHSave closures above */
3062 /* check that the closure is an RBHSave closure */
3063 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3064 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3065 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3069 barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3070 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
3074 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3080 unblockOne(Capability *cap, StgTSO *tso)
3084 ASSERT(get_itbl(tso)->type == TSO);
3085 ASSERT(tso->why_blocked != NotBlocked);
3086 tso->why_blocked = NotBlocked;
3088 tso->link = END_TSO_QUEUE;
3090 // We might have just migrated this TSO to our Capability:
3092 tso->bound->cap = cap;
3095 appendToRunQueue(cap,tso);
3097 // we're holding a newly woken thread, make sure we context switch
3098 // quickly so we can migrate it if necessary.
3100 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3107 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3109 StgBlockingQueueElement *bqe;
3114 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3115 node, CurrentProc, CurrentTime[CurrentProc],
3116 CurrentTSO->id, CurrentTSO));
3118 node_loc = where_is(node);
3120 ASSERT(q == END_BQ_QUEUE ||
3121 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
3122 get_itbl(q)->type == CONSTR); // closure (type constructor)
3123 ASSERT(is_unique(node));
3125 /* FAKE FETCH: magically copy the node to the tso's proc;
3126 no Fetch necessary because in reality the node should not have been
3127 moved to the other PE in the first place
3129 if (CurrentProc!=node_loc) {
3131 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3132 node, node_loc, CurrentProc, CurrentTSO->id,
3133 // CurrentTSO, where_is(CurrentTSO),
3134 node->header.gran.procs));
3135 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3137 debugBelch("## new bitmask of node %p is %#x\n",
3138 node, node->header.gran.procs));
3139 if (RtsFlags.GranFlags.GranSimStats.Global) {
3140 globalGranStats.tot_fake_fetches++;
3145 // ToDo: check: ASSERT(CurrentProc==node_loc);
3146 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3149 bqe points to the current element in the queue
3150 next points to the next element in the queue
3152 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
3153 //tso_loc = where_is(tso);
3155 bqe = unblockOne(bqe, node);
3158 /* if this is the BQ of an RBH, we have to put back the info ripped out of
3159 the closure to make room for the anchor of the BQ */
3160 if (bqe!=END_BQ_QUEUE) {
3161 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3163 ASSERT((info_ptr==&RBH_Save_0_info) ||
3164 (info_ptr==&RBH_Save_1_info) ||
3165 (info_ptr==&RBH_Save_2_info));
3167 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3168 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3169 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3172 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3173 node, info_type(node)));
3176 /* statistics gathering */
3177 if (RtsFlags.GranFlags.GranSimStats.Global) {
3178 // globalGranStats.tot_bq_processing_time += bq_processing_time;
3179 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
3180 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
3181 globalGranStats.tot_awbq++; // total no. of bqs awakened
3184 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3185 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3187 #elif defined(PARALLEL_HASKELL)
3189 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3191 StgBlockingQueueElement *bqe;
3193 IF_PAR_DEBUG(verbose,
3194 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3198 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3199     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so let's just return. RFP (BUG?)\n"));
3204 ASSERT(q == END_BQ_QUEUE ||
3205 get_itbl(q)->type == TSO ||
3206 get_itbl(q)->type == BLOCKED_FETCH ||
3207 get_itbl(q)->type == CONSTR);
3210 while (get_itbl(bqe)->type==TSO ||
3211 get_itbl(bqe)->type==BLOCKED_FETCH) {
3212 bqe = unblockOne(bqe, node);
3216 #else /* !GRAN && !PARALLEL_HASKELL */
3219 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3221 if (tso == NULL) return; // hack; see bug #1235728, and comments in
3223 while (tso != END_TSO_QUEUE) {
3224 tso = unblockOne(cap,tso);
3229 /* ---------------------------------------------------------------------------
3231 - usually called inside a signal handler so it mustn't do anything fancy.
3232 ------------------------------------------------------------------------ */
3235 interruptStgRts(void)
3239 #if defined(THREADED_RTS)
3240 prodAllCapabilities();
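/* Illustrative sketch, not part of the RTS: typical use from a signal
 * handler (compare the ^C handler installed via RtsSignals; the
 * handler name is a placeholder).
 */
#if 0
static void shutdown_handler (int sig STG_UNUSED)
{
    interruptStgRts();   // flag the interrupt and prod the capabilities
}
#endif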
3244 /* -----------------------------------------------------------------------------
3247 This is for use when we raise an exception in another thread, which
3249 This has nothing to do with the UnblockThread event in GranSim. -- HWL
3250 -------------------------------------------------------------------------- */
3252 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3254 NB: only the type of the blocking queue is different in GranSim and GUM
3255 the operations on the queue-elements are the same
3256 long live polymorphism!
3258 Locks: sched_mutex is held upon entry and exit.
3262 unblockThread(Capability *cap, StgTSO *tso)
3264 StgBlockingQueueElement *t, **last;
3266 switch (tso->why_blocked) {
3269 return; /* not blocked */
3272 // Be careful: nothing to do here! We tell the scheduler that the thread
3273 // is runnable and we leave it to the stack-walking code to abort the
3274 // transaction while unwinding the stack. We should perhaps have a debugging
3275 // test to make sure that this really happens and that the 'zombie' transaction
3276 // does not get committed.
3280 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3282 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3283 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3285 last = (StgBlockingQueueElement **)&mvar->head;
3286 for (t = (StgBlockingQueueElement *)mvar->head;
3288 last = &t->link, last_tso = t, t = t->link) {
3289 if (t == (StgBlockingQueueElement *)tso) {
3290 *last = (StgBlockingQueueElement *)tso->link;
3291 if (mvar->tail == tso) {
3292 mvar->tail = (StgTSO *)last_tso;
3297 barf("unblockThread (MVAR): TSO not found");
3300 case BlockedOnBlackHole:
3301 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3303 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3305 last = &bq->blocking_queue;
3306 for (t = bq->blocking_queue;
3308 last = &t->link, t = t->link) {
3309 if (t == (StgBlockingQueueElement *)tso) {
3310 *last = (StgBlockingQueueElement *)tso->link;
3314 barf("unblockThread (BLACKHOLE): TSO not found");
3317 case BlockedOnException:
3319 StgTSO *target = tso->block_info.tso;
3321 ASSERT(get_itbl(target)->type == TSO);
3323 if (target->what_next == ThreadRelocated) {
3324 target = target->link;
3325 ASSERT(get_itbl(target)->type == TSO);
3328 ASSERT(target->blocked_exceptions != NULL);
3330 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3331 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
3333 last = &t->link, t = t->link) {
3334 ASSERT(get_itbl(t)->type == TSO);
3335 if (t == (StgBlockingQueueElement *)tso) {
3336 *last = (StgBlockingQueueElement *)tso->link;
3340 barf("unblockThread (Exception): TSO not found");
3344 case BlockedOnWrite:
3345 #if defined(mingw32_HOST_OS)
3346 case BlockedOnDoProc:
3349 /* take TSO off blocked_queue */
3350 StgBlockingQueueElement *prev = NULL;
3351 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
3352 prev = t, t = t->link) {
3353 if (t == (StgBlockingQueueElement *)tso) {
3355 blocked_queue_hd = (StgTSO *)t->link;
3356 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3357 blocked_queue_tl = END_TSO_QUEUE;
3360 prev->link = t->link;
3361 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3362 blocked_queue_tl = (StgTSO *)prev;
3365 #if defined(mingw32_HOST_OS)
3366 /* (Cooperatively) signal that the worker thread should abort
3369 abandonWorkRequest(tso->block_info.async_result->reqID);
3374 barf("unblockThread (I/O): TSO not found");
3377 case BlockedOnDelay:
3379 /* take TSO off sleeping_queue */
3380 StgBlockingQueueElement *prev = NULL;
3381 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
3382 prev = t, t = t->link) {
3383 if (t == (StgBlockingQueueElement *)tso) {
3385 sleeping_queue = (StgTSO *)t->link;
3387 prev->link = t->link;
3392 barf("unblockThread (delay): TSO not found");
3396 barf("unblockThread");
3400 tso->link = END_TSO_QUEUE;
3401 tso->why_blocked = NotBlocked;
3402 tso->block_info.closure = NULL;
3403 pushOnRunQueue(cap,tso);
3407 unblockThread(Capability *cap, StgTSO *tso)
3411 /* To avoid locking unnecessarily. */
3412 if (tso->why_blocked == NotBlocked) {
3416 switch (tso->why_blocked) {
3419 // Be careful: nothing to do here! We tell the scheduler that the thread
3420 // is runnable and we leave it to the stack-walking code to abort the
3421 // transaction while unwinding the stack. We should perhaps have a debugging
3422 // test to make sure that this really happens and that the 'zombie' transaction
3423 // does not get committed.
3427 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3429 StgTSO *last_tso = END_TSO_QUEUE;
3430 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3433 for (t = mvar->head; t != END_TSO_QUEUE;
3434 last = &t->link, last_tso = t, t = t->link) {
3437 if (mvar->tail == tso) {
3438 mvar->tail = last_tso;
3443 barf("unblockThread (MVAR): TSO not found");
3446 case BlockedOnBlackHole:
3448 last = &blackhole_queue;
3449 for (t = blackhole_queue; t != END_TSO_QUEUE;
3450 last = &t->link, t = t->link) {
3456 barf("unblockThread (BLACKHOLE): TSO not found");
3459 case BlockedOnException:
3461 StgTSO *target = tso->block_info.tso;
3463 ASSERT(get_itbl(target)->type == TSO);
3465 while (target->what_next == ThreadRelocated) {
3466 target = target->link;
3467 ASSERT(get_itbl(target)->type == TSO);
3470 ASSERT(target->blocked_exceptions != NULL);
3472 last = &target->blocked_exceptions;
3473 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
3474 last = &t->link, t = t->link) {
3475 ASSERT(get_itbl(t)->type == TSO);
3481 barf("unblockThread (Exception): TSO not found");
3484 #if !defined(THREADED_RTS)
3486 case BlockedOnWrite:
3487 #if defined(mingw32_HOST_OS)
3488 case BlockedOnDoProc:
3491 StgTSO *prev = NULL;
3492 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
3493 prev = t, t = t->link) {
3496 blocked_queue_hd = t->link;
3497 if (blocked_queue_tl == t) {
3498 blocked_queue_tl = END_TSO_QUEUE;
3501 prev->link = t->link;
3502 if (blocked_queue_tl == t) {
3503 blocked_queue_tl = prev;
3506 #if defined(mingw32_HOST_OS)
3507 /* (Cooperatively) signal that the worker thread should abort
3510 abandonWorkRequest(tso->block_info.async_result->reqID);
3515 barf("unblockThread (I/O): TSO not found");
3518 case BlockedOnDelay:
3520 StgTSO *prev = NULL;
3521 for (t = sleeping_queue; t != END_TSO_QUEUE;
3522 prev = t, t = t->link) {
3525 sleeping_queue = t->link;
3527 prev->link = t->link;
3532 barf("unblockThread (delay): TSO not found");
3537 barf("unblockThread");
3541 tso->link = END_TSO_QUEUE;
3542 tso->why_blocked = NotBlocked;
3543 tso->block_info.closure = NULL;
3544 appendToRunQueue(cap,tso);
3548 /* -----------------------------------------------------------------------------
3551 * Check the blackhole_queue for threads that can be woken up. We do
3552 * this periodically: before every GC, and whenever the run queue is
3555 * An elegant solution might be to just wake up all the blocked
3556 * threads with awakenBlockedQueue occasionally: they'll go back to
3557 * sleep again if the object is still a BLACKHOLE. Unfortunately this
3558 * doesn't give us a way to tell whether we've actually managed to
3559 * wake up any threads, so we would be busy-waiting.
3561 * -------------------------------------------------------------------------- */
3564 checkBlackHoles (Capability *cap)
3567 rtsBool any_woke_up = rtsFalse;
3570 // blackhole_queue is global:
3571 ASSERT_LOCK_HELD(&sched_mutex);
3573 IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3575 // ASSUMES: sched_mutex
3576 prev = &blackhole_queue;
3577 t = blackhole_queue;
3578 while (t != END_TSO_QUEUE) {
3579 ASSERT(t->why_blocked == BlockedOnBlackHole);
3580 type = get_itbl(t->block_info.closure)->type;
3581 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3582 IF_DEBUG(sanity,checkTSO(t));
3583 t = unblockOne(cap, t);
3584 // urk, the threads migrate to the current capability
3585 // here, but we'd like to keep them on the original one.
3587 any_woke_up = rtsTrue;
3597 /* -----------------------------------------------------------------------------
3600 * The following function implements the magic for raising an
3601 * asynchronous exception in an existing thread.
3603 * We first remove the thread from any queue on which it might be
3604 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
3606 * We strip the stack down to the innermost CATCH_FRAME, building
3607 * thunks in the heap for all the active computations, so they can
3608 * be restarted if necessary. When we reach a CATCH_FRAME, we build
3609 * an application of the handler to the exception, and push it on
3610 * the top of the stack.
3612 * How exactly do we save all the active computations? We create an
3613 * AP_STACK for every UpdateFrame on the stack. Entering one of these
3614 * AP_STACKs pushes everything from the corresponding update frame
3615 * upwards onto the stack. (Actually, it pushes everything up to the
3616  * next update frame plus a pointer to the next AP_STACK object.)
3617 * Entering the next AP_STACK object pushes more onto the stack until we
3618 * reach the last AP_STACK object - at which point the stack should look
3619 * exactly as it did when we killed the TSO and we can continue
3620 * execution by entering the closure on top of the stack.
3622 * We can also kill a thread entirely - this happens if either (a) the
3623 * exception passed to raiseAsync is NULL, or (b) there's no
3624 * CATCH_FRAME on the stack. In either case, we strip the entire
3625 * stack and replace the thread with a zombie.
3627 * ToDo: in SMP mode, this function is only safe if either (a) we hold
3628 * all the Capabilities (eg. in GC), or (b) we own the Capability that
3629 * the TSO is currently blocked on or on the run queue of.
3631 * -------------------------------------------------------------------------- */
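/* Illustration of the freezing described above (a sketch, not
 * normative).  With the innermost frame on the left:
 *
 *    sp -> z .. | UPDATE_FRAME(u1) | y .. | UPDATE_FRAME(u2) | CATCH_FRAME | ..
 *
 * we build AP_STACK a1 capturing the chunk between sp and u1's frame
 * and overwrite u1's updatee with an indirection to a1; a1 becomes
 * the current closure, a2 then captures a1 plus the chunk up to u2's
 * frame, u2's updatee is overwritten with a2, and finally the thunk
 * raise(exception) is left on top of the CATCH_FRAME, ready to enter.
 */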
3634 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3636 raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3640 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3642 raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3646 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
3647 rtsBool stop_at_atomically, StgPtr stop_here)
3649 StgRetInfoTable *info;
3653 // Thread already dead?
3654 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3659 sched_belch("raising exception in thread %ld.", (long)tso->id));
3661 // Remove it from any blocking queues
3662 unblockThread(cap,tso);
3666 // The stack freezing code assumes there's a closure pointer on
3667 // the top of the stack, so we have to arrange that this is the case...
3669 if (sp[0] == (W_)&stg_enter_info) {
3673 sp[0] = (W_)&stg_dummy_ret_closure;
3677 while (stop_here == NULL || frame < stop_here) {
3679 // 1. Let the top of the stack be the "current closure"
3681 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3684 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3685 // current closure applied to the chunk of stack up to (but not
3686 // including) the update frame. This closure becomes the "current
3687 // closure". Go back to step 2.
3689 // 4. If it's a CATCH_FRAME, then leave the exception handler on
3690 // top of the stack applied to the exception.
3692 // 5. If it's a STOP_FRAME, then kill the thread.
3694 // NB: if we pass an ATOMICALLY_FRAME then abort the associated
3697 info = get_ret_itbl((StgClosure *)frame);
3699 switch (info->i.type) {
3706 // First build an AP_STACK consisting of the stack chunk above the
3707 // current update frame, with the top word on the stack as the
3710 words = frame - sp - 1;
3711 ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3714 ap->fun = (StgClosure *)sp[0];
3716 for(i=0; i < (nat)words; ++i) {
3717 ap->payload[i] = (StgClosure *)*sp++;
3720 SET_HDR(ap,&stg_AP_STACK_info,
3721 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
3722 TICK_ALLOC_UP_THK(words+1,0);
3725 debugBelch("sched: Updating ");
3726 printPtr((P_)((StgUpdateFrame *)frame)->updatee);
3727 debugBelch(" with ");
3728 printObj((StgClosure *)ap);
3731 // Replace the updatee with an indirection
3733 // Warning: if we're in a loop, more than one update frame on
3734 // the stack may point to the same object. Be careful not to
3735 // overwrite an IND_OLDGEN in this case, because we'll screw
3736 // up the mutable lists. To be on the safe side, don't
3737 // overwrite any kind of indirection at all. See also
3738 // threadSqueezeStack in GC.c, where we have to make a similar
3741 if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3742 // revert the black hole
3743 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3746 sp += sizeofW(StgUpdateFrame) - 1;
3747 sp[0] = (W_)ap; // push onto stack
3753 // We've stripped the entire stack, the thread is now dead.
3754 tso->what_next = ThreadKilled;
3755 tso->sp = frame + sizeofW(StgStopFrame);
3759 // If we find a CATCH_FRAME, and we've got an exception to raise,
3760 // then build the THUNK raise(exception), and leave it on
3761 // top of the CATCH_FRAME ready to enter.
3765 StgCatchFrame *cf = (StgCatchFrame *)frame;
3769 if (exception == NULL) break;
3771 // we've got an exception to raise, so let's pass it to the
3772 // handler in this frame.
3774 raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3775 TICK_ALLOC_SE_THK(1,0);
3776 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3777 raise->payload[0] = exception;
3779 // throw away the stack from Sp up to the CATCH_FRAME.
3783 	    /* Ensure that async exceptions are blocked now, so we don't get
3784 * a surprise exception before we get around to executing the
3787 if (tso->blocked_exceptions == NULL) {
3788 tso->blocked_exceptions = END_TSO_QUEUE;
3791 /* Put the newly-built THUNK on top of the stack, ready to execute
3792 * when the thread restarts.
3795 sp[-1] = (W_)&stg_enter_info;
3797 tso->what_next = ThreadRunGHC;
3798 IF_DEBUG(sanity, checkTSO(tso));
3802 case ATOMICALLY_FRAME:
3803 if (stop_at_atomically) {
3804 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3805 stmCondemnTransaction(cap, tso -> trec);
3809 // R1 is not a register: the return convention for IO in
3810 // this case puts the return value on the stack, so we
3811 // need to set up the stack to return to the atomically
3812 // frame properly...
3813 tso->sp = frame - 2;
3814 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3815 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3817 tso->what_next = ThreadRunGHC;
3820 // Not stop_at_atomically... fall through and abort the
3823 case CATCH_RETRY_FRAME:
3824 	// If we find an ATOMICALLY_FRAME then we abort the
3825 // current transaction and propagate the exception. In
3826 // this case (unlike ordinary exceptions) we do not care
3827 // whether the transaction is valid or not because its
3828 // possible validity cannot have caused the exception
3829 // and will not be visible after the abort.
3831 debugBelch("Found atomically block delivering async exception\n"));
3832 StgTRecHeader *trec = tso -> trec;
3833 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3834 stmAbortTransaction(cap, trec);
3835 tso -> trec = outer;
3842 // move on to the next stack frame
3843 frame += stack_frame_sizeW((StgClosure *)frame);
3846 // if we got here, then we stopped at stop_here
3847 ASSERT(stop_here != NULL);
3850 /* -----------------------------------------------------------------------------
3853 This is used for interruption (^C) and forking, and corresponds to
3854 raising an exception but without letting the thread catch the
3856 -------------------------------------------------------------------------- */
3859 deleteThread (Capability *cap, StgTSO *tso)
3861 if (tso->why_blocked != BlockedOnCCall &&
3862 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3863 raiseAsync(cap,tso,NULL);
3867 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3869 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3870 { // for forkProcess only:
3871 // delete thread without giving it a chance to catch the KillThread exception
3873 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3877 if (tso->why_blocked != BlockedOnCCall &&
3878 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3879 unblockThread(cap,tso);
3882 tso->what_next = ThreadKilled;
3886 /* -----------------------------------------------------------------------------
3887 raiseExceptionHelper
3889    This function is called by the raise# primitive, just so that we can
3890 move some of the tricky bits of raising an exception from C-- into
3891    C.  Who knows, it might be a useful reusable thing here too.
3892 -------------------------------------------------------------------------- */
3895 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3897 Capability *cap = regTableToCapability(reg);
3898 StgThunk *raise_closure = NULL;
3900 StgRetInfoTable *info;
3902 // This closure represents the expression 'raise# E' where E
3903     // is the exception raised.  It is used to overwrite all the
3904     // thunks which are currently under evaluation.
3908 // LDV profiling: stg_raise_info has THUNK as its closure
3909 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3910     // payload, MIN_UPD_SIZE is more appropriate than 1.  It seems that
3911 // 1 does not cause any problem unless profiling is performed.
3912 // However, when LDV profiling goes on, we need to linearly scan
3913     // the small object pool, where raise_closure is stored, so we should
3914 // use MIN_UPD_SIZE.
3916 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3917 // sizeofW(StgClosure)+1);
3921 // Walk up the stack, looking for the catch frame. On the way,
3922 // we update any closures pointed to from update frames with the
3923 // raise closure that we just built.
3927 info = get_ret_itbl((StgClosure *)p);
3928 next = p + stack_frame_sizeW((StgClosure *)p);
3929 switch (info->i.type) {
3932 // Only create raise_closure if we need to.
3933 if (raise_closure == NULL) {
3935 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3936 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3937 raise_closure->payload[0] = exception;
3939 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3943 case ATOMICALLY_FRAME:
3944 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3946 return ATOMICALLY_FRAME;
3952 case CATCH_STM_FRAME:
3953 IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3955 return CATCH_STM_FRAME;
3961 case CATCH_RETRY_FRAME:
3970 /* -----------------------------------------------------------------------------
3971 findRetryFrameHelper
3973 This function is called by the retry# primitive. It traverses the stack
3974 leaving tso->sp referring to the frame which should handle the retry.
3976 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3977    or an ATOMICALLY_FRAME (if the retry# reaches the top level).
3979 We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3980 despite the similar implementation.
3982 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3983 not be created within memory transactions.
3984 -------------------------------------------------------------------------- */
3987 findRetryFrameHelper (StgTSO *tso)
3990 StgRetInfoTable *info;
3994 info = get_ret_itbl((StgClosure *)p);
3995 next = p + stack_frame_sizeW((StgClosure *)p);
3996 switch (info->i.type) {
3998 case ATOMICALLY_FRAME:
3999       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retry\n", p));
4001 return ATOMICALLY_FRAME;
4003 case CATCH_RETRY_FRAME:
4004       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retry\n", p));
4006 return CATCH_RETRY_FRAME;
4008 case CATCH_STM_FRAME:
4010 ASSERT(info->i.type != CATCH_FRAME);
4011 ASSERT(info->i.type != STOP_FRAME);
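/* Illustrative sketch, not part of the RTS: how a caller consumes the
 * result (the real retry# primitive is written in C--; this is only
 * an approximation).
 */
#if 0
    switch (findRetryFrameHelper(tso)) {
    case ATOMICALLY_FRAME:
        // retry propagated to the top level: block on the TVars read
        // so far, then re-run the transaction from scratch
        break;
    case CATCH_RETRY_FRAME:
        // retry inside orElse#: abandon the current branch and run
        // the alternative one
        break;
    }
#endif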
4018 /* -----------------------------------------------------------------------------
4019 resurrectThreads is called after garbage collection on the list of
4020 threads found to be garbage. Each of these threads will be woken
4021 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4022 on an MVar, or NonTermination if the thread was blocked on a Black
4025 Locks: assumes we hold *all* the capabilities.
4026 -------------------------------------------------------------------------- */
4029 resurrectThreads (StgTSO *threads)
4034 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4035 next = tso->global_link;
4036 tso->global_link = all_threads;
4038 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4040 // Wake up the thread on the Capability it was last on for a
4041 // bound thread, or last_free_capability otherwise.
4043 cap = tso->bound->cap;
4045 cap = last_free_capability;
4048 switch (tso->why_blocked) {
4050 case BlockedOnException:
4051 /* Called by GC - sched_mutex lock is currently held. */
4052 raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4054 case BlockedOnBlackHole:
4055 raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4058 raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4061 /* This might happen if the thread was blocked on a black hole
4062 * belonging to a thread that we've just woken up (raiseAsync
4063 * can wake up threads, remember...).
4067 barf("resurrectThreads: thread blocked in a strange way");
4072 /* ----------------------------------------------------------------------------
4073 * Debugging: why is a thread blocked
4074 * [Also provides useful information when debugging threaded programs
4075 * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4076 ------------------------------------------------------------------------- */
4080 printThreadBlockage(StgTSO *tso)
4082 switch (tso->why_blocked) {
4084 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4086 case BlockedOnWrite:
4087 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4089 #if defined(mingw32_HOST_OS)
4090 case BlockedOnDoProc:
4091 debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4094 case BlockedOnDelay:
4095 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4098 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4100 case BlockedOnException:
4101 debugBelch("is blocked on delivering an exception to thread %d",
4102 tso->block_info.tso->id);
4104 case BlockedOnBlackHole:
4105 debugBelch("is blocked on a black hole");
4108 debugBelch("is not blocked");
4110 #if defined(PARALLEL_HASKELL)
4112 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4113 tso->block_info.closure, info_type(tso->block_info.closure));
4115 case BlockedOnGA_NoSend:
4116 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4117 tso->block_info.closure, info_type(tso->block_info.closure));
4120 case BlockedOnCCall:
4121 debugBelch("is blocked on an external call");
4123 case BlockedOnCCall_NoUnblockExc:
4124 debugBelch("is blocked on an external call (exceptions were already blocked)");
4127 debugBelch("is blocked on an STM operation");
4130       barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
4131 	   tso->why_blocked, tso->id, tso);
4136 printThreadStatus(StgTSO *t)
4138 debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4140 void *label = lookupThreadLabel(t->id);
4141 if (label) debugBelch("[\"%s\"] ",(char *)label);
4143 if (t->what_next == ThreadRelocated) {
4144 debugBelch("has been relocated...\n");
4146 switch (t->what_next) {
4148 debugBelch("has been killed");
4150 case ThreadComplete:
4151 debugBelch("has completed");
4154 printThreadBlockage(t);
4161 printAllThreads(void)
4168 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4169 ullong_format_string(TIME_ON_PROC(CurrentProc),
4170 time_string, rtsFalse/*no commas!*/);
4172 debugBelch("all threads at [%s]:\n", time_string);
4173 # elif defined(PARALLEL_HASKELL)
4174 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4175 ullong_format_string(CURRENT_TIME,
4176 time_string, rtsFalse/*no commas!*/);
4178 debugBelch("all threads at [%s]:\n", time_string);
4180 debugBelch("all threads:\n");
4183 for (i = 0; i < n_capabilities; i++) {
4184 cap = &capabilities[i];
4185 debugBelch("threads on capability %d:\n", cap->no);
4186 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4187 printThreadStatus(t);
4191 debugBelch("other threads:\n");
4192 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4193 if (t->why_blocked != NotBlocked) {
4194 printThreadStatus(t);
4196 if (t->what_next == ThreadRelocated) {
4199 next = t->global_link;
4206 printThreadQueue(StgTSO *t)
4209 for (; t != END_TSO_QUEUE; t = t->link) {
4210 printThreadStatus(t);
4213 debugBelch("%d threads on queue\n", i);

/*
   Print a whole blocking queue attached to node (debugging only).
*/
# if defined(PARALLEL_HASKELL)
void print_bqe (StgBlockingQueueElement *bqe); // forward declaration

void
print_bq (StgClosure *node)
{
  ASSERT(node != (StgClosure *)NULL); // sanity check, before node is dereferenced

  debugBelch("## BQ of closure %p (%s): ",
             node, info_type(node));

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
         get_itbl(node)->type == FETCH_ME_BQ ||
         get_itbl(node)->type == RBH ||
         get_itbl(node)->type == MVAR);

  print_bqe(((StgBlockingQueue*)node)->blocking_queue);
}

/*
   Print a whole blocking queue starting with the element bqe.
*/
void
print_bqe (StgBlockingQueueElement *bqe)
{
  rtsBool end;

  /*
     NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
     the loop therefore iterates until bqe points to a CONSTR, and prints
     that terminating closure too.
  */
  for (end = (bqe == END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link == END_BQ_QUEUE),
       bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);                    // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||
           get_itbl(bqe)->type == BLOCKED_FETCH ||
           get_itbl(bqe)->type == CONSTR);
    /* only BQs of an RBH end with an RBH_Save closure; we cannot check
       that here because the owning node is not in scope */

    switch (get_itbl(bqe)->type) {
    case TSO:
      debugBelch(" TSO %u (%p),",
                 ((StgTSO *)bqe)->id, (StgTSO *)bqe);
      break;
    case BLOCKED_FETCH:
      debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
                 ((StgBlockedFetch *)bqe)->node,
                 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
                 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
                 ((StgBlockedFetch *)bqe)->ga.weight);
      break;
    case CONSTR:
      debugBelch(" %s (IP %p),",
                 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
                  get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
                  get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
                  "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue",
           info_type((StgClosure *)bqe));
    }
  }
  debugBelch("\n");
}
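
/* Worked example of the termination rule above: 'end' becomes true when
   the current element is the terminating CONSTR (an RBH_Save closure) or
   when its link is END_BQ_QUEUE, so the terminator itself is still
   printed.  A queue TSO 1 -> TSO 2 -> RBH_Save_0 therefore prints
   (addresses elided):

     TSO 1 (0x...), TSO 2 (0x...), RBH_Save_0 (IP 0x...),
*/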

# elif defined(GRAN)
void
print_bq (StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  PEs node_loc, tso_loc;
  rtsBool end;

  ASSERT(node != (StgClosure *)NULL); // sanity check, before node is dereferenced

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
         get_itbl(node)->type == FETCH_ME_BQ ||
         get_itbl(node)->type == RBH);

  node_loc = where_is(node);

  debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
             node, info_type(node), node_loc);

  /*
     NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
  */
  for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe == END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link == END_BQ_QUEUE),
       bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);                    // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||
           get_itbl(bqe)->type == CONSTR);
    /* only BQs of an RBH end with an RBH_Save closure */
    ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

    tso_loc = where_is((StgClosure *)bqe);
    switch (get_itbl(bqe)->type) {
    case TSO:
      debugBelch(" TSO %d (%p) on [PE %d],",
                 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
      break;
    case CONSTR:
      debugBelch(" %s (IP %p),",
                 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
                  get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
                  get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
                  "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue of %p (%s)",
           info_type((StgClosure *)bqe), node, info_type(node));
    }
  }
  debugBelch("\n");
}
# endif

/*
   Calculate the number of threads in the run queue.
*/
#if defined(PARALLEL_HASKELL)
static nat
run_queue_len(void)
{
    nat i;
    StgTSO *tso;

    for (i = 0, tso = run_queue_hd;
         tso != END_TSO_QUEUE;
         i++, tso = tso->link) {
        ; /* empty body: i ends up as the queue length */
    }

    return i;
}
#endif
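
/* The empty-body counting idiom above generalises to any TSO queue.  A
   minimal sketch (hypothetical helper, not in the RTS proper):

     static nat
     queue_len (StgTSO *queue)
     {
         nat i;
         StgTSO *tso;
         for (i = 0, tso = queue; tso != END_TSO_QUEUE; i++, tso = tso->link) {
             ; // empty body: we only count the links
         }
         return i;
     }
*/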

void
sched_belch(char *s, ...)
{
    va_list ap;
    va_start(ap, s);
#ifdef THREADED_RTS
    debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
#elif defined(PARALLEL_HASKELL)
    debugBelch("sched: ");
#else
    debugBelch("sched: ");