/* ---------------------------------------------------------------------------
 * (c) The GHC Team, 1998-2005
 * The scheduler and thread-related functionality
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "BlockAlloc.h"
#include "OSThreads.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "Exception.h"
#include "RtsSignals.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Proftimer.h"
#if defined(GRAN) || defined(PARALLEL_HASKELL)
# include "GranSimRts.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
#endif

#include "Capability.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif

// Turn off inlining when debugging - it obfuscates things
#ifdef DEBUG
# define STATIC_INLINE static
#else
# define STATIC_INLINE static inline
#endif
/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;  now in GranSim.c */

/*
   In GranSim we have a runnable and a blocked queue for each processor.
   In order to minimise code changes, new arrays run_queue_hds/tls
   are created. run_queue_hd is then a shortcut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h).
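
   For illustration, the shortcuts presumably look like this (a
   hypothetical reconstruction; the real definitions live in GranSim.h):

     #define run_queue_hd   run_queue_hds[CurrentProc]
     #define run_queue_tl   run_queue_tls[CurrentProc]
*/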
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating). */

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif
/* Threads blocked on blackholes.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *blackhole_queue = NULL;

/* The blackhole_queue should be checked for threads to wake up.  See
 * Schedule.h for more thorough comment.
 * LOCK: none (doesn't matter if we miss an update)
 */
rtsBool blackholes_need_checking = rtsFalse;

/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *all_threads = NULL;

/* flag set by signal handler to precipitate a context switch
 * LOCK: none (just an advisory flag)
 */
int context_switch = 0;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 */
nat recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes once, from false->true)
 */
rtsBool interrupted = rtsFalse;
/* Next thread ID to allocate.
 */
static StgThreadID next_thread_id = 1;

/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the closure to enter)
 *  + 1                       (stg_ap_v_ret)
 *  + 1                       (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
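
/* Illustrative arithmetic (the constants are platform-dependent
   assumptions, not fixed values): with RESERVED_STACK_WORDS == 21 and
   sizeofW(StgStopFrame) == 2, this comes to 21 + 2 + 3 == 26 words. */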

/* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 * exists - earlier gccs apparently didn't.
 */

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;
/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS (including SMP) runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if defined(PARALLEL_HASKELL)
rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;
#endif

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
static void schedulePreLoop (void);
static void schedulePushWork(Capability *cap, Task *task);
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static StgTSO *scheduleProcessEvent(rtsEvent *event);
#if defined(PARALLEL_HASKELL)
static StgTSO *scheduleSendPendingMessages(void);
static void scheduleActivateSpark(void);
static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
#endif
#if defined(PAR) || defined(GRAN)
static void scheduleGranParReport(void);
#endif
static void schedulePostRunThread(void);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task,
                                         StgTSO *t );
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
                                    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                             StgTSO *t );
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);

static void unblockThread(Capability *cap, StgTSO *tso);
static rtsBool checkBlackHoles(Capability *cap);
static void AllRoots(evac_fn evac);

static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);

static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
                        rtsBool stop_at_atomically, StgPtr stop_here);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteRunQueue (Capability *cap);

static void printThreadBlockage(StgTSO *tso);
static void printThreadStatus(StgTSO *tso);
void printThreadQueue(StgTSO *tso);

#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);
#endif

static char *whatNext_strs[] = {
    "(unknown)",
    "ThreadRunGHC",
    "ThreadInterpret",
    "ThreadKilled",
    "ThreadRelocated",
    "ThreadComplete"
};

/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */

STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
#if defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.doFairScheduling) {
        // this does round-robin scheduling; good for concurrency
        appendToRunQueue(cap,t);
    } else {
        // this does unfair scheduling; good for parallelism
        pushOnRunQueue(cap,t);
    }
#else
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
#endif
}
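
// Illustrative call site (a sketch; scheduleHandleYield below does this
// for real): a thread that yields is requeued with addToRunQueue(cap,t),
// so under fair scheduling it waits behind the other runnable threads.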

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   In a GranSim setup this loop iterates over the global event queue,
   which determines what to do next. Therefore, it's more complicated
   than either the concurrent or the parallel (GUM) setup.

   GUM iterates over incoming messages.
   It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
   and sends out a fish whenever it has nothing to do; in-between
   doing the actual reductions (shared code below) it processes the
   incoming messages and deals with delayed operations
   (see PendingFetches).
   This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */
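
/* A sketch of one iteration of the loop below (THREADED_RTS case, many
   details elided):

       yieldCapability(&cap, task);       // maybe give the cap away first
       scheduleStartSignalHandlers(cap);
       scheduleCheckBlockedThreads(cap);
       scheduleDetectDeadlock(cap, task);
       t = popRunQueue(cap);
       ret = StgRun(...);                 // or interpretBCO(cap)
       switch (ret) { ... }               // heap/stack overflow, yield,
                                          // blocked, finished
       if (ready_to_gc) { scheduleDoGC(cap, task, rtsFalse); }
*/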

static Capability *
schedule (Capability *initialCapability, Task *task)
{
    StgTSO *t;
    Capability *cap;
    StgThreadReturnCode ret;
#if defined(GRAN)
    rtsEvent *event;
#elif defined(PARALLEL_HASKELL)
    rtsBool receivedFinish = rtsFalse;
    nat tp_size, sp_size;  // stats only
#endif
    nat prev_what_next;
    rtsBool ready_to_gc;
#if defined(THREADED_RTS)
    rtsBool first = rtsTrue;
#endif

    cap = initialCapability;

    // Pre-condition: this task owns initialCapability.
    // The sched_mutex is *NOT* held
    // NB. on return, we still hold a capability.

    IF_DEBUG(scheduler,
             sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
                         task, initialCapability));

    // -----------------------------------------------------------
    // Scheduler loop starts here:

#if defined(PARALLEL_HASKELL)
#define TERMINATION_CONDITION        (!receivedFinish)
#elif defined(GRAN)
#define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL)
#else
#define TERMINATION_CONDITION        rtsTrue
#endif

    while (TERMINATION_CONDITION) {

#if defined(GRAN)
        /* Choose the processor with the next event */
        CurrentProc = event->proc;
        CurrentTSO  = event->tso;
#endif

#if defined(THREADED_RTS)
        if (first) {
            // don't yield the first time, we want a chance to run this
            // thread for a bit, even if there are others banging at the
            // door.
            first = rtsFalse;
            ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        } else {
            // Yield the capability to higher-priority tasks if necessary.
            yieldCapability(&cap, task);
        }

        schedulePushWork(cap,task);
#endif

        // Check whether we have re-entered the RTS from Haskell without
        // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
        // call).
        if (cap->in_haskell) {
            errorBelch("schedule: re-entered unsafely.\n"
                       "   Perhaps a 'foreign import unsafe' should be 'safe'?");
            stg_exit(EXIT_FAILURE);
        }

        // Test for interruption.  If interrupted==rtsTrue, then either
        // we received a keyboard interrupt (^C), or the scheduler is
        // trying to shut down all the tasks (shutting_down_scheduler) in
        // the system.
        if (interrupted) {
            deleteRunQueue(cap);
#if defined(THREADED_RTS)
            discardSparksCap(cap);
#endif
            if (shutting_down_scheduler) {
                IF_DEBUG(scheduler, sched_belch("shutting down"));
                // If we are a worker, just exit.  If we're a bound thread
                // then we will exit below when we've removed our TSO from
                // the run queue.
                if (task->tso == NULL && emptyRunQueue(cap)) {
                    return cap;
                }
            } else {
                IF_DEBUG(scheduler, sched_belch("interrupted"));
            }
        }

#if defined(THREADED_RTS)
        // If the run queue is empty, take a spark and turn it into a thread.
        {
            if (emptyRunQueue(cap)) {
                StgClosure *spark;
                spark = findSpark(cap);
                if (spark != NULL) {
                    IF_DEBUG(scheduler,
                             sched_belch("turning spark of closure %p into a thread",
                                         (StgClosure *)spark));
                    createSparkThread(cap,spark);
                }
            }
        }
#endif // THREADED_RTS

        scheduleStartSignalHandlers(cap);

        // Only check the black holes here if we've nothing else to do.
        // During normal execution, the black hole list only gets checked
        // at GC time, to avoid repeatedly traversing this possibly long
        // list each time around the scheduler.
        if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

        scheduleCheckBlockedThreads(cap);

        scheduleDetectDeadlock(cap,task);

        // Normally, the only way we can get here with no threads to
        // run is if a keyboard interrupt was received during
        // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
        // Additionally, it is not fatal for the
        // threaded RTS to reach here with no threads to run.
        //
        // win32: might be here due to awaitEvent() being abandoned
        // as a result of a console event having been delivered.
        if ( emptyRunQueue(cap) ) {
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
            ASSERT(interrupted);
#endif
            continue; // nothing to do
        }

#if defined(PARALLEL_HASKELL)
        scheduleSendPendingMessages();
        if (emptyRunQueue(cap) && scheduleActivateSpark())
            continue;

        ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!

        /* If we still have no work we need to send a FISH to get a spark
           from another PE */
        if (emptyRunQueue(cap)) {
            if (!scheduleGetRemoteWork(&receivedFinish)) continue;
            ASSERT(rtsFalse); // should not happen at the moment
        }
        // from here: non-empty run queue.
        //  TODO: merge above case with this, only one call to processMessages()!
        if (PacketsWaiting()) {  /* process incoming messages, if
                                    any pending...  only in else
                                    because getRemoteWork waits for
                                    messages as well */
            receivedFinish = processMessages();
        }
#endif // PARALLEL_HASKELL

#if defined(GRAN)
        scheduleProcessEvent(event);
#endif

        //
        // Get a thread to run
        //
        t = popRunQueue(cap);

#if defined(GRAN) || defined(PAR)
        scheduleGranParReport(); // some kind of debugging output
#else
        // Sanity check the thread we're about to run.  This can be
        // expensive if there is lots of thread switching going on...
        IF_DEBUG(sanity,checkTSO(t));
#endif

#if defined(THREADED_RTS)
        // Check whether we can run this thread in the current task.
        // If not, we have to pass our capability to the right task.
        {
            Task *bound = t->bound;

            if (bound) {
                if (bound == task) {
                    IF_DEBUG(scheduler,
                             sched_belch("### Running thread %d in bound thread",
                                         t->id));
                    // yes, the Haskell thread is bound to the current native thread
                } else {
                    IF_DEBUG(scheduler,
                             sched_belch("### thread %d bound to another OS thread",
                                         t->id));
                    // no, bound to a different Haskell thread: pass to that thread
                    pushOnRunQueue(cap,t);
                    continue;
                }
            } else {
                // The thread we want to run is unbound.
                if (task->tso) {
                    IF_DEBUG(scheduler,
                             sched_belch("### this OS thread cannot run thread %d", t->id));
                    // no, the current native thread is bound to a different
                    // Haskell thread, so pass it to any worker thread
                    pushOnRunQueue(cap,t);
                    continue;
                }
            }
        }
#endif

        cap->r.rCurrentTSO = t;

        /* context switches are initiated by the timer signal, unless
         * the user specified "context switch as often as possible", with
         * +RTS -C0
         */
        if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
            && !emptyThreadQueues(cap)) {
            context_switch = 1;
        }

run_thread:

        IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
                                        (long)t->id, whatNext_strs[t->what_next]));

#if defined(PROFILING)
        startHeapProfTimer();
#endif

        // ----------------------------------------------------------------------
        // Run the current thread

        prev_what_next = t->what_next;

        errno = t->saved_errno;
        cap->in_haskell = rtsTrue;

        recent_activity = ACTIVITY_YES;
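
        // Dispatch on the thread's what_next field: compiled code is
        // entered via StgRun(), interpreted code via interpretBCO(), and
        // already-finished threads go straight back to the scheduler.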
        switch (prev_what_next) {

        case ThreadKilled:
        case ThreadComplete:
            /* Thread already finished, return to scheduler. */
            ret = ThreadFinished;
            break;

        case ThreadRunGHC:
        {
            StgRegTable *r;
            r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
            cap = regTableToCapability(r);
            ret = r->rRet;
            break;
        }

        case ThreadInterpret:
            cap = interpretBCO(cap);
            ret = cap->r.rRet;
            break;

        default:
            barf("schedule: invalid what_next field");
        }

        cap->in_haskell = rtsFalse;

        // The TSO might have moved, eg. if it re-entered the RTS and a GC
        // happened. So find the new location:
        t = cap->r.rCurrentTSO;

        // We have run some Haskell code: there might be blackhole-blocked
        // threads to wake up now.
        // Lock-free test here should be ok, we're just setting a flag.
        if ( blackhole_queue != END_TSO_QUEUE ) {
            blackholes_need_checking = rtsTrue;
        }

        // And save the current errno in this thread.
        // XXX: possibly bogus for SMP because this thread might already
        // be running again, see code below.
        t->saved_errno = errno;

        // If ret is ThreadBlocked, and this Task is bound to the TSO that
        // blocked, we are in limbo - the TSO is now owned by whatever it
        // is blocked on, and may in fact already have been woken up,
        // perhaps even on a different Capability.  It may be the case
        // that task->cap != cap.  We better yield this Capability
        // immediately and return to normality.
        if (ret == ThreadBlocked) {
            IF_DEBUG(scheduler,
                     sched_belch("--<< thread %d (%s) stopped: blocked\n",
                                 t->id, whatNext_strs[t->what_next]));
            continue;
        }

        ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

        // ----------------------------------------------------------------------

        // Costs for the scheduler are assigned to CCS_SYSTEM
#if defined(PROFILING)
        stopHeapProfTimer();
        CCCS = CCS_SYSTEM;
#endif

#if defined(THREADED_RTS)
        IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
#elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
        IF_DEBUG(scheduler,debugBelch("sched: "););
#endif

        schedulePostRunThread();

        ready_to_gc = rtsFalse;

        switch (ret) {
        case HeapOverflow:
            ready_to_gc = scheduleHandleHeapOverflow(cap,t);
            break;

        case StackOverflow:
            scheduleHandleStackOverflow(cap,task,t);
            break;

        case ThreadYielding:
            if (scheduleHandleYield(cap, t, prev_what_next)) {
                // shortcut for switching between compiler/interpreter:
                goto run_thread;
            }
            break;

        case ThreadBlocked:
            scheduleHandleThreadBlocked(t);
            break;

        case ThreadFinished:
            if (scheduleHandleThreadFinished(cap, task, t)) return cap;
            ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
            break;

        default:
            barf("schedule: invalid thread return code %d", (int)ret);
        }

        if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
        if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
    } /* end of while() */

    IF_PAR_DEBUG(verbose,
                 debugBelch("== Leaving schedule() after having received Finish\n"));
}

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
#if defined(GRAN)
    /* set up first event to get things going */
    /* ToDo: assign costs for system setup and init MainTSO ! */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              ContinueThread,
              CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);

    IF_DEBUG(gran,
             debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n",
                        CurrentTSO);
             G_TSO(CurrentTSO, 5));

    if (RtsFlags.GranFlags.Light) {
        /* Save current time; GranSim Light only */
        CurrentTSO->gran.clock = CurrentTime[CurrentProc];
    }
#endif
}

/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
static void
schedulePushWork(Capability *cap USED_IF_SMP,
                 Task *task      USED_IF_SMP)
{
    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
        && sparkPoolSizeCap(cap) < 2) {
        return;
    }

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0) || cap0->returning_tasks_hd != NULL) {
                // it already has some work, we just grabbed it at
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
            } else {
                free_caps[n_free_caps++] = cap0;
            }
        }
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;
        rtsBool pushed_to_all;

        IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));

        i = 0;
        pushed_to_all = rtsFalse;

        if (cap->run_queue_hd != END_TSO_QUEUE) {
            prev = cap->run_queue_hd;
            t = prev->link;
            prev->link = END_TSO_QUEUE;
            for (; t != END_TSO_QUEUE; t = next) {
                next = t->link;
                t->link = END_TSO_QUEUE;
                if (t->what_next == ThreadRelocated
                    || t->bound == task) { // don't move my bound thread
                    t->link = prev;
                    prev = t;
                } else if (i == n_free_caps) {
                    pushed_to_all = rtsTrue;
                    i = 0;
                    t->link = prev;
                    prev = t;
                } else {
                    IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
                    appendToRunQueue(free_caps[i],t);
                    if (t->bound) { t->bound->cap = free_caps[i]; }
                    i++;
                }
            }
            cap->run_queue_tl = prev;
        }

        // If there are some free capabilities that we didn't push any
        // threads to, then try to push a spark to each one.
        if (!pushed_to_all) {
            StgClosure *spark;
            // i is the next free capability to push to
            for (; i < n_free_caps; i++) {
                if (emptySparkPoolCap(free_caps[i])) {
                    spark = findSpark(cap);
                    if (spark != NULL) {
                        IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
                        newSpark(&(free_caps[i]->r), spark);
                    }
                }
            }
        }

        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            releaseCapability(free_caps[i]);
        }
    }
    task->cap = cap; // reset to point to our Capability.
}
#endif

/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (signals_pending()) { // safe outside the lock
        startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
        awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
    }
#endif
}

/* ----------------------------------------------------------------------------
 * Check for threads blocked on BLACKHOLEs that can be woken up
 * ------------------------------------------------------------------------- */
static void
scheduleCheckBlackHoles (Capability *cap)
{
    if ( blackholes_need_checking ) // check without the lock first
    {
        ACQUIRE_LOCK(&sched_mutex);
        if ( blackholes_need_checking ) {
            checkBlackHoles(cap);
            blackholes_need_checking = rtsFalse;
        }
        RELEASE_LOCK(&sched_mutex);
    }
}
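
/* (Note: the unlocked read of blackholes_need_checking above, re-tested
   after taking sched_mutex, is a double-checked pattern: the common
   nothing-to-do case never touches the lock, and a missed update merely
   delays the wakeup check until the next time around the scheduler.) */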

/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{
#if defined(PARALLEL_HASKELL)
    // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
    return;
#endif

    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
        /*
         * In the threaded RTS, we only check for deadlock if there
         * has been no activity in a complete timeslice.  This means
         * we won't eagerly start a full GC just because we don't have
         * any threads to run currently.
         */
        if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

        IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        // woken up and checked for deadlock again.
        scheduleDoGC( cap, task, rtsTrue/*force major GC*/ );
        recent_activity = ACTIVITY_DONE_GC;

        if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather than bombing out with a
         * deadlock.
         */
        if ( anyUserHandlers() ) {
            IF_DEBUG(scheduler,
                     sched_belch("still deadlocked, waiting for signals..."));

            awaitUserSignals();

            if (signals_pending()) {
                startSignalHandlers(cap);
            }

            // either we have threads to run, or we were interrupted:
            ASSERT(!emptyRunQueue(cap) || interrupted);
        }
#endif

#if !defined(THREADED_RTS)
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception.
         */
        if (task->tso) {
            switch (task->tso->why_blocked) {
            case BlockedOnBlackHole:
            case BlockedOnException:
            case BlockedOnMVar:
                raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
                return;
            default:
                barf("deadlock: main thread blocked in a strange way");
            }
        }
#endif
    }
}

/* ----------------------------------------------------------------------------
 * Process an event (GRAN only)
 * ------------------------------------------------------------------------- */

#if defined(GRAN)
static StgTSO *
scheduleProcessEvent(rtsEvent *event)
{
    StgTSO *t;

    if (RtsFlags.GranFlags.Light)
        GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
        CurrentTime[CurrentProc] = event->time;

    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)
        handleIdlePEs();

    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
    /* Should just be continuing execution */
    case ContinueThread:
        IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
        /* ToDo: check assertion
           ASSERT(run_queue_hd != (StgTSO*)NULL &&
                  run_queue_hd != END_TSO_QUEUE);
        */
        /* Ignore ContinueThreads for fetching threads (if synchronous communication) */
        if (!RtsFlags.GranFlags.DoAsyncFetch &&
            procStatus[CurrentProc]==Fetching) {
            debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            goto next_thread;
        }
        /* Ignore ContinueThreads for completed threads */
        if (CurrentTSO->what_next == ThreadComplete) {
            debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            goto next_thread;
        }
        /* Ignore ContinueThreads for threads that are being migrated */
        if (PROCS(CurrentTSO)==Nowhere) {
            debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            goto next_thread;
        }
        /* The thread should be at the beginning of the run queue */
        if (CurrentTSO!=run_queue_hds[CurrentProc]) {
            debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            break; // run the thread anyway
        }
        /*
        new_event(proc, proc, CurrentTime[proc],
                  FindWork,
                  (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
        goto next_thread;
        */ /* Catches superfluous CONTINUEs -- should be unnecessary */
        break; // now actually run the thread; DaH Qu'vam yImuHbej

    case FetchNode:
        do_the_fetchnode(event);
        goto next_thread;  /* handle next event in event queue  */

    case GlobalBlock:
        do_the_globalblock(event);
        goto next_thread;  /* handle next event in event queue  */

    case FetchReply:
        do_the_fetchreply(event);
        goto next_thread;  /* handle next event in event queue  */

    case UnblockThread:   /* Move from the blocked queue to the tail of */
        do_the_unblock(event);
        goto next_thread;  /* handle next event in event queue  */

    case ResumeThread:    /* Move from the blocked queue to the tail of */
        /* the runnable queue ( i.e. Qu' SImqa'lu') */
        event->tso->gran.blocktime +=
            CurrentTime[CurrentProc] - event->tso->gran.blockedat;
        do_the_startthread(event);
        goto next_thread;  /* handle next event in event queue  */

    case StartThread:
        do_the_startthread(event);
        goto next_thread;  /* handle next event in event queue  */

    case MoveThread:
        do_the_movethread(event);
        goto next_thread;  /* handle next event in event queue  */

    case MoveSpark:
        do_the_movespark(event);
        goto next_thread;  /* handle next event in event queue  */

    case FindWork:
        do_the_findwork(event);
        goto next_thread;  /* handle next event in event queue  */

    default:
        barf("Illegal event type %u\n", event->evttype);
    }  /* switch */

    /* This point was scheduler_loop in the old RTS */

    IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
                              TimeOfNextEvent));

    if (RtsFlags.GranFlags.Light)
        GranSimLight_leave_system(event, &ActiveTSO);

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    IF_DEBUG(gran,
             debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */
    t = CurrentTSO;
    /* Take a thread from the run queue. */
    POP_RUN_QUEUE(t); // take_off_run_queue(t);

    IF_DEBUG(gran,
             debugBelch("GRAN: About to run current thread, which is\n");
             G_TSO(t,5));

    context_switch = 0; // turned on via GranYield, checking events and time slice

    IF_DEBUG(gran,
             DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;

    return t;
}
#endif // GRAN

/* ----------------------------------------------------------------------------
 * Send pending messages (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static StgTSO *
scheduleSendPendingMessages(void)
{
# if defined(PAR) // global Mem.Mgmt., omit for now
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }
# endif

    if (RtsFlags.ParFlags.BufferTime) {
        // if we use message buffering, we must send away all message
        // packets which have become too old...
        sendOldBuffers();
    }
}
#endif

/* ----------------------------------------------------------------------------
 * Activate spark threads (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static rtsBool
scheduleActivateSpark(void)
{
#if defined(SPARKS)
    ASSERT(emptyRunQueue());
    /* We get here if the run queue is empty and want some work.
       We try to turn a spark into a thread, and add it to the run queue,
       from where it will be picked up in the next iteration of the scheduler
       loop.
    */

    /* :-[  no local threads => look out for local sparks */
    /* the spark pool for the current PE */
    pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
    if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
        pool->hd < pool->tl) {
        /*
         * ToDo: add GC code check that we really have enough heap afterwards!!
         *
         * If we're here (no runnable threads) and we have pending
         * sparks, we must have a space problem.  Get enough space
         * to turn one of those pending sparks into a
         * thread...
         */

        spark = findSpark(rtsFalse);             /* get a spark */
        if (spark != (rtsSpark) NULL) {
            tso = createThreadFromSpark(spark);  /* turn the spark into a thread */
            IF_PAR_DEBUG(fish, // schedule,
                         debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
                                    tso->id, tso, advisory_thread_count));

            if (tso==END_TSO_QUEUE) { /* failed to activate spark -> back to loop */
                IF_PAR_DEBUG(fish, // schedule,
                             debugBelch("==^^ failed to create thread from spark @ %lx\n",
                                        spark));
                return rtsFalse; /* failed to generate a thread */
            }                    /* otherwise fall through & pick up new tso */
        } else {
            IF_PAR_DEBUG(fish, // schedule,
                         debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
                                    spark_queue_len(pool)));
            return rtsFalse; /* failed to generate a thread */
        }
        return rtsTrue; /* success in generating a thread */
    } else { /* no more threads permitted or pool empty */
        return rtsFalse; /* failed to generateThread */
    }
#else
    tso = NULL; // avoid compiler warning only
    return rtsFalse; /* dummy in non-PAR setup */
#endif // SPARKS
}
#endif // PARALLEL_HASKELL

/* ----------------------------------------------------------------------------
 * Get work from a remote node (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static rtsBool
scheduleGetRemoteWork(rtsBool *receivedFinish)
{
    ASSERT(emptyRunQueue());

    if (RtsFlags.ParFlags.BufferTime) {
        IF_PAR_DEBUG(verbose,
                     debugBelch("...send all pending data,"));
        {
            nat i;
            for (i=1; i<=nPEs; i++)
                sendImmediately(i); // send all messages away immediately
        }
    }

# ifndef SPARKS
    //++EDEN++ idle() , i.e. send all buffers, wait for work
    // suppress fishing in EDEN... just look for incoming messages
    // (blocking receive)
    IF_PAR_DEBUG(verbose,
                 debugBelch("...wait for incoming messages...\n"));
    *receivedFinish = processMessages(); // blocking receive...

    // and reenter scheduling loop after having received something
    // (return rtsFalse below)

# else /* activate SPARKS machinery */
    /* We get here if we have no work and have already tried to activate
       a local spark, but still have no work.  We try to get a remote
       spark, by sending a FISH message.  Thread migration should be
       added here, and triggered when a sequence of fishes returns without
       work. */
    delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);

    /* =8-[  no local sparks => look for work on other PEs */
    /*
     * We really have absolutely no work.  Send out a fish
     * (there may be some out there already), and wait for
     * something to arrive.  We clearly can't run any threads
     * until a SCHEDULE or RESUME arrives, and so that's what
     * we're hoping to see.  (Of course, we still have to
     * respond to other types of messages.)
     */
    rtsTime now = msTime() /*CURRENT_TIME*/;
    IF_PAR_DEBUG(verbose,
                 debugBelch("--  now=%ld\n", now));
    IF_PAR_DEBUG(fish, // verbose,
                 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
                     (last_fish_arrived_at!=0 &&
                      last_fish_arrived_at+delay > now)) {
                     debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
                                now, last_fish_arrived_at+delay,
                                last_fish_arrived_at,
                                delay);
                 });

    if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
        advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
        if (last_fish_arrived_at==0 ||
            (last_fish_arrived_at+delay <= now)) {  // send FISH now!
            /* outstandingFishes is set in sendFish, processFish;
               avoid flooding system with fishes via delay */
            next_fish_to_send_at = 0;
        } else {
            /* ToDo: this should be done in the main scheduling loop to avoid the
               busy wait here; not so bad if fish delay is very small  */
            int iq = 0; // DEBUGGING -- HWL
            next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
            /* send a fish when ready, but process messages that arrive in the meantime */
            do {
                if (PacketsWaiting()) {
                    iq++; // DEBUGGING
                    *receivedFinish = processMessages();
                }
                now = msTime();
            } while (!*receivedFinish || now<next_fish_to_send_at);
            // JB: This means the fish could become obsolete, if we receive
            // work. Better check for work again?
            // last line: while (!receivedFinish || !haveWork || now<...)
            // next line: if (receivedFinish || haveWork )

            if (*receivedFinish) // no need to send a FISH if we are finishing anyway
                return rtsFalse; // NB: this will leave scheduler loop
                                 // immediately after return!
        }

        IF_PAR_DEBUG(fish, // verbose,
                     debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));

        // JB: IMHO, this should all be hidden inside sendFish(...)
        pe = choosePE();
        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
                 NEW_FISH_HUNGER);

        // Global statistics: count no. of fishes
        if (RtsFlags.ParFlags.ParStats.Global &&
            RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
            globalParStats.tot_fish_mess++;
        }

        /* delayed fishes must have been sent by now! */
        next_fish_to_send_at = 0;
    }

    *receivedFinish = processMessages();
# endif /* SPARKS */

    return rtsFalse;
    /* NB: this function always returns rtsFalse, meaning the scheduler
       loop continues with the next iteration;
       rationale:
         return code means success in finding work; we enter this function
         if there is no local work, thus have to send a fish which takes
         time until it arrives with work; in the meantime we should process
         messages in the main loop;
    */
}
#endif // PARALLEL_HASKELL

/* ----------------------------------------------------------------------------
 * PAR/GRAN: Report stats & debugging info(?)
 * ------------------------------------------------------------------------- */

#if defined(PAR) || defined(GRAN)
static void
scheduleGranParReport(void)
{
    ASSERT(run_queue_hd != END_TSO_QUEUE);

    /* Take a thread from the run queue, if we have work */
    POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);

    /* If this TSO has got its outport closed in the meantime,
     *   it mustn't be run. Instead, we have to clean it up as if it was finished.
     *   It has to be marked as TH_DEAD for this purpose.
     *   If it is TH_TERM instead, it is supposed to have finished in the normal way.

       JB: TODO: investigate whether state change field could be nuked
           entirely and replaced by the normal tso state (whatnext
           field). All we want to do is to kill tsos from outside.
    */

    /* ToDo: write something to the log-file
    if (RTSflags.ParFlags.granSimStats && !sameThread)
        DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
    */

    /* the spark pool for the current PE */
    pool = &(cap.r.rSparks); //  cap = (old) MainCap

    IF_DEBUG(scheduler,
             debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
                        run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

    IF_PAR_DEBUG(fish,
                 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
                            run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

    if (RtsFlags.ParFlags.ParStats.Full &&
        (t->par.sparkname != (StgInt)0) && // only log spark generated threads
        (emitSchedule || // forced emit
         (t && LastTSO && t->id != LastTSO->id))) {
        /*
          we are running a different TSO, so write a schedule event to log file
          NB: If we use fair scheduling we also have to write a deschedule
              event for LastTSO; with unfair scheduling we know that the
              previous tso has blocked whenever we switch to another tso, so
              we don't need it in GUM for now
        */
        IF_PAR_DEBUG(fish, // schedule,
                     debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));

        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
        emitSchedule = rtsFalse;
    }
}
#endif

/* ----------------------------------------------------------------------------
 * After running a thread...
 * ------------------------------------------------------------------------- */

static void
schedulePostRunThread(void)
{
#if defined(PAR)
    /* HACK 675: if the last thread didn't yield, make sure to print a
       SCHEDULE event to the log file when StgRunning the next thread, even
       if it is the same one as before */
    LastTSO = t;
    TimeOfLastYield = CURRENT_TIME;
#endif

    /* some statistics gathering in the parallel case */

#if defined(GRAN) || defined(PAR) || defined(EDEN)
    switch (ret) {
    case HeapOverflow:
# if defined(GRAN)
        IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_heapover++;
# elif defined(PAR)
        globalParStats.tot_heapover++;
# endif
        break;

    case StackOverflow:
# if defined(GRAN)
        IF_DEBUG(gran,
                 DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_stackover++;
# elif defined(PAR)
        // IF_DEBUG(par,
        //          DumpGranEvent(GR_DESCHEDULE, t);
        globalParStats.tot_stackover++;
# endif
        break;

    case ThreadYielding:
# if defined(GRAN)
        IF_DEBUG(gran,
                 DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_yields++;
# elif defined(PAR)
        // IF_DEBUG(par,
        //          DumpGranEvent(GR_DESCHEDULE, t);
        globalParStats.tot_yields++;
# endif
        break;

    case ThreadBlocked:
# if defined(GRAN)
        IF_DEBUG(scheduler,
                 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
                            t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
                            (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
                 if (t->block_info.closure!=(StgClosure*)NULL)
                     print_bq(t->block_info.closure));

        // ??? needed; should emit block before
        IF_DEBUG(gran,
                 DumpGranEvent(GR_DESCHEDULE, t));
        prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
        ASSERT(procStatus[CurrentProc]==Busy ||
               ((procStatus[CurrentProc]==Fetching) &&
                (t->block_info.closure!=(StgClosure*)NULL)));
        if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
            !(!RtsFlags.GranFlags.DoAsyncFetch &&
              procStatus[CurrentProc]==Fetching))
            procStatus[CurrentProc] = Idle;
# elif defined(PAR)
        //++PAR++  blockThread() writes the event (change?)
# endif
        break;

    case ThreadFinished:
        break;

    default:
        barf("parGlobalStats: unknown return code");
    }
#endif
}

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
{
    // did the task ask for a large block?
    if (cap->r.rHpAlloc > BLOCK_SIZE) {
        // if so, get one and push it on the front of the nursery.
        bdescr *bd;
        lnat blocks;

        blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
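
        // (Illustrative, assuming the common 4096-byte BLOCK_SIZE: a
        //  10000-byte rHpAlloc rounds up to 12288 bytes, i.e. 3 blocks.)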

        IF_DEBUG(scheduler,
                 debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
                            (long)t->id, whatNext_strs[t->what_next], blocks));

        // don't do this if the nursery is (nearly) full, we'll GC first.
        if (cap->r.rCurrentNursery->link != NULL ||
            cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
                                              // if the nursery has only one block.
            bdescr *x;
            bd = allocGroup( blocks );

            cap->r.rNursery->n_blocks += blocks;

            // link the new group into the list
            bd->link = cap->r.rCurrentNursery;
            bd->u.back = cap->r.rCurrentNursery->u.back;
            if (cap->r.rCurrentNursery->u.back != NULL) {
                cap->r.rCurrentNursery->u.back->link = bd;
            } else {
                ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
                       g0s0 == cap->r.rNursery);
                cap->r.rNursery->blocks = bd;
            }
            cap->r.rCurrentNursery->u.back = bd;

            // initialise it as a nursery block.  We initialise the
            // step, gen_no, and flags field of *every* sub-block in
            // this large block, because this is easier than making
            // sure that we always find the block head of a large
            // block whenever we call Bdescr() (eg. evacuate() and
            // isAlive() in the GC would both have to do this, at
            // every call).
            //
            for (x = bd; x < bd + blocks; x++) {
                x->step = cap->r.rNursery;
                x->gen_no = 0;
                x->flags = 0;
            }

            // This assert can be a killer if the app is doing lots
            // of large block allocations.
            IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));

            // now update the nursery to point to the new block
            cap->r.rCurrentNursery = bd;

            // we might be unlucky and have another thread get on the
            // run queue before us and steal the large block, but in that
            // case the thread will just end up requesting another large
            // block anyway.
            pushOnRunQueue(cap,t);
            return rtsFalse;  /* not actually GC'ing */
        }
    }

    IF_DEBUG(scheduler,
             debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n",
                        (long)t->id, whatNext_strs[t->what_next]));

#if defined(GRAN)
    ASSERT(!is_on_queue(t,CurrentProc));
#elif defined(PARALLEL_HASKELL)
    /* Currently we emit a DESCHEDULE event before GC in GUM.
       ToDo: either add separate event to distinguish SYSTEM time from rest
             or just nuke this DESCHEDULE (and the following SCHEDULE) */
    if (0 && RtsFlags.ParFlags.ParStats.Full) {
        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
        emitSchedule = rtsTrue;
    }
#endif

    pushOnRunQueue(cap,t);
    return rtsTrue;
    /* actual GC is done at the end of the while loop in schedule() */
}

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadStackOverflow
 * -------------------------------------------------------------------------- */

static void
scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
{
    IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n",
                                  (long)t->id, whatNext_strs[t->what_next]));
    /* just adjust the stack for this thread, then pop it back
     * on the run queue.
     */
    {
        /* enlarge the stack */
        StgTSO *new_t = threadStackOverflow(cap, t);

        /* The TSO attached to this Task may have moved, so update the
         * pointer to it.
         */
        if (task->tso == t) {
            task->tso = new_t;
        }
        pushOnRunQueue(cap,new_t);
    }
}

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadYielding
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
{
    // Reset the context switch flag.  We don't do this just before
    // running the thread, because that would mean we would lose ticks
    // during GC, which can lead to unfair scheduling (a thread hogs
    // the CPU because the tick always arrives during GC).  This way
    // penalises threads that do a lot of allocation, but that seems
    // better than the alternative.
    context_switch = 0;

    /* put the thread back on the run queue.  Then, if we're ready to
     * GC, check whether this is the last task to stop.  If so, wake
     * up the GC thread.  getThread will block during a GC until the
     * GC is finished.
     */
    IF_DEBUG(scheduler,
             if (t->what_next != prev_what_next) {
                 debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n",
                            (long)t->id, whatNext_strs[t->what_next]);
             } else {
                 debugBelch("--<< thread %ld (%s) stopped, yielding\n",
                            (long)t->id, whatNext_strs[t->what_next]);
             }
             );

    IF_DEBUG(sanity,
             //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
             checkTSO(t));
    ASSERT(t->link == END_TSO_QUEUE);

    // Shortcut if we're just switching evaluators: don't bother
    // doing stack squeezing (which can be expensive), just run the
    // thread.
    if (t->what_next != prev_what_next) {
        return rtsTrue;
    }

#if defined(GRAN)
    ASSERT(!is_on_queue(t,CurrentProc));

    IF_DEBUG(sanity,
             //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
             checkThreadQsSanity(rtsTrue));
#endif

    addToRunQueue(cap,t);

#if defined(GRAN)
    /* add a ContinueThread event to actually process the thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              ContinueThread,
              t, (StgClosure*)NULL, (rtsSpark*)NULL);
    IF_GRAN_DEBUG(bq,
                  debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
                  G_EVENTQ(0);
                  G_CURR_THREADQ(0));
#endif
    return rtsFalse;
}

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadBlocked
 * -------------------------------------------------------------------------- */

static void
scheduleHandleThreadBlocked( StgTSO *t
#if !defined(GRAN) && !defined(DEBUG)
                             STG_UNUSED
#endif
                             )
{
#if defined(GRAN)
    IF_DEBUG(scheduler,
             debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
                        t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
             if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));

    // ??? needed; should emit block before
    IF_DEBUG(gran,
             DumpGranEvent(GR_DESCHEDULE, t));
    prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
    ASSERT(procStatus[CurrentProc]==Busy ||
           ((procStatus[CurrentProc]==Fetching) &&
            (t->block_info.closure!=(StgClosure*)NULL)));
    if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
        !(!RtsFlags.GranFlags.DoAsyncFetch &&
          procStatus[CurrentProc]==Fetching))
        procStatus[CurrentProc] = Idle;
#elif defined(PAR)
    IF_DEBUG(scheduler,
             debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
                        t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
    IF_PAR_DEBUG(bq,
                 if (t->block_info.closure!=(StgClosure*)NULL)
                     print_bq(t->block_info.closure));

    /* Send a fetch (if BlockedOnGA) and dump event to log file */
    blockThread(t);

    /* whatever we schedule next, we must log that schedule */
    emitSchedule = rtsTrue;

#else /* !GRAN */

    // We don't need to do anything.  The thread is blocked, and it
    // has tidied up its stack and placed itself on whatever queue
    // it needs to be on.

    ASSERT(t->why_blocked != NotBlocked);
    // This might not be true under SMP: we don't have
    // exclusive access to this TSO, so someone might have
    // woken it up by now.  This actually happens: try
    // conc023 +RTS -N2.

    IF_DEBUG(scheduler,
             debugBelch("--<< thread %d (%s) stopped: ",
                        t->id, whatNext_strs[t->what_next]);
             printThreadBlockage(t);
             debugBelch("\n"));

    /* Only for dumping event to log file
       ToDo: do I need this in GranSim, too?
       blockThread(t);
    */
#endif
}

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadFinished
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
{
    /* Need to check whether this was a main thread, and if so,
     * return with the return value.
     *
     * We also end up here if the thread kills itself with an
     * uncaught exception, see Exception.cmm.
     */
    IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n",
                                  t->id, whatNext_strs[t->what_next]));

#if defined(GRAN)
    endThread(t, CurrentProc); // clean-up the thread
#elif defined(PARALLEL_HASKELL)
    /* For now all are advisory -- HWL */
    //if(t->priority==AdvisoryPriority) ??
    advisory_thread_count--; // JB: Caution with this counter, buggy!

# if defined(DIST)
    if(t->dist.priority==RevalPriority)
        FinishReval(t);
# endif

# if defined(EDENOLD)
    // the thread could still have an outport... (BUG)
    if (t->eden.outport != -1) {
        // delete the outport for the tso which has finished...
        IF_PAR_DEBUG(eden_ports,
                     debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
                                t->eden.outport, t->id));
        deleteOPT(t);
    }
    // thread still in the process (HEAVY BUG! since outport has just been closed...)
    if (t->eden.epid != -1) {
        IF_PAR_DEBUG(eden_ports,
                     debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
                                t->id, t->eden.epid));
        removeTSOfromProcess(t);
    }
# endif

# if defined(PAR)
    if (RtsFlags.ParFlags.ParStats.Full &&
        !RtsFlags.ParFlags.ParStats.Suppressed)
        DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);

    //  t->par only contains statistics: left out for now...
    IF_PAR_DEBUG(fish,
                 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
                            t->id,t,t->par.sparkname));
# endif
#endif // PARALLEL_HASKELL

    //
    // Check whether the thread that just completed was a bound
    // thread, and if so return with the result.
    //
    // There is an assumption here that all thread completion goes
    // through this point; we need to make sure that if a thread
    // ends up in the ThreadKilled state, that it stays on the run
    // queue so it can be dealt with here.
    //

    if (t->bound) {

        if (t->bound != task) {
#if !defined(THREADED_RTS)
            // Must be a bound thread that is not the topmost one.  Leave
            // it on the run queue until the stack has unwound to the
            // point where we can deal with this.  Leaving it on the run
            // queue also ensures that the garbage collector knows about
            // this thread and its return value (it gets dropped from the
            // all_threads list so there's no other way to find it).
            appendToRunQueue(cap,t);
            return rtsFalse;
#else
            // this cannot happen in the threaded RTS, because a
            // bound thread can only be run by the appropriate Task.
            barf("finished bound thread that isn't mine");
#endif
        }

        ASSERT(task->tso == t);

        if (t->what_next == ThreadComplete) {
            if (task->ret) {
                // NOTE: return val is tso->sp[1] (see StgStartup.hc)
                *(task->ret) = (StgClosure *)task->tso->sp[1];
            }
            task->stat = Success;
        } else {
            if (task->ret) {
                *(task->ret) = NULL;
            }
            if (interrupted) {
                task->stat = Interrupted;
            } else {
                task->stat = Killed;
            }
        }
#ifdef DEBUG
        removeThreadLabel((StgWord)task->tso->id);
#endif
        return rtsTrue; // tells schedule() to return
    }

    return rtsFalse;
}

/* -----------------------------------------------------------------------------
 * Perform a heap census, if PROFILING
 * -------------------------------------------------------------------------- */

static rtsBool
scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
{
#if defined(PROFILING)
    // When we have +RTS -i0 and we're heap profiling, do a census at
    // every GC.  This lets us get repeatable runs for debugging.
    if (performHeapProfile ||
        (RtsFlags.ProfFlags.profileInterval==0 &&
         RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
        GarbageCollect(GetRoots, rtsTrue);
        heapCensus();
        performHeapProfile = rtsFalse;
        return rtsTrue;  // true <=> we already GC'd
    }
#endif
    return rtsFalse;
}

/* -----------------------------------------------------------------------------
 * Perform a garbage collection if necessary
 * -------------------------------------------------------------------------- */

static void
scheduleDoGC( Capability *cap, Task *task USED_IF_SMP, rtsBool force_major )
{
    StgTSO *t;
#ifdef THREADED_RTS
    static volatile StgWord waiting_for_gc;
    rtsBool was_waiting;
    nat i;
#endif

#ifdef THREADED_RTS
    // In order to GC, there must be no threads running Haskell code.
    // Therefore, the GC thread needs to hold *all* the capabilities,
    // and release them after the GC has completed.
    //
    // This seems to be the simplest way: previous attempts involved
    // making all the threads with capabilities give up their
    // capabilities and sleep except for the *last* one, which
    // actually did the GC.  But it's quite hard to arrange for all
    // the other tasks to sleep and stay asleep.
    //

    was_waiting = cas(&waiting_for_gc, 0, 1);
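
    // (cas() atomically publishes the 1 and returns the previous value,
    //  so exactly one task sees 0 here and becomes the task that performs
    //  the GC; every other task sees 1 and yields its capability below
    //  until waiting_for_gc is cleared again.)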
    if (was_waiting) {
        do {
            IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
            yieldCapability(&cap,task);
        } while (waiting_for_gc);
        return;
    }

    for (i=0; i < n_capabilities; i++) {
        IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities));
        if (cap != &capabilities[i]) {
            Capability *pcap = &capabilities[i];
            // we better hope this task doesn't get migrated to
            // another Capability while we're waiting for this one.
            // It won't, because load balancing happens while we have
            // all the Capabilities, but even so it's a slightly
            // unsavoury invariant.
            task->cap = pcap;
            waitForReturnCapability(&pcap, task);
            if (pcap != &capabilities[i]) {
                barf("scheduleDoGC: got the wrong capability");
            }
        }
    }

    waiting_for_gc = rtsFalse;
#endif

    /* Kick any transactions which are invalid back to their
     * atomically frames.  When next scheduled they will try to
     * commit, this commit will fail and they will retry.
     */
    {
        StgTSO *next;

        for (t = all_threads; t != END_TSO_QUEUE; t = next) {
            if (t->what_next == ThreadRelocated) {
                next = t->link;
            } else {
                next = t->global_link;
                if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
                    if (!stmValidateNestOfTransactions (t -> trec)) {
                        IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));

                        // strip the stack back to the
                        // ATOMICALLY_FRAME, aborting the (nested)
                        // transaction, and saving the stack of any
                        // partially-evaluated thunks on the heap.
                        raiseAsync_(cap, t, NULL, rtsTrue, NULL);

                        ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
                    }
                }
            }
        }
    }

    // so this happens periodically:
    scheduleCheckBlackHoles(cap);

    IF_DEBUG(scheduler, printAllThreads());

    /* everybody back, start the GC.
     * Could do it in this thread, or signal a condition var
     * to do it in another thread.  Either way, we need to
     * broadcast on gc_pending_cond afterward.
     */
#if defined(THREADED_RTS)
    IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
    GarbageCollect(GetRoots, force_major);

#if defined(THREADED_RTS)
    // release our stash of capabilities.
    for (i = 0; i < n_capabilities; i++) {
        if (cap != &capabilities[i]) {
            task->cap = &capabilities[i];
            releaseCapability(&capabilities[i]);
        }
    }
    task->cap = cap;
#endif

#if defined(GRAN)
    /* add a ContinueThread event to continue execution of current thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              ContinueThread,
              t, (StgClosure*)NULL, (rtsSpark*)NULL);
    IF_GRAN_DEBUG(bq,
                  debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
                  G_EVENTQ(0);
                  G_CURR_THREADQ(0));
#endif /* GRAN */
}

/* ---------------------------------------------------------------------------
 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
 * used by Control.Concurrent for error checking.
 * ------------------------------------------------------------------------- */

StgBool
rtsSupportsBoundThreads(void)
{
#if defined(THREADED_RTS)
    return rtsTrue;
#else
    return rtsFalse;
#endif
}

/* ---------------------------------------------------------------------------
 * isThreadBound(tso): check whether tso is bound to an OS thread.
 * ------------------------------------------------------------------------- */

StgBool
isThreadBound(StgTSO* tso USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    return (tso->bound != NULL);
#else
    return rtsFalse;
#endif
}

/* ---------------------------------------------------------------------------
 * Singleton fork(). Do not copy any running threads.
 * ------------------------------------------------------------------------- */

#if !defined(mingw32_HOST_OS) && !defined(SMP)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
deleteThreadImmediately(Capability *cap, StgTSO *tso);
#endif

StgInt
forkProcess(HsStablePtr *entry
#ifndef FORKPROCESS_PRIMOP_SUPPORTED
            STG_UNUSED
#endif
           )
{
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
    Task *task;
    pid_t pid;
    StgTSO *t, *next;
    Capability *cap;

    IF_DEBUG(scheduler,sched_belch("forking!"));

    // ToDo: for SMP, we should probably acquire *all* the capabilities
    cap = rts_lock();

    pid = fork();

    if (pid) { // parent

        // just return the pid
        rts_unlock(cap);
        return pid;

    } else { // child

        // delete all threads
        cap->run_queue_hd = END_TSO_QUEUE;
        cap->run_queue_tl = END_TSO_QUEUE;

        for (t = all_threads; t != END_TSO_QUEUE; t = next) {
            next = t->link;

            // don't allow threads to catch the ThreadKilled exception
            deleteThreadImmediately(cap,t);
        }

        // wipe the task list
        ACQUIRE_LOCK(&sched_mutex);
        for (task = all_tasks; task != NULL; task=task->all_link) {
            if (task != cap->running_task) discardTask(task);
        }
        RELEASE_LOCK(&sched_mutex);

        cap->suspended_ccalling_tasks = NULL;

#if defined(THREADED_RTS)
        // wipe our spare workers list.
        cap->spare_workers = NULL;
        cap->returning_tasks_hd = NULL;
        cap->returning_tasks_tl = NULL;
#endif

        cap = rts_evalStableIO(cap, entry, NULL);  // run the action
        rts_checkSchedStatus("forkProcess",cap);

        rts_unlock(cap);
        hs_exit();      // clean up and exit
        stg_exit(EXIT_SUCCESS);
    }
#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
    barf("forkProcess#: primop not supported on this platform, sorry!\n");
#endif
}
2099 /* ---------------------------------------------------------------------------
2100 * Delete the threads on the run queue of the current capability.
2101 * ------------------------------------------------------------------------- */
2104 deleteRunQueue (Capability *cap)
2107 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2108 ASSERT(t->what_next != ThreadRelocated);
2110 deleteThread(cap, t);
2114 /* startThread and insertThread are now in GranSim.c -- HWL */
2117 /* -----------------------------------------------------------------------------
2118 Managing the suspended_ccalling_tasks list.
2119 Locks required: sched_mutex
2120 -------------------------------------------------------------------------- */
2123 suspendTask (Capability *cap, Task *task)
2125 ASSERT(task->next == NULL && task->prev == NULL);
2126 task->next = cap->suspended_ccalling_tasks;
2128 if (cap->suspended_ccalling_tasks) {
2129 cap->suspended_ccalling_tasks->prev = task;
2131 cap->suspended_ccalling_tasks = task;
2135 recoverSuspendedTask (Capability *cap, Task *task)
2138 task->prev->next = task->next;
2140 ASSERT(cap->suspended_ccalling_tasks == task);
2141 cap->suspended_ccalling_tasks = task->next;
2144 task->next->prev = task->prev;
2146 task->next = task->prev = NULL;
/* ---------------------------------------------------------------------------
 * Suspending & resuming Haskell threads.
 *
 * When making a "safe" call to C (aka _ccall_GC), the task gives back
 * its capability before calling the C function.  This allows another
 * task to pick up the capability and carry on running Haskell
 * threads.  It also means that if the C call blocks, it won't lock
 * the whole system.
 *
 * The Haskell thread making the C call is put to sleep for the
 * duration of the call, on the suspended_ccalling_tasks queue.  We
 * give out a token to the task, which it can use to resume the thread
 * on return from the C function.
 * ------------------------------------------------------------------------- */
suspendThread (StgRegTable *reg)
    int saved_errno = errno;

    /* assume that *reg is a pointer to the StgRegTable part of a Capability.
     */
    cap = regTableToCapability(reg);

    task = cap->running_task;
    tso = cap->r.rCurrentTSO;

    sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));

    // XXX this might not be necessary --SDM
    tso->what_next = ThreadRunGHC;

    threadPaused(cap,tso);

    if(tso->blocked_exceptions == NULL) {
        tso->why_blocked = BlockedOnCCall;
        tso->blocked_exceptions = END_TSO_QUEUE;
        tso->why_blocked = BlockedOnCCall_NoUnblockExc;

    // Hand back capability
    task->suspended_tso = tso;

    ACQUIRE_LOCK(&cap->lock);

    suspendTask(cap,task);
    cap->in_haskell = rtsFalse;
    releaseCapability_(cap);

    RELEASE_LOCK(&cap->lock);

#if defined(THREADED_RTS)
    /* Preparing to leave the RTS, so ensure there's a native thread/task
       waiting to take over.
     */
    IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));

    errno = saved_errno;

resumeThread (void *task_)
    int saved_errno = errno;

    // Wait for permission to re-enter the RTS with the result.
    waitForReturnCapability(&cap,task);
    // we might be on a different capability now... but if so, our
    // entry on the suspended_ccalling_tasks list will also have been
    // migrated.

    // Remove the thread from the suspended list
    recoverSuspendedTask(cap,task);

    tso = task->suspended_tso;
    task->suspended_tso = NULL;
    tso->link = END_TSO_QUEUE;
    IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));

    if (tso->why_blocked == BlockedOnCCall) {
        awakenBlockedQueue(cap,tso->blocked_exceptions);
        tso->blocked_exceptions = NULL;

    /* Reset blocking status */
    tso->why_blocked = NotBlocked;

    cap->r.rCurrentTSO = tso;
    cap->in_haskell = rtsTrue;
    errno = saved_errno;
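
/* Illustrative sketch (not part of the RTS): the token protocol above as
 * a "safe" foreign call would use it.  The compiler normally emits this
 * sequence in the wrapper for a "foreign import safe" declaration;
 * my_blocking_call() is a hypothetical stand-in for the real C callee.
 */
#if 0
static int call_out_safely (StgRegTable *reg)
{
    void *tok;
    int   result;

    tok = suspendThread(reg);      // give up the Capability; get a token
    result = my_blocking_call();   // may block without stalling Haskell threads
    reg = resumeThread(tok);       // reacquire a Capability and our TSO
    return result;
}
#endif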
/* ---------------------------------------------------------------------------
 * Comparing Thread ids.
 *
 * This is used from STG land in the implementation of the
 * instances of Eq/Ord for ThreadIds.
 * ------------------------------------------------------------------------ */
cmp_thread(StgPtr tso1, StgPtr tso2)
    StgThreadID id1 = ((StgTSO *)tso1)->id;
    StgThreadID id2 = ((StgTSO *)tso2)->id;

    if (id1 < id2) return (-1);
    if (id1 > id2) return 1;

/* ---------------------------------------------------------------------------
 * Fetching the ThreadID from an StgTSO.
 *
 * This is used in the implementation of Show for ThreadIds.
 * ------------------------------------------------------------------------ */
rts_getThreadId(StgPtr tso)
    return ((StgTSO *)tso)->id;

labelThread(StgPtr tso, char *label)
    /* Caveat: Once set, you can only set the thread name to "" */
    len = strlen(label)+1;
    buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
    strncpy(buf,label,len);
    /* Update will free the old memory for us */
    updateThreadLabel(((StgTSO *)tso)->id,buf);

/* ---------------------------------------------------------------------------
   Create a new thread.

   The new thread starts with the given stack size.  Before the
   scheduler can run, however, this thread needs to have a closure
   (and possibly some arguments) pushed on its stack.  See
   pushClosure() in Schedule.h.

   createGenThread() and createIOThread() (in SchedAPI.h) are
   convenient packaged versions of this function.

   currently pri (priority) is only used in a GRAN setup -- HWL
   ------------------------------------------------------------------------ */
/* currently pri (priority) is only used in a GRAN setup -- HWL */
createThread(nat size, StgInt pri)
createThread(Capability *cap, nat size)

    /* sched_mutex is *not* required */

    /* First check whether we should create a thread at all */
#if defined(PARALLEL_HASKELL)
    /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
    if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
        debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
                   RtsFlags.ParFlags.maxThreads, advisory_thread_count);
        return END_TSO_QUEUE;

    ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);

    // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW

    /* catch ridiculously small stack sizes */
    if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
        size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;

    stack_size = size - TSO_STRUCT_SIZEW;

    tso = (StgTSO *)allocateLocal(cap, size);
    TICK_ALLOC_TSO(stack_size, 0);

    SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
    SET_GRAN_HDR(tso, ThisPE);

    // Always start with the compiled code evaluator
    tso->what_next = ThreadRunGHC;

    tso->why_blocked = NotBlocked;
    tso->blocked_exceptions = NULL;

    tso->saved_errno = 0;

    tso->stack_size = stack_size;
    tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
    tso->sp = (P_)&(tso->stack) + stack_size;

    tso->trec = NO_TREC;

    tso->prof.CCCS = CCS_MAIN;

    /* put a stop frame on the stack */
    tso->sp -= sizeofW(StgStopFrame);
    SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
    tso->link = END_TSO_QUEUE;

    /* uses more flexible routine in GranSim */
    insertThread(tso, CurrentProc);
    /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
     */
    if (RtsFlags.GranFlags.GranSimStats.Full)
        DumpGranEvent(GR_START,tso);
#elif defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.ParStats.Full)
        DumpGranEvent(GR_STARTQ,tso);
    /* HACK to avoid SCHEDULE */

    /* Link the new thread on the global thread list.
     */
    ACQUIRE_LOCK(&sched_mutex);
    tso->id = next_thread_id++;  // while we have the mutex
    tso->global_link = all_threads;
    RELEASE_LOCK(&sched_mutex);
    tso->dist.priority = MandatoryPriority; // by default that is...

    tso->gran.pri = pri;
    tso->gran.magic = TSO_MAGIC; // debugging only
    tso->gran.sparkname    = 0;
    tso->gran.startedat    = CURRENT_TIME;
    tso->gran.exported     = 0;
    tso->gran.basicblocks  = 0;
    tso->gran.allocs       = 0;
    tso->gran.exectime     = 0;
    tso->gran.fetchtime    = 0;
    tso->gran.fetchcount   = 0;
    tso->gran.blocktime    = 0;
    tso->gran.blockcount   = 0;
    tso->gran.blockedat    = 0;
    tso->gran.globalsparks = 0;
    tso->gran.localsparks  = 0;
    if (RtsFlags.GranFlags.Light)
        tso->gran.clock = Now;  /* local clock */
        tso->gran.clock = 0;

    IF_DEBUG(gran,printTSO(tso));
#elif defined(PARALLEL_HASKELL)
    tso->par.magic = TSO_MAGIC; // debugging only
    tso->par.sparkname    = 0;
    tso->par.startedat    = CURRENT_TIME;
    tso->par.exported     = 0;
    tso->par.basicblocks  = 0;
    tso->par.allocs       = 0;
    tso->par.exectime     = 0;
    tso->par.fetchtime    = 0;
    tso->par.fetchcount   = 0;
    tso->par.blocktime    = 0;
    tso->par.blockcount   = 0;
    tso->par.blockedat    = 0;
    tso->par.globalsparks = 0;
    tso->par.localsparks  = 0;

    globalGranStats.tot_threads_created++;
    globalGranStats.threads_created_on_PE[CurrentProc]++;
    globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
    globalGranStats.tot_sq_probes++;
#elif defined(PARALLEL_HASKELL)
    // collect parallel global statistics (currently done together with GC stats)
    if (RtsFlags.ParFlags.ParStats.Global &&
        RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
        //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
        globalParStats.tot_threads_created++;
    sched_belch("==__ schedule: Created TSO %d (%p) on PE %d;",
                tso->id, tso, CurrentProc));
#elif defined(PARALLEL_HASKELL)
    IF_PAR_DEBUG(verbose,
                 sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
                             (long)tso->id, tso, advisory_thread_count));
    IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
                                   (long)tso->id, (long)tso->stack_size));

/* all parallel thread creation calls should fall through the following routine. */
createThreadFromSpark(rtsSpark spark)
    ASSERT(spark != (rtsSpark)NULL);
    // JB: TAKE CARE OF THIS COUNTER! BUGGY
    if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
        barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
             RtsFlags.ParFlags.maxThreads, advisory_thread_count);
        return END_TSO_QUEUE;

    tso = createThread(RtsFlags.GcFlags.initialStkSize);
    if (tso==END_TSO_QUEUE)
        barf("createSparkThread: Cannot create TSO");

    tso->priority = AdvisoryPriority;

    pushClosure(tso,spark);

    advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY

/* Turn a spark into a thread.
   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
 */
activateSpark (rtsSpark spark)
    tso = createSparkThread(spark);
    if (RtsFlags.ParFlags.ParStats.Full) {
        //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
        IF_PAR_DEBUG(verbose,
                     debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
                                (StgClosure *)spark, info_type((StgClosure *)spark)));

    // ToDo: fwd info on local/global spark to thread -- HWL
    // tso->gran.exported =  spark->exported;
    // tso->gran.locked =    !spark->global;
    // tso->gran.sparkname = spark->name;

/* ---------------------------------------------------------------------------
 * scheduleThread()
 *
 * scheduleThread puts a thread on the end of the runnable queue.
 * This will usually be done immediately after a thread is created.
 * The caller of scheduleThread must create the thread using e.g.
 * createThread and push an appropriate closure
 * on this thread's stack before the scheduler is invoked.
 * ------------------------------------------------------------------------ */
scheduleThread(Capability *cap, StgTSO *tso)
    // The thread goes at the *end* of the run-queue, to avoid possible
    // starvation of any threads already on the queue.
    appendToRunQueue(cap,tso);
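
/* Illustrative sketch (assumptions flagged): creating a thread by hand and
 * handing it to the scheduler, roughly what the createGenThread() wrapper
 * mentioned above packages up.  The stg_enter_info push is an assumption
 * about the stack convention; "closure" is whatever the new thread should
 * evaluate.
 */
#if 0
static void run_closure_in_new_thread (Capability *cap, StgClosure *closure)
{
    StgTSO *t;

    t = createThread(cap, RtsFlags.GcFlags.initialStkSize);
    pushClosure(t, (W_)closure);          // the thing to evaluate...
    pushClosure(t, (W_)&stg_enter_info);  // ...and a frame to enter it
    scheduleThread(cap, t);               // append to cap's run queue
}
#endif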
scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)

    // We already created/initialised the Task
    task = cap->running_task;

    // This TSO is now a bound thread; make the Task and TSO
    // point to each other.
    task->stat = NoStatus;

    appendToRunQueue(cap,tso);

    IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));

    /* GranSim specific init */
    CurrentTSO = m->tso;           // the TSO to run
    procStatus[MainProc] = Busy;   // status of main PE
    CurrentProc = MainProc;        // PE to run it on

    cap = schedule(cap,task);

    ASSERT(task->stat != NoStatus);
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));

/* ----------------------------------------------------------------------------
 * Starting Tasks
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
workerStart(Task *task)
    // See startWorkerTask().
    ACQUIRE_LOCK(&task->lock);
    RELEASE_LOCK(&task->lock);

    // set the thread-local pointer to the Task:

    // schedule() runs without a lock.
    cap = schedule(cap,task);

    // On exit from schedule(), we have a Capability.
    releaseCapability(cap);

/* ---------------------------------------------------------------------------
 * initScheduler()
 *
 * Initialise the scheduler.  This resets all the queues - if the
 * queues contained any threads, they'll be garbage collected at the
 * next pass.
 * ------------------------------------------------------------------------ */
    for (i=0; i<MAX_PROC; i++) {
        run_queue_hds[i]     = END_TSO_QUEUE;
        run_queue_tls[i]     = END_TSO_QUEUE;
        blocked_queue_hds[i] = END_TSO_QUEUE;
        blocked_queue_tls[i] = END_TSO_QUEUE;
        ccalling_threadss[i] = END_TSO_QUEUE;
        blackhole_queue[i]   = END_TSO_QUEUE;
        sleeping_queue       = END_TSO_QUEUE;
#elif !defined(THREADED_RTS)
    blocked_queue_hd = END_TSO_QUEUE;
    blocked_queue_tl = END_TSO_QUEUE;
    sleeping_queue   = END_TSO_QUEUE;

    blackhole_queue = END_TSO_QUEUE;
    all_threads     = END_TSO_QUEUE;

    RtsFlags.ConcFlags.ctxtSwitchTicks =
        RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;

#if defined(THREADED_RTS)
    /* Initialise the mutex and condition variables used by
     * the scheduler. */
    initMutex(&sched_mutex);

    ACQUIRE_LOCK(&sched_mutex);

    /* A capability holds the state a native thread needs in
     * order to execute STG code.  At least one capability is
     * floating around (only SMP builds have more than one).
     */

#if defined(SMP) || defined(PARALLEL_HASKELL)

    /*
     * Eagerly start one worker to run each Capability, except for
     * Capability 0.  The idea is that we're probably going to start a
     * bound thread on Capability 0 pretty soon, so we don't want a
     * worker task hogging it.
     */
    for (i = 1; i < n_capabilities; i++) {
        cap = &capabilities[i];
        ACQUIRE_LOCK(&cap->lock);
        startWorkerTask(cap, workerStart);
        RELEASE_LOCK(&cap->lock);

    RELEASE_LOCK(&sched_mutex);

exitScheduler( void )
    interrupted = rtsTrue;
    shutting_down_scheduler = rtsTrue;

#if defined(THREADED_RTS)
    ACQUIRE_LOCK(&sched_mutex);
    task = newBoundTask();
    RELEASE_LOCK(&sched_mutex);

    for (i = 0; i < n_capabilities; i++) {
        shutdownCapability(&capabilities[i], task);
    boundTaskExiting(task);

/* ---------------------------------------------------------------------------
   Where are the roots that we know about?

   - all the threads on the runnable queue
   - all the threads on the blocked queue
   - all the threads on the sleeping queue
   - all the threads currently executing a _ccall_GC
   - all the "main threads"

   ------------------------------------------------------------------------ */

/* This has to be protected either by the scheduler monitor, or by the
   garbage collection monitor (probably the latter).
 */

GetRoots( evac_fn evac )

    for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
        if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
            evac((StgClosure **)&run_queue_hds[i]);
        if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
            evac((StgClosure **)&run_queue_tls[i]);
        if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
            evac((StgClosure **)&blocked_queue_hds[i]);
        if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
            evac((StgClosure **)&blocked_queue_tls[i]);
        if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
            evac((StgClosure **)&ccalling_threadss[i]);
    for (i = 0; i < n_capabilities; i++) {
        cap = &capabilities[i];
        evac((StgClosure **)&cap->run_queue_hd);
        evac((StgClosure **)&cap->run_queue_tl);

        for (task = cap->suspended_ccalling_tasks; task != NULL;
            evac((StgClosure **)&task->suspended_tso);

#if !defined(THREADED_RTS)
    evac((StgClosure **)&blocked_queue_hd);
    evac((StgClosure **)&blocked_queue_tl);
    evac((StgClosure **)&sleeping_queue);

    evac((StgClosure **)&blackhole_queue);

#if defined(SMP) || defined(PARALLEL_HASKELL) || defined(GRAN)
    markSparkQueue(evac);

#if defined(RTS_USER_SIGNALS)
    // mark the signal handlers (signals should be already blocked)
    markSignalHandlers(evac);

/* -----------------------------------------------------------------------------
   performGC

   This is the interface to the garbage collector from Haskell land.
   We provide this so that external C code can allocate and garbage
   collect when called from Haskell via _ccall_GC.

   It might be useful to provide an interface whereby the programmer
   can specify more roots (ToDo).

   This needs to be protected by the GC condition variable above.  KH.
   -------------------------------------------------------------------------- */

static void (*extra_roots)(evac_fn);

    // ToDo: we have to grab all the capabilities here.
    errorBelch("performGC not supported in threaded RTS (yet)");
    stg_exit(EXIT_FAILURE);

    /* Obligated to hold this lock upon entry */
    GarbageCollect(GetRoots,rtsFalse);

performMajorGC(void)
    errorBelch("performMajorGC not supported in threaded RTS (yet)");
    stg_exit(EXIT_FAILURE);
    GarbageCollect(GetRoots,rtsTrue);

AllRoots(evac_fn evac)
    GetRoots(evac);      // the scheduler's roots
    extra_roots(evac);   // the user's roots

performGCWithRoots(void (*get_roots)(evac_fn))
    errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
    stg_exit(EXIT_FAILURE);

    extra_roots = get_roots;
    GarbageCollect(AllRoots,rtsFalse);
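
/* Illustrative sketch: how external C code might keep a heap object alive
 * across collections via the extra-roots hook above.  my_extra_root and
 * my_roots() are hypothetical; evac_fn takes a pointer to the root so the
 * GC can update it, as in GetRoots() above.
 */
#if 0
static StgClosure *my_extra_root = NULL;   // C-side reference into the heap

static void my_roots (evac_fn evac)
{
    if (my_extra_root != NULL) {
        evac(&my_extra_root);              // evacuate and update our pointer
    }
}

static void gc_keeping_my_root (void)
{
    performGCWithRoots(my_roots);          // GC with GetRoots() plus my_roots()
}
#endif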
/* -----------------------------------------------------------------------------
   Stack overflow

   If the thread has reached its maximum stack size, then raise the
   StackOverflow exception in the offending thread.  Otherwise
   relocate the TSO into a larger chunk of memory and adjust its stack
   size appropriately.
   -------------------------------------------------------------------------- */

threadStackOverflow(Capability *cap, StgTSO *tso)
    nat new_stack_size, stack_words;

    IF_DEBUG(sanity,checkTSO(tso));
    if (tso->stack_size >= tso->max_stack_size) {
        debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
                   (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
        /* If we're debugging, just print out the top of the stack */
        printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,

        /* Send this thread the StackOverflow exception */
        raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);

    /* Try to double the current stack size.  If that takes us over the
     * maximum stack size for this thread, then use the maximum instead.
     * Finally round up so the TSO ends up as a whole number of blocks.
     */
    new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
    new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
                                          TSO_STRUCT_SIZE)/sizeof(W_);
    new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
    new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;

    IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));

    dest = (StgTSO *)allocate(new_tso_size);
    TICK_ALLOC_TSO(new_stack_size,0);

    /* copy the TSO block and the old stack into the new area */
    memcpy(dest,tso,TSO_STRUCT_SIZE);
    stack_words = tso->stack + tso->stack_size - tso->sp;
    new_sp = (P_)dest + new_tso_size - stack_words;
    memcpy(new_sp, tso->sp, stack_words * sizeof(W_));

    /* relocate the stack pointers... */
    dest->stack_size = new_stack_size;

    /* Mark the old TSO as relocated.  We have to check for relocated
     * TSOs in the garbage collector and any primops that deal with TSOs.
     *
     * It's important to set the sp value to just beyond the end
     * of the stack, so we don't attempt to scavenge any part of the
     * old TSO's stack.
     */
    tso->what_next = ThreadRelocated;
    tso->sp = (P_)&(tso->stack[tso->stack_size]);
    tso->why_blocked = NotBlocked;

    IF_PAR_DEBUG(verbose,
                 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
                            tso->id, tso, tso->stack_size);
    /* If we're debugging, just print out the top of the stack */
    printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,

    IF_DEBUG(sanity,checkTSO(tso));
    IF_DEBUG(scheduler,printTSO(dest));

/* ---------------------------------------------------------------------------
   Wake up a queue that was blocked on some resource.
   ------------------------------------------------------------------------ */

unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
#elif defined(PARALLEL_HASKELL)
unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
    /* write RESUME events to log file and
       update blocked and fetch time (depending on type of the orig closure) */
    if (RtsFlags.ParFlags.ParStats.Full) {
        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
                         0, 0 /* spark_queue_len(ADVISORY_POOL) */);
        if (emptyRunQueue())
            emitSchedule = rtsTrue;

        switch (get_itbl(node)->type) {
            ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
            ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
            barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");

StgBlockingQueueElement *
unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
    PEs node_loc, tso_loc;

    node_loc = where_is(node); // should be lifted out of loop
    tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
    tso_loc = where_is((StgClosure *)tso);
    if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
        /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
        ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
        CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
        // insertThread(tso, node_loc);
        new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
                  tso, node, (rtsSpark*)NULL);
        tso->link = END_TSO_QUEUE; // overwrite link just to be sure
    } else { // TSO is remote (actually should be FMBQ)
        CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
                                    RtsFlags.GranFlags.Costs.gunblocktime +
                                    RtsFlags.GranFlags.Costs.latency;
        new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
                  tso, node, (rtsSpark*)NULL);
        tso->link = END_TSO_QUEUE; // overwrite link just to be sure

    /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
    debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
               (node_loc==tso_loc ? "Local" : "Global"),
               tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
    tso->block_info.closure = NULL;
    IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n",

#elif defined(PARALLEL_HASKELL)
StgBlockingQueueElement *
unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
    StgBlockingQueueElement *next;

    switch (get_itbl(bqe)->type) {
        ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
        /* if it's a TSO just push it onto the run_queue */
        ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
        APPEND_TO_RUN_QUEUE((StgTSO *)bqe);

        unblockCount(bqe, node);
        /* reset blocking status after dumping event */
        ((StgTSO *)bqe)->why_blocked = NotBlocked;

        /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
        bqe->link = (StgBlockingQueueElement *)PendingFetches;
        PendingFetches = (StgBlockedFetch *)bqe;

        /* can ignore this case in a non-debugging setup;
           see comments on RBHSave closures above */
        /* check that the closure is an RBHSave closure */
        ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
               get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
               get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);

        barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
             get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),

    IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));

unblockOne(Capability *cap, StgTSO *tso)
    ASSERT(get_itbl(tso)->type == TSO);
    ASSERT(tso->why_blocked != NotBlocked);
    tso->why_blocked = NotBlocked;
    tso->link = END_TSO_QUEUE;

    // We might have just migrated this TSO to our Capability:
    tso->bound->cap = cap;

    appendToRunQueue(cap,tso);

    // we're holding a newly woken thread, make sure we context switch
    // quickly so we can migrate it if necessary.
    IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));

awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
    StgBlockingQueueElement *bqe;

    debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
               node, CurrentProc, CurrentTime[CurrentProc],
               CurrentTSO->id, CurrentTSO));

    node_loc = where_is(node);

    ASSERT(q == END_BQ_QUEUE ||
           get_itbl(q)->type == TSO ||    // q is either a TSO or an RBHSave
           get_itbl(q)->type == CONSTR);  // closure (type constructor)
    ASSERT(is_unique(node));

    /* FAKE FETCH: magically copy the node to the tso's proc;
       no Fetch necessary because in reality the node should not have been
       moved to the other PE in the first place
     */
    if (CurrentProc!=node_loc) {
        debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
                   node, node_loc, CurrentProc, CurrentTSO->id,
                   // CurrentTSO, where_is(CurrentTSO),
                   node->header.gran.procs));
        node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
        debugBelch("## new bitmask of node %p is %#x\n",
                   node, node->header.gran.procs));
        if (RtsFlags.GranFlags.GranSimStats.Global) {
            globalGranStats.tot_fake_fetches++;

    // ToDo: check: ASSERT(CurrentProc==node_loc);
    while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
        /*
           bqe points to the current element in the queue
           next points to the next element in the queue
         */
        //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
        //tso_loc = where_is(tso);
        bqe = unblockOne(bqe, node);

    /* if this is the BQ of an RBH, we have to put back the info ripped out of
       the closure to make room for the anchor of the BQ */
    if (bqe!=END_BQ_QUEUE) {
        ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);

        ASSERT((info_ptr==&RBH_Save_0_info) ||
               (info_ptr==&RBH_Save_1_info) ||
               (info_ptr==&RBH_Save_2_info));

        /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
        ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
        ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];

        debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
                   node, info_type(node)));

    /* statistics gathering */
    if (RtsFlags.GranFlags.GranSimStats.Global) {
        // globalGranStats.tot_bq_processing_time += bq_processing_time;
        globalGranStats.tot_bq_len += len;  // total length of all bqs awakened
        // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
        globalGranStats.tot_awbq++;  // total no. of bqs awakened

    debugBelch("## BQ Stats of %p: [%d entries] %s\n",
               node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));

#elif defined(PARALLEL_HASKELL)
awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
    StgBlockingQueueElement *bqe;

    IF_PAR_DEBUG(verbose,
                 debugBelch("##-_ AwBQ for node %p on [%x]: \n",

    if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
        IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));

    ASSERT(q == END_BQ_QUEUE ||
           get_itbl(q)->type == TSO ||
           get_itbl(q)->type == BLOCKED_FETCH ||
           get_itbl(q)->type == CONSTR);

    while (get_itbl(bqe)->type==TSO ||
           get_itbl(bqe)->type==BLOCKED_FETCH) {
        bqe = unblockOne(bqe, node);

#else   /* !GRAN && !PARALLEL_HASKELL */

awakenBlockedQueue(Capability *cap, StgTSO *tso)
    if (tso == NULL) return; // hack; see bug #1235728, and comments in
    while (tso != END_TSO_QUEUE) {
        tso = unblockOne(cap,tso);

/* ---------------------------------------------------------------------------
   Interrupt execution
   - usually called inside a signal handler so it mustn't do anything fancy.
   ------------------------------------------------------------------------ */

interruptStgRts(void)
#if defined(THREADED_RTS)
    prodAllCapabilities();

/* -----------------------------------------------------------------------------
   Unblock a thread

   This is for use when we raise an exception in another thread, which
   may be blocked.
   This has nothing to do with the UnblockThread event in GranSim. -- HWL
   -------------------------------------------------------------------------- */

#if defined(GRAN) || defined(PARALLEL_HASKELL)
/*
   NB: only the type of the blocking queue is different in GranSim and GUM
       the operations on the queue-elements are the same
       long live polymorphism!

   Locks: sched_mutex is held upon entry and exit.
 */
unblockThread(Capability *cap, StgTSO *tso)
    StgBlockingQueueElement *t, **last;

    switch (tso->why_blocked) {
        return;  /* not blocked */

        // Be careful: nothing to do here!  We tell the scheduler that the thread
        // is runnable and we leave it to the stack-walking code to abort the
        // transaction while unwinding the stack.  We should perhaps have a debugging
        // test to make sure that this really happens and that the 'zombie' transaction
        // does not get committed.

        ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
        StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
        StgMVar *mvar = (StgMVar *)(tso->block_info.closure);

        last = (StgBlockingQueueElement **)&mvar->head;
        for (t = (StgBlockingQueueElement *)mvar->head;
             last = &t->link, last_tso = t, t = t->link) {
            if (t == (StgBlockingQueueElement *)tso) {
                *last = (StgBlockingQueueElement *)tso->link;
                if (mvar->tail == tso) {
                    mvar->tail = (StgTSO *)last_tso;
        barf("unblockThread (MVAR): TSO not found");

    case BlockedOnBlackHole:
        ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
        StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);

        last = &bq->blocking_queue;
        for (t = bq->blocking_queue;
             last = &t->link, t = t->link) {
            if (t == (StgBlockingQueueElement *)tso) {
                *last = (StgBlockingQueueElement *)tso->link;
        barf("unblockThread (BLACKHOLE): TSO not found");

    case BlockedOnException:
        StgTSO *target  = tso->block_info.tso;

        ASSERT(get_itbl(target)->type == TSO);

        if (target->what_next == ThreadRelocated) {
            target = target->link;
            ASSERT(get_itbl(target)->type == TSO);

        ASSERT(target->blocked_exceptions != NULL);

        last = (StgBlockingQueueElement **)&target->blocked_exceptions;
        for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
             last = &t->link, t = t->link) {
            ASSERT(get_itbl(t)->type == TSO);
            if (t == (StgBlockingQueueElement *)tso) {
                *last = (StgBlockingQueueElement *)tso->link;
        barf("unblockThread (Exception): TSO not found");

    case BlockedOnWrite:
#if defined(mingw32_HOST_OS)
    case BlockedOnDoProc:
        /* take TSO off blocked_queue */
        StgBlockingQueueElement *prev = NULL;
        for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
             prev = t, t = t->link) {
            if (t == (StgBlockingQueueElement *)tso) {
                blocked_queue_hd = (StgTSO *)t->link;
                if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
                    blocked_queue_tl = END_TSO_QUEUE;
                prev->link = t->link;
                if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
                    blocked_queue_tl = (StgTSO *)prev;
#if defined(mingw32_HOST_OS)
                /* (Cooperatively) signal that the worker thread should abort
                 * the request.
                 */
                abandonWorkRequest(tso->block_info.async_result->reqID);
        barf("unblockThread (I/O): TSO not found");

    case BlockedOnDelay:
        /* take TSO off sleeping_queue */
        StgBlockingQueueElement *prev = NULL;
        for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
             prev = t, t = t->link) {
            if (t == (StgBlockingQueueElement *)tso) {
                sleeping_queue = (StgTSO *)t->link;
                prev->link = t->link;
        barf("unblockThread (delay): TSO not found");

        barf("unblockThread");

    tso->link = END_TSO_QUEUE;
    tso->why_blocked = NotBlocked;
    tso->block_info.closure = NULL;
    pushOnRunQueue(cap,tso);

unblockThread(Capability *cap, StgTSO *tso)
    /* To avoid locking unnecessarily. */
    if (tso->why_blocked == NotBlocked) {

    switch (tso->why_blocked) {
        // Be careful: nothing to do here!  We tell the scheduler that the thread
        // is runnable and we leave it to the stack-walking code to abort the
        // transaction while unwinding the stack.  We should perhaps have a debugging
        // test to make sure that this really happens and that the 'zombie' transaction
        // does not get committed.

        ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
        StgTSO *last_tso = END_TSO_QUEUE;
        StgMVar *mvar = (StgMVar *)(tso->block_info.closure);

        for (t = mvar->head; t != END_TSO_QUEUE;
             last = &t->link, last_tso = t, t = t->link) {
            if (mvar->tail == tso) {
                mvar->tail = last_tso;
        barf("unblockThread (MVAR): TSO not found");

    case BlockedOnBlackHole:
        last = &blackhole_queue;
        for (t = blackhole_queue; t != END_TSO_QUEUE;
             last = &t->link, t = t->link) {
        barf("unblockThread (BLACKHOLE): TSO not found");

    case BlockedOnException:
        StgTSO *target  = tso->block_info.tso;

        ASSERT(get_itbl(target)->type == TSO);

        while (target->what_next == ThreadRelocated) {
            target = target->link;
            ASSERT(get_itbl(target)->type == TSO);

        ASSERT(target->blocked_exceptions != NULL);

        last = &target->blocked_exceptions;
        for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
             last = &t->link, t = t->link) {
            ASSERT(get_itbl(t)->type == TSO);
        barf("unblockThread (Exception): TSO not found");

#if !defined(THREADED_RTS)
    case BlockedOnWrite:
#if defined(mingw32_HOST_OS)
    case BlockedOnDoProc:
        StgTSO *prev = NULL;
        for (t = blocked_queue_hd; t != END_TSO_QUEUE;
             prev = t, t = t->link) {
            blocked_queue_hd = t->link;
            if (blocked_queue_tl == t) {
                blocked_queue_tl = END_TSO_QUEUE;
            prev->link = t->link;
            if (blocked_queue_tl == t) {
                blocked_queue_tl = prev;
#if defined(mingw32_HOST_OS)
            /* (Cooperatively) signal that the worker thread should abort
             * the request.
             */
            abandonWorkRequest(tso->block_info.async_result->reqID);
        barf("unblockThread (I/O): TSO not found");

    case BlockedOnDelay:
        StgTSO *prev = NULL;
        for (t = sleeping_queue; t != END_TSO_QUEUE;
             prev = t, t = t->link) {
            sleeping_queue = t->link;
            prev->link = t->link;
        barf("unblockThread (delay): TSO not found");

        barf("unblockThread");

    tso->link = END_TSO_QUEUE;
    tso->why_blocked = NotBlocked;
    tso->block_info.closure = NULL;
    appendToRunQueue(cap,tso);

/* -----------------------------------------------------------------------------
 * checkBlackHoles()
 *
 * Check the blackhole_queue for threads that can be woken up.  We do
 * this periodically: before every GC, and whenever the run queue is
 * empty.
 *
 * An elegant solution might be to just wake up all the blocked
 * threads with awakenBlockedQueue occasionally: they'll go back to
 * sleep again if the object is still a BLACKHOLE.  Unfortunately this
 * doesn't give us a way to tell whether we've actually managed to
 * wake up any threads, so we would be busy-waiting.
 *
 * -------------------------------------------------------------------------- */

checkBlackHoles (Capability *cap)
    rtsBool any_woke_up = rtsFalse;

    // blackhole_queue is global:
    ASSERT_LOCK_HELD(&sched_mutex);

    IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));

    // ASSUMES: sched_mutex
    prev = &blackhole_queue;
    t = blackhole_queue;
    while (t != END_TSO_QUEUE) {
        ASSERT(t->why_blocked == BlockedOnBlackHole);
        type = get_itbl(t->block_info.closure)->type;
        if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
            IF_DEBUG(sanity,checkTSO(t));
            t = unblockOne(cap, t);
            // urk, the threads migrate to the current capability
            // here, but we'd like to keep them on the original one.
            any_woke_up = rtsTrue;

/* -----------------------------------------------------------------------------
   Asynchronous exceptions.

   The following function implements the magic for raising an
   asynchronous exception in an existing thread.

   We first remove the thread from any queue on which it might be
   blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.

   We strip the stack down to the innermost CATCH_FRAME, building
   thunks in the heap for all the active computations, so they can
   be restarted if necessary.  When we reach a CATCH_FRAME, we build
   an application of the handler to the exception, and push it on
   the top of the stack.

   How exactly do we save all the active computations?  We create an
   AP_STACK for every UpdateFrame on the stack.  Entering one of these
   AP_STACKs pushes everything from the corresponding update frame
   upwards onto the stack.  (Actually, it pushes everything up to the
   next update frame plus a pointer to the next AP_STACK object.
   Entering the next AP_STACK object pushes more onto the stack until we
   reach the last AP_STACK object - at which point the stack should look
   exactly as it did when we killed the TSO and we can continue
   execution by entering the closure on top of the stack.

   We can also kill a thread entirely - this happens if either (a) the
   exception passed to raiseAsync is NULL, or (b) there's no
   CATCH_FRAME on the stack.  In either case, we strip the entire
   stack and replace the thread with a zombie.

   ToDo: in SMP mode, this function is only safe if either (a) we hold
   all the Capabilities (eg. in GC), or (b) we own the Capability that
   the TSO is currently blocked on or on the run queue of.

   -------------------------------------------------------------------------- */

raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
    raiseAsync_(cap, tso, exception, rtsFalse, NULL);

suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
    raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
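
/* Illustrative sketch of the two entry points above.  A non-NULL closure is
 * delivered as a catchable exception; raiseAsync with NULL strips the whole
 * stack and leaves a zombie (this is how deleteThread() below kills a
 * thread).  Per the ToDo above, the caller is assumed to own tso's
 * Capability, or to hold all Capabilities.
 */
#if 0
static void raise_examples (Capability *cap, StgTSO *tso, StgPtr frame)
{
    // deliver an asynchronous exception (cf. resurrectThreads below):
    raiseAsync(cap, tso, (StgClosure *)NonTermination_closure);

    // or: freeze the computation into AP_STACKs down to a given frame,
    // raising nothing (asynchronous suspension):
    suspendComputation(cap, tso, frame);
}
#endif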
raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
            rtsBool stop_at_atomically, StgPtr stop_here)
    StgRetInfoTable *info;

    // Thread already dead?
    if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {

    IF_DEBUG(scheduler,
             sched_belch("raising exception in thread %ld.", (long)tso->id));

    // Remove it from any blocking queues
    unblockThread(cap,tso);

    // The stack freezing code assumes there's a closure pointer on
    // the top of the stack, so we have to arrange that this is the case...
    if (sp[0] == (W_)&stg_enter_info) {
        sp[0] = (W_)&stg_dummy_ret_closure;

    while (stop_here == NULL || frame < stop_here) {

        // 1. Let the top of the stack be the "current closure"
        //
        // 2. Walk up the stack until we find either an UPDATE_FRAME or a
        //    CATCH_FRAME.
        //
        // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
        //    current closure applied to the chunk of stack up to (but not
        //    including) the update frame.  This closure becomes the "current
        //    closure".  Go back to step 2.
        //
        // 4. If it's a CATCH_FRAME, then leave the exception handler on
        //    top of the stack applied to the exception.
        //
        // 5. If it's a STOP_FRAME, then kill the thread.
        //
        // NB: if we pass an ATOMICALLY_FRAME then abort the associated
        // transaction

        info = get_ret_itbl((StgClosure *)frame);

        switch (info->i.type) {

            // First build an AP_STACK consisting of the stack chunk above the
            // current update frame, with the top word on the stack as the
            // fun field.
            words = frame - sp - 1;
            ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));

            ap->fun = (StgClosure *)sp[0];
            for(i=0; i < (nat)words; ++i) {
                ap->payload[i] = (StgClosure *)*sp++;

            SET_HDR(ap,&stg_AP_STACK_info,
                    ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
            TICK_ALLOC_UP_THK(words+1,0);

            debugBelch("sched: Updating ");
            printPtr((P_)((StgUpdateFrame *)frame)->updatee);
            debugBelch(" with ");
            printObj((StgClosure *)ap);

            // Replace the updatee with an indirection
            //
            // Warning: if we're in a loop, more than one update frame on
            // the stack may point to the same object.  Be careful not to
            // overwrite an IND_OLDGEN in this case, because we'll screw
            // up the mutable lists.  To be on the safe side, don't
            // overwrite any kind of indirection at all.  See also
            // threadSqueezeStack in GC.c, where we have to make a similar
            // check.
            if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
                // revert the black hole
                UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,

            sp += sizeofW(StgUpdateFrame) - 1;
            sp[0] = (W_)ap; // push onto stack
            continue; // no need to bump frame

            // We've stripped the entire stack, the thread is now dead.
            tso->what_next = ThreadKilled;
            tso->sp = frame + sizeofW(StgStopFrame);

            // If we find a CATCH_FRAME, and we've got an exception to raise,
            // then build the THUNK raise(exception), and leave it on
            // top of the CATCH_FRAME ready to enter.
            StgCatchFrame *cf = (StgCatchFrame *)frame;

            if (exception == NULL) break;

            // we've got an exception to raise, so let's pass it to the
            // handler in this frame.
            raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
            TICK_ALLOC_SE_THK(1,0);
            SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
            raise->payload[0] = exception;

            // throw away the stack from Sp up to the CATCH_FRAME.
            /* Ensure that async exceptions are blocked now, so we don't get
             * a surprise exception before we get around to executing the
             * handler.
             */
            if (tso->blocked_exceptions == NULL) {
                tso->blocked_exceptions = END_TSO_QUEUE;

            /* Put the newly-built THUNK on top of the stack, ready to execute
             * when the thread restarts.
             */
            sp[-1] = (W_)&stg_enter_info;

            tso->what_next = ThreadRunGHC;
            IF_DEBUG(sanity, checkTSO(tso));

        case ATOMICALLY_FRAME:
            if (stop_at_atomically) {
                ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
                stmCondemnTransaction(cap, tso -> trec);
                // R1 is not a register: the return convention for IO in
                // this case puts the return value on the stack, so we
                // need to set up the stack to return to the atomically
                // frame properly...
                tso->sp = frame - 2;
                tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
                tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;

                tso->what_next = ThreadRunGHC;

            // Not stop_at_atomically... fall through and abort the
            // transaction.

        case CATCH_RETRY_FRAME:
            // IF we find an ATOMICALLY_FRAME then we abort the
            // current transaction and propagate the exception.  In
            // this case (unlike ordinary exceptions) we do not care
            // whether the transaction is valid or not because its
            // possible validity cannot have caused the exception
            // and will not be visible after the abort.
            debugBelch("Found atomically block delivering async exception\n"));
            StgTRecHeader *trec = tso -> trec;
            StgTRecHeader *outer = stmGetEnclosingTRec(trec);
            stmAbortTransaction(cap, trec);
            tso -> trec = outer;

        // move on to the next stack frame
        frame += stack_frame_sizeW((StgClosure *)frame);

    // if we got here, then we stopped at stop_here
    ASSERT(stop_here != NULL);
/* -----------------------------------------------------------------------------
   deleteThread()

   This is used for interruption (^C) and forking, and corresponds to
   raising an exception but without letting the thread catch the
   exception.
   -------------------------------------------------------------------------- */

deleteThread (Capability *cap, StgTSO *tso)
    if (tso->why_blocked != BlockedOnCCall &&
        tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
        raiseAsync(cap,tso,NULL);

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
deleteThreadImmediately(Capability *cap, StgTSO *tso)
{ // for forkProcess only:
  // delete thread without giving it a chance to catch the KillThread exception

    if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {

    if (tso->why_blocked != BlockedOnCCall &&
        tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
        unblockThread(cap,tso);

    tso->what_next = ThreadKilled;

/* -----------------------------------------------------------------------------
   raiseExceptionHelper
   This function is called by the raise# primitive, just so that we can
   move some of the tricky bits of raising an exception from C-- into
   C.  Who knows, it might be a useful re-usable thing here too.
   -------------------------------------------------------------------------- */

raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
    Capability *cap = regTableToCapability(reg);
    StgThunk *raise_closure = NULL;
    StgRetInfoTable *info;

    // This closure represents the expression 'raise# E' where E
    // is the exception raised.  It is used to overwrite all the
    // thunks which are currently under evaluation.

    // LDV profiling: stg_raise_info has THUNK as its closure
    // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
    // payload, MIN_UPD_SIZE is more appropriate than 1.  It seems that
    // 1 does not cause any problem unless profiling is performed.
    // However, when LDV profiling goes on, we need to linearly scan
    // small object pool, where raise_closure is stored, so we should
    // use MIN_UPD_SIZE.
    //
    // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
    //                                 sizeofW(StgClosure)+1);

    // Walk up the stack, looking for the catch frame.  On the way,
    // we update any closures pointed to from update frames with the
    // raise closure that we just built.
        info = get_ret_itbl((StgClosure *)p);
        next = p + stack_frame_sizeW((StgClosure *)p);
        switch (info->i.type) {

            // Only create raise_closure if we need to.
            if (raise_closure == NULL) {
                raise_closure =
                    (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
                SET_HDR(raise_closure, &stg_raise_info, CCCS);
                raise_closure->payload[0] = exception;
            UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);

        case ATOMICALLY_FRAME:
            IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
            return ATOMICALLY_FRAME;

        case CATCH_STM_FRAME:
            IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
            return CATCH_STM_FRAME;

        case CATCH_RETRY_FRAME:
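
/* Illustrative sketch (an assumption: the real dispatch lives in the
 * compiled raise# primop, not in this file): acting on the frame type
 * returned by raiseExceptionHelper().
 */
#if 0
static void dispatch_raise (StgRegTable *reg, StgTSO *tso, StgClosure *ex)
{
    switch (raiseExceptionHelper(reg, tso, ex)) {
    case CATCH_FRAME:       // a handler was found on the stack: enter it
        break;
    case CATCH_STM_FRAME:   // likewise, for an STM-level handler
        break;
    case ATOMICALLY_FRAME:  // the exception escaped an STM transaction
        break;
    default:                // e.g. STOP_FRAME: no handler, the thread dies
        break;
    }
}
#endif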
/* -----------------------------------------------------------------------------
   findRetryFrameHelper

   This function is called by the retry# primitive.  It traverses the stack
   leaving tso->sp referring to the frame which should handle the retry.

   This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
   or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).

   We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
   despite the similar implementation.

   We should not expect to see CATCH_FRAME or STOP_FRAME because those should
   not be created within memory transactions.
   -------------------------------------------------------------------------- */

findRetryFrameHelper (StgTSO *tso)
    StgRetInfoTable *info;

        info = get_ret_itbl((StgClosure *)p);
        next = p + stack_frame_sizeW((StgClosure *)p);
        switch (info->i.type) {

        case ATOMICALLY_FRAME:
            IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retry\n", p));
            return ATOMICALLY_FRAME;

        case CATCH_RETRY_FRAME:
            IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retry\n", p));
            return CATCH_RETRY_FRAME;

        case CATCH_STM_FRAME:
            ASSERT(info->i.type != CATCH_FRAME);
            ASSERT(info->i.type != STOP_FRAME);
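
/* Illustrative sketch (an assumption: mirrors what the compiled retry#
 * primop does with this result): dispatching on the frame that
 * findRetryFrameHelper() leaves tso->sp referring to.
 */
#if 0
static void dispatch_retry (StgTSO *tso)
{
    switch (findRetryFrameHelper(tso)) {
    case CATCH_RETRY_FRAME:  // retry# inside an orElse#: try the alternative
        break;
    case ATOMICALLY_FRAME:   // retry# at the top level: block and re-run
        break;
    }
}
#endif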
/* -----------------------------------------------------------------------------
   resurrectThreads is called after garbage collection on the list of
   threads found to be garbage.  Each of these threads will be woken
   up and sent a signal: BlockedOnDeadMVar if the thread was blocked
   on an MVar, or NonTermination if the thread was blocked on a Black
   Hole.

   Locks: assumes we hold *all* the capabilities.
   -------------------------------------------------------------------------- */

resurrectThreads (StgTSO *threads)
    for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
        next = tso->global_link;
        tso->global_link = all_threads;

        IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));

        // Wake up the thread on the Capability it was last on for a
        // bound thread, or last_free_capability otherwise.
        cap = tso->bound->cap;
        cap = last_free_capability;

        switch (tso->why_blocked) {
        case BlockedOnException:
            /* Called by GC - sched_mutex lock is currently held. */
            raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
        case BlockedOnBlackHole:
            raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
            raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
            /* This might happen if the thread was blocked on a black hole
             * belonging to a thread that we've just woken up (raiseAsync
             * can wake up threads, remember...).
             */
            barf("resurrectThreads: thread blocked in a strange way");

/* ----------------------------------------------------------------------------
 * Debugging: why is a thread blocked
 * [Also provides useful information when debugging threaded programs
 *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
 * ------------------------------------------------------------------------- */

printThreadBlockage(StgTSO *tso)
    switch (tso->why_blocked) {
        debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
    case BlockedOnWrite:
        debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
#if defined(mingw32_HOST_OS)
    case BlockedOnDoProc:
        debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
    case BlockedOnDelay:
        debugBelch("is blocked until %ld", (long)(tso->block_info.target));
        debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
    case BlockedOnException:
        debugBelch("is blocked on delivering an exception to thread %d",
                   tso->block_info.tso->id);
    case BlockedOnBlackHole:
        debugBelch("is blocked on a black hole");
        debugBelch("is not blocked");
#if defined(PARALLEL_HASKELL)
        debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
                   tso->block_info.closure, info_type(tso->block_info.closure));
    case BlockedOnGA_NoSend:
        debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
                   tso->block_info.closure, info_type(tso->block_info.closure));
    case BlockedOnCCall:
        debugBelch("is blocked on an external call");
    case BlockedOnCCall_NoUnblockExc:
        debugBelch("is blocked on an external call (exceptions were already blocked)");
        debugBelch("is blocked on an STM operation");
        barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
             tso->why_blocked, tso->id, tso);
4127 printThreadStatus(StgTSO *t)
4129 debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4131 void *label = lookupThreadLabel(t->id);
4132 if (label) debugBelch("[\"%s\"] ",(char *)label);
4134 if (t->what_next == ThreadRelocated) {
4135 debugBelch("has been relocated...\n");
4137 switch (t->what_next) {
4139 debugBelch("has been killed");
4141 case ThreadComplete:
4142 debugBelch("has completed");
4145 printThreadBlockage(t);
4152 printAllThreads(void)
4159 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4160 ullong_format_string(TIME_ON_PROC(CurrentProc),
4161 time_string, rtsFalse/*no commas!*/);
4163 debugBelch("all threads at [%s]:\n", time_string);
4164 # elif defined(PARALLEL_HASKELL)
4165 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4166 ullong_format_string(CURRENT_TIME,
4167 time_string, rtsFalse/*no commas!*/);
4169 debugBelch("all threads at [%s]:\n", time_string);
4171 debugBelch("all threads:\n");
4174 for (i = 0; i < n_capabilities; i++) {
4175 cap = &capabilities[i];
4176 debugBelch("threads on capability %d:\n", cap->no);
4177 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4178 printThreadStatus(t);
4182 debugBelch("other threads:\n");
4183 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4184 if (t->why_blocked != NotBlocked) {
4185 printThreadStatus(t);
4187 if (t->what_next == ThreadRelocated) {
4190 next = t->global_link;
4197 printThreadQueue(StgTSO *t)
4200 for (; t != END_TSO_QUEUE; t = t->link) {
4201 printThreadStatus(t);
4204 debugBelch("%d threads on queue\n", i);
/*
   Print a whole blocking queue attached to node (debugging only).
*/
# if defined(PARALLEL_HASKELL)
void print_bqe (StgBlockingQueueElement *bqe);   // defined below

void
print_bq (StgClosure *node)
{
  ASSERT(node != (StgClosure*)NULL);   // sanity check, before dereferencing

  debugBelch("## BQ of closure %p (%s): ",
	     node, info_type(node));

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
	 get_itbl(node)->type == FETCH_ME_BQ ||
	 get_itbl(node)->type == RBH ||
	 get_itbl(node)->type == MVAR);

  print_bqe(((StgBlockingQueue*)node)->blocking_queue);
}
/*
   Print a whole blocking queue starting with the element bqe.
*/
void
print_bqe (StgBlockingQueueElement *bqe)
{
  rtsBool end;

  /*
     NB: in a parallel setup a BQ of an RBH must end with an RBH_Save closure,
     so the loop stops either at the end of the queue or at the first CONSTR
     (the RBH_Save closure).
  */
  for (end = (bqe == END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link == END_BQ_QUEUE),
       bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);                      // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL);   // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||
	   get_itbl(bqe)->type == BLOCKED_FETCH ||
	   get_itbl(bqe)->type == CONSTR);
    /* only BQs of an RBH end with an RBH_Save closure */
    //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

    switch (get_itbl(bqe)->type) {
    case TSO:
      debugBelch(" TSO %u (%p),",   // %p, not %x: a pointer is being printed
		 ((StgTSO *)bqe)->id, (StgTSO *)bqe);
      break;
    case BLOCKED_FETCH:
      debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
		 ((StgBlockedFetch *)bqe)->node,
		 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
		 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
		 ((StgBlockedFetch *)bqe)->ga.weight);
      break;
    case CONSTR:
      debugBelch(" %s (IP %p),",
		 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
		  get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
		  get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
		  "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
	   info_type((StgClosure *)bqe)); // , node, info_type(node));
      break;
    }
  }
  debugBelch("\n");
}
# elif defined(GRAN)
void
print_bq (StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  PEs node_loc, tso_loc;
  rtsBool end;

  ASSERT(node != (StgClosure*)NULL);   // sanity check, before dereferencing

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
	 get_itbl(node)->type == FETCH_ME_BQ ||
	 get_itbl(node)->type == RBH);

  node_loc = where_is(node);

  debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
	     node, info_type(node), node_loc);

  /*
     NB: in a parallel setup a BQ of an RBH must end with an RBH_Save closure;
  */
  for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe == END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link == END_BQ_QUEUE),
       bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);                      // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL);   // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||
	   get_itbl(bqe)->type == CONSTR);
    /* only BQs of an RBH end with an RBH_Save closure */
    ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

    tso_loc = where_is((StgClosure *)bqe);
    switch (get_itbl(bqe)->type) {
    case TSO:
      debugBelch(" TSO %d (%p) on [PE %d],",
		 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
      break;
    case CONSTR:
      debugBelch(" %s (IP %p),",
		 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
		  get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
		  get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
		  "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue of %p (%s)",
	   info_type((StgClosure *)bqe), node, info_type(node));
      break;
    }
  }
  debugBelch("\n");
}
# endif
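/* Both print_bq variants use the same loop idiom: compute a trailing "end"
 * flag in the update clause so that the final element (the last TSO, or the
 * terminating RBH_Save CONSTR closure) is still visited before the loop
 * exits. A minimal sketch of the idiom on a plain integer list (purely
 * illustrative types, guarded out of compilation): */
#if 0
typedef struct IntNode_ { int val; struct IntNode_ *link; } IntNode;

static void
printIntQueue (IntNode *p)
{
    rtsBool end;
    for (end = (p == NULL); !end;
	 end = (p->link == NULL), p = end ? NULL : p->link) {
	debugBelch(" %d,", p->val);   // runs once per node, including the last
    }
    debugBelch("\n");
}
#endif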
#if defined(PARALLEL_HASKELL)
/* Count the threads on the run queue (debugging only). */
static nat
run_queue_len(void)
{
    nat i;
    StgTSO *tso;

    for (i=0, tso=run_queue_hd;
	 tso != END_TSO_QUEUE;
	 i++, tso=tso->link)
	/* just count */;

    return i;
}
#endif
void
sched_belch(char *s, ...)
{
    va_list ap;
    va_start(ap, s);
#if defined(THREADED_RTS)
    debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
#elif defined(PARALLEL_HASKELL)
    debugBelch("== ");
#else
    debugBelch("sched: ");
#endif
    vdebugBelch(s, ap);
    debugBelch("\n");
    va_end(ap);
}
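/* Usage sketch: sched_belch takes a printf-style format string, so call
 * sites look like the (hypothetical) examples below; the prefix printed
 * above identifies which task is speaking. Guarded out of compilation. */
#if 0
    sched_belch("thread %d is runnable", tso->id);
    sched_belch("giving up capability %d", cap->no);
#endif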