/* ---------------------------------------------------------------------------
 * (c) The GHC Team, 1998-2005
 * The scheduler and thread-related functionality
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "BlockAlloc.h"
#include "OSThreads.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "Exception.h"
#include "RtsSignals.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Proftimer.h"

#if defined(GRAN) || defined(PARALLEL_HASKELL)
# include "GranSimRts.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"

#include "Capability.h"
#include "AwaitEvent.h"

#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>

// Turn off inlining when debugging - it obfuscates things
# define STATIC_INLINE static

/* -----------------------------------------------------------------------------
 * -------------------------------------------------------------------------- */

StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;  now in GranSim.c */

/*
   In GranSim we have a runnable and a blocked queue for each processor.
   In order to minimise code changes new arrays run_queue_hds/tls
   are created. run_queue_hd is then a short cut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h).
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?

/* Threads blocked on blackholes.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *blackhole_queue = NULL;

/* The blackhole_queue should be checked for threads to wake up. See
 * Schedule.h for more thorough comment.
 * LOCK: none (doesn't matter if we miss an update)
 */
rtsBool blackholes_need_checking = rtsFalse;

/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *all_threads = NULL;

/* flag set by signal handler to precipitate a context switch
 * LOCK: none (just an advisory flag)
 */
int context_switch = 0;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 */
nat recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes once, from false->true)
 */
rtsBool interrupted = rtsFalse;

/* Next thread ID to allocate.
 */
static StgThreadID next_thread_id = 1;

/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the closure to enter)
 *  + 1                       (stg_ap_v_ret)
 *  + 1                       (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
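
/* A worked instance of the sum above, purely for illustration (the
 * word counts shown are assumed values, not the real platform-dependent
 * constants):
 *
 *     MIN_STACK_WORDS = RESERVED_STACK_WORDS     // e.g. 21
 *                     + sizeofW(StgStopFrame)    // e.g. 2
 *                     + 3                        // closure + stg_ap_v_ret slots
 *                     = 26 words
 */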
/* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 * exists - earlier gccs apparently didn't.
 */

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS (including SMP) runtime.
 */
#if defined(THREADED_RTS)

#if defined(PARALLEL_HASKELL)
rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
static void schedulePreLoop (void);
static void schedulePushWork(Capability *cap, Task *task);
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static StgTSO *scheduleProcessEvent(rtsEvent *event);
#if defined(PARALLEL_HASKELL)
static StgTSO *scheduleSendPendingMessages(void);
static void scheduleActivateSpark(void);
static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
#if defined(PAR) || defined(GRAN)
static void scheduleGranParReport(void);
static void schedulePostRunThread(void);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task,
                                         StgTSO *t);
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
                                    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                             StgTSO *t );
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);

static void unblockThread(Capability *cap, StgTSO *tso);
static rtsBool checkBlackHoles(Capability *cap);
static void AllRoots(evac_fn evac);
static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
                        rtsBool stop_at_atomically, StgPtr stop_here);
static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteRunQueue (Capability *cap);

static void printThreadBlockage(StgTSO *tso);
static void printThreadStatus(StgTSO *tso);
void printThreadQueue(StgTSO *tso);

#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);
static char *whatNext_strs[] = {
  "(unknown)",
  "ThreadRunGHC",
  "ThreadInterpret",
  "ThreadKilled",
  "ThreadRelocated",
  "ThreadComplete"
};
/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */

STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
#if defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.doFairScheduling) {
        // this does round-robin scheduling; good for concurrency
        appendToRunQueue(cap,t);
    } else {
        // this does unfair scheduling; good for parallelism
        pushOnRunQueue(cap,t);
    }
#else
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
#endif
}
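
/* A sketch of how the two policies differ (the thread names are
 * illustrative only).  Given a run queue [t1,t2]:
 *
 *     appendToRunQueue(cap, t3);   // round-robin: queue becomes [t1,t2,t3]
 *     pushOnRunQueue(cap, t3);     // unfair: queue becomes [t3,t1,t2]
 *
 * Unfair scheduling keeps a freshly created thread running ahead of
 * older work, which (per the comments above) tends to suit parallelism.
 */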
/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * timer expires (thread yields)

   In a GranSim setup this loop iterates over the global event queue,
   which determines what to do next.  Therefore, it's more complicated
   than either the concurrent or the parallel (GUM) setup.

   GUM iterates over incoming messages.
   It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
   and sends out a fish whenever it has nothing to do; in-between
   doing the actual reductions (shared code below) it processes the
   incoming messages and deals with delayed operations
   (see PendingFetches).
   This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */
schedule (Capability *initialCapability, Task *task)
  StgThreadReturnCode ret;
#elif defined(PARALLEL_HASKELL)
  rtsBool receivedFinish = rtsFalse;
  nat tp_size, sp_size; // stats only
#if defined(THREADED_RTS)
  rtsBool first = rtsTrue;

  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

           sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
                       task, initialCapability);

  // -----------------------------------------------------------
  // Scheduler loop starts here:

#if defined(PARALLEL_HASKELL)
#define TERMINATION_CONDITION        (!receivedFinish)
#define TERMINATION_CONDITION ((event = get_next_event()) != (rtsEvent*)NULL)
#define TERMINATION_CONDITION        rtsTrue

  while (TERMINATION_CONDITION) {

    /* Choose the processor with the next event */
    CurrentProc = event->proc;
    CurrentTSO = event->tso;

#if defined(THREADED_RTS)
      // don't yield the first time, we want a chance to run this
      // thread for a bit, even if there are others banging at the
      ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
      // Yield the capability to higher-priority tasks if necessary.
      yieldCapability(&cap, task);

    schedulePushWork(cap,task);

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    if (cap->in_haskell) {
          errorBelch("schedule: re-entered unsafely.\n"
                     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
          stg_exit(EXIT_FAILURE);

    // Test for interruption.  If interrupted==rtsTrue, then either
    // we received a keyboard interrupt (^C), or the scheduler is
    // trying to shut down all the tasks (shutting_down_scheduler) in
        discardSparksCap(cap);

        if (shutting_down_scheduler) {
            IF_DEBUG(scheduler, sched_belch("shutting down"));
            // If we are a worker, just exit.  If we're a bound thread
            // then we will exit below when we've removed our TSO from
            if (task->tso == NULL && emptyRunQueue(cap)) {
            IF_DEBUG(scheduler, sched_belch("interrupted"));

    // If the run queue is empty, take a spark and turn it into a thread.
        if (emptyRunQueue(cap)) {
            spark = findSpark(cap);
                    sched_belch("turning spark of closure %p into a thread",
                                (StgClosure *)spark));
                createSparkThread(cap,spark);

    scheduleStartSignalHandlers(cap);

    // Only check the black holes here if we've nothing else to do.
    // During normal execution, the black hole list only gets checked
    // at GC time, to avoid repeatedly traversing this possibly long
    // list each time around the scheduler.
    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

    scheduleCheckBlockedThreads(cap);

    scheduleDetectDeadlock(cap,task);

    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt was received during
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.
    if ( emptyRunQueue(cap) ) {
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
        continue; // nothing to do

#if defined(PARALLEL_HASKELL)
        scheduleSendPendingMessages();
        if (emptyRunQueue(cap) && scheduleActivateSpark())

        ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!

        /* If we still have no work we need to send a FISH to get a spark
        if (emptyRunQueue(cap)) {
            if (!scheduleGetRemoteWork(&receivedFinish)) continue;
            ASSERT(rtsFalse); // should not happen at the moment

        // from here: non-empty run queue.
        //  TODO: merge the above case with this one; only one call to processMessages()!
        if (PacketsWaiting()) {  /* process incoming messages, if
                                    any pending...  only in else
                                    because getRemoteWork waits for
            receivedFinish = processMessages();

    scheduleProcessEvent(event);

    // Get a thread to run
    t = popRunQueue(cap);

#if defined(GRAN) || defined(PAR)
    scheduleGranParReport(); // some kind of debugging output
    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));

#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
        Task *bound = t->bound;

                    sched_belch("### Running thread %d in bound thread",
                // yes, the Haskell thread is bound to the current native thread

                    sched_belch("### thread %d bound to another OS thread",
                // no, bound to a different Haskell thread: pass to that thread
                pushOnRunQueue(cap,t);

            // The thread we want to run is unbound.
                    sched_belch("### this OS thread cannot run thread %d", t->id));
                // no, the current native thread is bound to a different
                // Haskell thread, so pass it to any worker thread
                pushOnRunQueue(cap,t);

    cap->r.rCurrentTSO = t;

    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
        && !emptyThreadQueues(cap)) {

    IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
                                    (long)t->id, whatNext_strs[t->what_next]));

#if defined(PROFILING)
    startHeapProfTimer();

    // ----------------------------------------------------------------------
    // Run the current thread

    prev_what_next = t->what_next;

    errno = t->saved_errno;

    cap->in_haskell = rtsTrue;

    recent_activity = ACTIVITY_YES;

    switch (prev_what_next) {

        /* Thread already finished, return to scheduler. */
        ret = ThreadFinished;

            r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
            cap = regTableToCapability(r);

    case ThreadInterpret:
        cap = interpretBCO(cap);

        barf("schedule: invalid what_next field");
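
    // (StgRun enters the compiled STG code at the top of the thread's
    // stack; interpretBCO runs byte-code objects instead.  Both return
    // to the scheduler when the thread yields, blocks, finishes, or
    // needs more heap or stack.)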
    cap->in_haskell = rtsFalse;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;

    // We have run some Haskell code: there might be blackhole-blocked
    // threads to wake up now.
    // Lock-free test here should be ok, we're just setting a flag.
    if ( blackhole_queue != END_TSO_QUEUE ) {
        blackholes_need_checking = rtsTrue;

    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again, see code below.
    t->saved_errno = errno;

    // If ret is ThreadBlocked, and this Task is bound to the TSO that
    // blocked, we are in limbo - the TSO is now owned by whatever it
    // is blocked on, and may in fact already have been woken up,
    // perhaps even on a different Capability.  It may be the case
    // that task->cap != cap.  We better yield this Capability
    // immediately and return to normality.
    if (ret == ThreadBlocked) {
            sched_belch("--<< thread %d (%s) stopped: blocked\n",
                        t->id, whatNext_strs[t->what_next]));

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    // ----------------------------------------------------------------------

    // Costs for the scheduler are assigned to CCS_SYSTEM
#if defined(PROFILING)

#if defined(THREADED_RTS)
    IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
#elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
    IF_DEBUG(scheduler,debugBelch("sched: "););

    schedulePostRunThread();

    ready_to_gc = rtsFalse;

        ready_to_gc = scheduleHandleHeapOverflow(cap,t);

        scheduleHandleStackOverflow(cap,task,t);

        if (scheduleHandleYield(cap, t, prev_what_next)) {
            // shortcut for switching between compiler/interpreter:

        scheduleHandleThreadBlocked(t);

        if (scheduleHandleThreadFinished(cap, task, t)) return cap;
        ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

      barf("schedule: invalid thread return code %d", (int)ret);

    if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
    if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
  } /* end of while() */

  IF_PAR_DEBUG(verbose,
               debugBelch("== Leaving schedule() after having received Finish\n"));

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

schedulePreLoop(void)
    /* set up first event to get things going */
    /* ToDo: assign costs for system setup and init MainTSO ! */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);

               debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n",
               G_TSO(CurrentTSO, 5));

    if (RtsFlags.GranFlags.Light) {
        /* Save current time; GranSim Light only */
        CurrentTSO->gran.clock = CurrentTime[CurrentProc];

/* -----------------------------------------------------------------------------
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

schedulePushWork(Capability *cap USED_IF_SMP,
                 Task *task      USED_IF_SMP)
    Capability *free_caps[n_capabilities], *cap0;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
        && sparkPoolSizeCap(cap) < 2) {

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
                // it already has some work, we just grabbed it at
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
                free_caps[n_free_caps++] = cap0;

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;
        rtsBool pushed_to_all;

        IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));

        pushed_to_all = rtsFalse;

        if (cap->run_queue_hd != END_TSO_QUEUE) {
            prev = cap->run_queue_hd;
            prev->link = END_TSO_QUEUE;
            for (; t != END_TSO_QUEUE; t = next) {
                t->link = END_TSO_QUEUE;
                if (t->what_next == ThreadRelocated
                    || t->bound == task) { // don't move my bound thread
                } else if (i == n_free_caps) {
                    pushed_to_all = rtsTrue;
                    IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
                    appendToRunQueue(free_caps[i],t);
                    if (t->bound) { t->bound->cap = free_caps[i]; }
            cap->run_queue_tl = prev;

        // If there are some free capabilities that we didn't push any
        // threads to, then try to push a spark to each one.
        if (!pushed_to_all) {
            // i is the next free capability to push to
            for (; i < n_free_caps; i++) {
                if (emptySparkPoolCap(free_caps[i])) {
                    spark = findSpark(cap);
                        IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
                        newSpark(&(free_caps[i]->r), spark);

        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            releaseCapability(free_caps[i]);

    task->cap = cap; // reset to point to our Capability.

/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
scheduleStartSignalHandlers(Capability *cap)
    if (signals_pending()) { // safe outside the lock
        startSignalHandlers(cap);

scheduleStartSignalHandlers(Capability *cap STG_UNUSED)

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
#if !defined(THREADED_RTS)
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
        awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
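        // i.e. awaitEvent() is allowed to block indefinitely only when
        // there is genuinely nothing else to do: the run queue is empty
        // and no blackhole check is pending.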
/* ----------------------------------------------------------------------------
 * Check for threads blocked on BLACKHOLEs that can be woken up
 * ------------------------------------------------------------------------- */

scheduleCheckBlackHoles (Capability *cap)
{
    if ( blackholes_need_checking ) // check without the lock first
    {
        ACQUIRE_LOCK(&sched_mutex);
        if ( blackholes_need_checking ) {
            checkBlackHoles(cap);
            blackholes_need_checking = rtsFalse;
        }
        RELEASE_LOCK(&sched_mutex);
    }
}
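
/* The test-lock-retest sequence above is a double-checked flag.  A
 * sketch of the pattern, assuming only that a racing reader may see a
 * stale value (harmless here, because a missed check is retried on the
 * next pass around the scheduler):
 *
 *     if (flag) {                 // cheap test without the lock
 *         ACQUIRE_LOCK(&mutex);
 *         if (flag) {             // re-test now that we hold the lock
 *             ...do the work once...
 *             flag = rtsFalse;
 *         }
 *         RELEASE_LOCK(&mutex);
 *     }
 */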
/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

scheduleDetectDeadlock (Capability *cap, Task *task)
#if defined(PARALLEL_HASKELL)
    // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL

    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
    if ( emptyThreadQueues(cap) )
#if defined(THREADED_RTS)
        /*
         * In the threaded RTS, we only check for deadlock if there
         * has been no activity in a complete timeslice.  This means
         * we won't eagerly start a full GC just because we don't have
         * any threads to run currently.
         */
        if (recent_activity != ACTIVITY_INACTIVE) return;

        IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        scheduleDoGC( cap, task, rtsTrue/*force major GC*/ );
        recent_activity = ACTIVITY_DONE_GC;

        if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather than bombing out with a
         * deadlock.
         */
        if ( anyUserHandlers() ) {
                       sched_belch("still deadlocked, waiting for signals..."));

            if (signals_pending()) {
                startSignalHandlers(cap);

            // either we have threads to run, or we were interrupted:
            ASSERT(!emptyRunQueue(cap) || interrupted);

#if !defined(THREADED_RTS)
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception.
            switch (task->tso->why_blocked) {
            case BlockedOnBlackHole:
            case BlockedOnException:
                raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
                barf("deadlock: main thread blocked in a strange way");

/* ----------------------------------------------------------------------------
 * Process an event (GRAN only)
 * ------------------------------------------------------------------------- */

scheduleProcessEvent(rtsEvent *event)
    if (RtsFlags.GranFlags.Light)
      GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
      CurrentTime[CurrentProc] = event->time;

    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)

    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
      /* Should just be continuing execution */
        IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
        /* ToDo: check assertion
        ASSERT(run_queue_hd != (StgTSO*)NULL &&
               run_queue_hd != END_TSO_QUEUE);

        /* Ignore ContinueThreads for fetching threads (if synchr comm) */
        if (!RtsFlags.GranFlags.DoAsyncFetch &&
            procStatus[CurrentProc]==Fetching) {
          debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
                CurrentTSO->id, CurrentTSO, CurrentProc);

        /* Ignore ContinueThreads for completed threads */
        if (CurrentTSO->what_next == ThreadComplete) {
          debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
                CurrentTSO->id, CurrentTSO, CurrentProc);

        /* Ignore ContinueThreads for threads that are being migrated */
        if (PROCS(CurrentTSO)==Nowhere) {
          debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
                CurrentTSO->id, CurrentTSO, CurrentProc);

        /* The thread should be at the beginning of the run queue */
        if (CurrentTSO!=run_queue_hds[CurrentProc]) {
          debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
                CurrentTSO->id, CurrentTSO, CurrentProc);
          break; // run the thread anyway

        new_event(proc, proc, CurrentTime[proc],
                  (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
        */ /* Catches superfluous CONTINUEs -- should be unnecessary */

        break; // now actually run the thread; DaH Qu'vam yImuHbej

        do_the_fetchnode(event);
        goto next_thread;             /* handle next event in event queue  */

        do_the_globalblock(event);
        goto next_thread;             /* handle next event in event queue  */

        do_the_fetchreply(event);
        goto next_thread;             /* handle next event in event queue  */

      case UnblockThread:   /* Move from the blocked queue to the tail of */
        do_the_unblock(event);
        goto next_thread;             /* handle next event in event queue  */

      case ResumeThread:  /* Move from the blocked queue to the tail of */
        /* the runnable queue ( i.e. Qu' SImqa'lu') */
        event->tso->gran.blocktime +=
          CurrentTime[CurrentProc] - event->tso->gran.blockedat;
        do_the_startthread(event);
        goto next_thread;             /* handle next event in event queue  */

        do_the_startthread(event);
        goto next_thread;             /* handle next event in event queue  */

        do_the_movethread(event);
        goto next_thread;             /* handle next event in event queue  */

        do_the_movespark(event);
        goto next_thread;             /* handle next event in event queue  */

        do_the_findwork(event);
        goto next_thread;             /* handle next event in event queue  */

        barf("Illegal event type %u\n", event->evttype);

    /* This point was scheduler_loop in the old RTS */

    IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",

    if (RtsFlags.GranFlags.Light)
      GranSimLight_leave_system(event, &ActiveTSO);

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

             debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */

    /* Take a thread from the run queue. */
    POP_RUN_QUEUE(t); // take_off_run_queue(t);

             debugBelch("GRAN: About to run current thread, which is\n");

    context_switch = 0; // turned on via GranYield, checking events and time slice

             DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;

/* ----------------------------------------------------------------------------
 * Send pending messages (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
scheduleSendPendingMessages(void)
# if defined(PAR) // global Mem.Mgmt., omit for now
    if (PendingFetches != END_BF_QUEUE) {

    if (RtsFlags.ParFlags.BufferTime) {
        // if we use message buffering, we must send away all message
        // packets which have become too old...

/* ----------------------------------------------------------------------------
 * Activate spark threads (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
scheduleActivateSpark(void)
  ASSERT(emptyRunQueue());
  /* We get here if the run queue is empty and want some work.
     We try to turn a spark into a thread, and add it to the run queue,
     from where it will be picked up in the next iteration of the scheduler

  /* :-[  no local threads => look out for local sparks */
  /* the spark pool for the current PE */
  pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
  if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
      pool->hd < pool->tl) {

    /*
     * ToDo: add GC code check that we really have enough heap afterwards!!
     * If we're here (no runnable threads) and we have pending
     * sparks, we must have a space problem.  Get enough space
     * to turn one of those pending sparks into a

    spark = findSpark(rtsFalse);                /* get a spark */
    if (spark != (rtsSpark) NULL) {
      tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
      IF_PAR_DEBUG(fish, // schedule,
                   debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
                              tso->id, tso, advisory_thread_count));

      if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
        IF_PAR_DEBUG(fish, // schedule,
                     debugBelch("==^^ failed to create thread from spark @ %lx\n",
        return rtsFalse; /* failed to generate a thread */
      } /* otherwise fall through & pick-up new tso */

      IF_PAR_DEBUG(fish, // schedule,
                   debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
                              spark_queue_len(pool)));
      return rtsFalse; /* failed to generate a thread */

    return rtsTrue;  /* success in generating a thread */
  } else { /* no more threads permitted or pool empty */
    return rtsFalse; /* failed to generateThread */

  tso = NULL; // avoid compiler warning only
  return rtsFalse; /* dummy in non-PAR setup */

#endif // PARALLEL_HASKELL

/* ----------------------------------------------------------------------------
 * Get work from a remote node (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
scheduleGetRemoteWork(rtsBool *receivedFinish)
  ASSERT(emptyRunQueue());

  if (RtsFlags.ParFlags.BufferTime) {
        IF_PAR_DEBUG(verbose,
                debugBelch("...send all pending data,"));
        for (i=1; i<=nPEs; i++)
          sendImmediately(i); // send all messages away immediately

        //++EDEN++ idle() , i.e. send all buffers, wait for work
        // suppress fishing in EDEN... just look for incoming messages
        // (blocking receive)
        IF_PAR_DEBUG(verbose,
                     debugBelch("...wait for incoming messages...\n"));
        *receivedFinish = processMessages(); // blocking receive...

        // and reenter scheduling loop after having received something
        // (return rtsFalse below)

# else /* activate SPARKS machinery */
/* We get here if we have no work, tried to activate a local spark, but still
   have no work.  We try to get a remote spark, by sending a FISH message.
   Thread migration should be added here, and triggered when a sequence of
   fishes returns without work. */
        delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);

        /* =8-[  no local sparks => look for work on other PEs */
        /*
         * We really have absolutely no work.  Send out a fish
         * (there may be some out there already), and wait for
         * something to arrive.  We clearly can't run any threads
         * until a SCHEDULE or RESUME arrives, and so that's what
         * we're hoping to see.  (Of course, we still have to
         * respond to other types of messages.)
         */
        rtsTime now = msTime() /*CURRENT_TIME*/;
        IF_PAR_DEBUG(verbose,
                     debugBelch("--  now=%ld\n", now));
        IF_PAR_DEBUG(fish, // verbose,
             if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
                 (last_fish_arrived_at!=0 &&
                  last_fish_arrived_at+delay > now)) {
               debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
                     now, last_fish_arrived_at+delay,
                     last_fish_arrived_at,

        if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
            advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
          if (last_fish_arrived_at==0 ||
              (last_fish_arrived_at+delay <= now)) {           // send FISH now!
            /* outstandingFishes is set in sendFish, processFish;
               avoid flooding system with fishes via delay */
            next_fish_to_send_at = 0;

            /* ToDo: this should be done in the main scheduling loop to avoid the
                     busy wait here; not so bad if fish delay is very small  */
            int iq = 0; // DEBUGGING -- HWL
            next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
            /* send a fish when ready, but process messages that arrive in the meantime */
              if (PacketsWaiting()) {
                *receivedFinish = processMessages();
            } while (!*receivedFinish || now<next_fish_to_send_at);
            // JB: This means the fish could become obsolete, if we receive
            // work. Better check for work again?
            // last line: while (!receivedFinish || !haveWork || now<...)
            // next line: if (receivedFinish || haveWork )

            if (*receivedFinish) // no need to send a FISH if we are finishing anyway
              return rtsFalse;  // NB: this will leave scheduler loop
                                // immediately after return!

            IF_PAR_DEBUG(fish, // verbose,
                       debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));

          // JB: IMHO, this should all be hidden inside sendFish(...)
          sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,

          // Global statistics: count no. of fishes
          if (RtsFlags.ParFlags.ParStats.Global &&
              RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
            globalParStats.tot_fish_mess++;

    /* delayed fishes must have been sent by now! */
    next_fish_to_send_at = 0;

  *receivedFinish = processMessages();
# endif /* SPARKS */

  /* NB: this function always returns rtsFalse, meaning the scheduler
     loop continues with the next iteration;
     return code means success in finding work; we enter this function
     if there is no local work, thus have to send a fish which takes
     time until it arrives with work; in the meantime we should process
     messages in the main loop;

#endif // PARALLEL_HASKELL

/* ----------------------------------------------------------------------------
 * PAR/GRAN: Report stats & debugging info(?)
 * ------------------------------------------------------------------------- */

#if defined(PAR) || defined(GRAN)
scheduleGranParReport(void)
  ASSERT(run_queue_hd != END_TSO_QUEUE);

  /* Take a thread from the run queue, if we have work */
  POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);

    /* If this TSO has got its outport closed in the meantime,
     *   it mustn't be run. Instead, we have to clean it up as if it was finished.
     *   It has to be marked as TH_DEAD for this purpose.
     *   If it is TH_TERM instead, it is supposed to have finished in the normal way.
       JB: TODO: investigate whether state change field could be nuked
             entirely and replaced by the normal tso state (whatnext
             field). All we want to do is to kill tsos from outside.

    /* ToDo: write something to the log-file
    if (RTSflags.ParFlags.granSimStats && !sameThread)
        DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);

    /* the spark pool for the current PE */
    pool = &(cap.r.rSparks); //  cap = (old) MainCap

                 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
                       run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

                 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
                       run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

    if (RtsFlags.ParFlags.ParStats.Full &&
        (t->par.sparkname != (StgInt)0) && // only log spark generated threads
        (emitSchedule || // forced emit
         (t && LastTSO && t->id != LastTSO->id))) {
      /*
        we are running a different TSO, so write a schedule event to log file
        NB: If we use fair scheduling we also have to write a deschedule
            event for LastTSO; with unfair scheduling we know that the
            previous tso has blocked whenever we switch to another tso, so
            we don't need it in GUM for now
      */
      IF_PAR_DEBUG(fish, // schedule,
                   debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));

      DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                       GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
      emitSchedule = rtsFalse;

/* ----------------------------------------------------------------------------
 * After running a thread...
 * ------------------------------------------------------------------------- */

schedulePostRunThread(void)
  /* HACK 675: if the last thread didn't yield, make sure to print a
     SCHEDULE event to the log file when StgRunning the next thread, even
     if it is the same one as before */
  TimeOfLastYield = CURRENT_TIME;

  /* some statistics gathering in the parallel case */

#if defined(GRAN) || defined(PAR) || defined(EDEN)
      IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
      globalGranStats.tot_heapover++;
      globalParStats.tot_heapover++;

               DumpGranEvent(GR_DESCHEDULE, t));
      globalGranStats.tot_stackover++;
      // DumpGranEvent(GR_DESCHEDULE, t);
      globalParStats.tot_stackover++;

    case ThreadYielding:
               DumpGranEvent(GR_DESCHEDULE, t));
      globalGranStats.tot_yields++;
      // DumpGranEvent(GR_DESCHEDULE, t);
      globalParStats.tot_yields++;

               debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
                          t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
                          (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
               if (t->block_info.closure!=(StgClosure*)NULL)
                 print_bq(t->block_info.closure);

      // ??? needed; should emit block before
               DumpGranEvent(GR_DESCHEDULE, t));
      prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t

      ASSERT(procStatus[CurrentProc]==Busy ||
              ((procStatus[CurrentProc]==Fetching) &&
              (t->block_info.closure!=(StgClosure*)NULL)));
      if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
          !(!RtsFlags.GranFlags.DoAsyncFetch &&
            procStatus[CurrentProc]==Fetching))
        procStatus[CurrentProc] = Idle;

      //++PAR++  blockThread() writes the event (change?)

    case ThreadFinished:
      barf("parGlobalStats: unknown return code");

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
 * -------------------------------------------------------------------------- */

scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
    // did the task ask for a large block?
    if (cap->r.rHpAlloc > BLOCK_SIZE) {
        // if so, get one and push it on the front of the nursery.
        blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
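        // For example, assuming an illustrative 4Kbyte BLOCK_SIZE: a
        // request of rHpAlloc = 10000 bytes rounds up to 12288, i.e. 3
        // whole blocks.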
                 debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
                            (long)t->id, whatNext_strs[t->what_next], blocks));

        // don't do this if the nursery is (nearly) full, we'll GC first.
        if (cap->r.rCurrentNursery->link != NULL ||
            cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
                                               // if the nursery has only one block.

            bd = allocGroup( blocks );
            cap->r.rNursery->n_blocks += blocks;

            // link the new group into the list
            bd->link = cap->r.rCurrentNursery;
            bd->u.back = cap->r.rCurrentNursery->u.back;
            if (cap->r.rCurrentNursery->u.back != NULL) {
                cap->r.rCurrentNursery->u.back->link = bd;

                ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
                       g0s0 == cap->r.rNursery);
                cap->r.rNursery->blocks = bd;
            cap->r.rCurrentNursery->u.back = bd;

            // initialise it as a nursery block.  We initialise the
            // step, gen_no, and flags field of *every* sub-block in
            // this large block, because this is easier than making
            // sure that we always find the block head of a large
            // block whenever we call Bdescr() (eg. evacuate() and
            // isAlive() in the GC would both have to do this, at
                for (x = bd; x < bd + blocks; x++) {
                    x->step = cap->r.rNursery;

            // This assert can be a killer if the app is doing lots
            // of large block allocations.
            IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));

            // now update the nursery to point to the new block
            cap->r.rCurrentNursery = bd;

            // we might be unlucky and have another thread get on the
            // run queue before us and steal the large block, but in that
            // case the thread will just end up requesting another large
            pushOnRunQueue(cap,t);
            return rtsFalse;  /* not actually GC'ing */

             debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n",
                        (long)t->id, whatNext_strs[t->what_next]));

    ASSERT(!is_on_queue(t,CurrentProc));
#elif defined(PARALLEL_HASKELL)
    /* Currently we emit a DESCHEDULE event before GC in GUM.
       ToDo: either add separate event to distinguish SYSTEM time from rest
       or just nuke this DESCHEDULE (and the following SCHEDULE) */
    if (0 && RtsFlags.ParFlags.ParStats.Full) {
        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
        emitSchedule = rtsTrue;

    pushOnRunQueue(cap,t);
    /* actual GC is done at the end of the while loop in schedule() */

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadStackOverflow
 * -------------------------------------------------------------------------- */

scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
    IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n",
                                  (long)t->id, whatNext_strs[t->what_next]));

    /* just adjust the stack for this thread, then pop it back

        /* enlarge the stack */
        StgTSO *new_t = threadStackOverflow(cap, t);

        /* The TSO attached to this Task may have moved, so update the
        if (task->tso == t) {
        pushOnRunQueue(cap,new_t);

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadYielding
 * -------------------------------------------------------------------------- */

scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
    // Reset the context switch flag.  We don't do this just before
    // running the thread, because that would mean we would lose ticks
    // during GC, which can lead to unfair scheduling (a thread hogs
    // the CPU because the tick always arrives during GC).  This way
    // penalises threads that do a lot of allocation, but that seems
    // better than the alternative.

    /* put the thread back on the run queue.  Then, if we're ready to
     * GC, check whether this is the last task to stop.  If so, wake
     * up the GC thread.  getThread will block during a GC until the

        if (t->what_next != prev_what_next) {
            debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n",
                       (long)t->id, whatNext_strs[t->what_next]);
            debugBelch("--<< thread %ld (%s) stopped, yielding\n",
                       (long)t->id, whatNext_strs[t->what_next]);

        //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
        ASSERT(t->link == END_TSO_QUEUE);

    // Shortcut if we're just switching evaluators: don't bother
    // doing stack squeezing (which can be expensive), just run the
    if (t->what_next != prev_what_next) {

    ASSERT(!is_on_queue(t,CurrentProc));
             //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
             checkThreadQsSanity(rtsTrue));

    addToRunQueue(cap,t);

    /* add a ContinueThread event to actually process the thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              t, (StgClosure*)NULL, (rtsSpark*)NULL);
             debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadBlocked
 * -------------------------------------------------------------------------- */

scheduleHandleThreadBlocked( StgTSO *t
#if !defined(GRAN) && !defined(DEBUG)

               debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
                          t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
               if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));

    // ??? needed; should emit block before
             DumpGranEvent(GR_DESCHEDULE, t));
    prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t

    ASSERT(procStatus[CurrentProc]==Busy ||
            ((procStatus[CurrentProc]==Fetching) &&
            (t->block_info.closure!=(StgClosure*)NULL)));
    if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
        !(!RtsFlags.GranFlags.DoAsyncFetch &&
          procStatus[CurrentProc]==Fetching))
        procStatus[CurrentProc] = Idle;

               debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
                          t->id, t, whatNext_strs[t->what_next], t->block_info.closure));

               if (t->block_info.closure!=(StgClosure*)NULL)
                 print_bq(t->block_info.closure));

    /* Send a fetch (if BlockedOnGA) and dump event to log file */

    /* whatever we schedule next, we must log that schedule */
    emitSchedule = rtsTrue;

      // We don't need to do anything.  The thread is blocked, and it
      // has tidied up its stack and placed itself on whatever queue
      // it needs to be on.

    ASSERT(t->why_blocked != NotBlocked);
    // This might not be true under SMP: we don't have
    // exclusive access to this TSO, so someone might have
    // woken it up by now.  This actually happens: try
    // conc023 +RTS -N2.

             debugBelch("--<< thread %d (%s) stopped: ",
                        t->id, whatNext_strs[t->what_next]);
             printThreadBlockage(t);

    /* Only for dumping event to log file
       ToDo: do I need this in GranSim, too?

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadFinished
 * -------------------------------------------------------------------------- */

scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
    /* Need to check whether this was a main thread, and if so,
     * return with the return value.
     *
     * We also end up here if the thread kills itself with an
     * uncaught exception, see Exception.cmm.
     */
    IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n",
                                  t->id, whatNext_strs[t->what_next]));

      endThread(t, CurrentProc); // clean-up the thread
#elif defined(PARALLEL_HASKELL)
      /* For now all are advisory -- HWL */
      //if(t->priority==AdvisoryPriority) ??
      advisory_thread_count--; // JB: Caution with this counter, buggy!

      if(t->dist.priority==RevalPriority)

# if defined(EDENOLD)
      // the thread could still have an outport... (BUG)
      if (t->eden.outport != -1) {
        // delete the outport for the tso which has finished...
        IF_PAR_DEBUG(eden_ports,
                     debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
                                t->eden.outport, t->id));

      // thread still in the process (HEAVY BUG! since outport has just been closed...)
      if (t->eden.epid != -1) {
        IF_PAR_DEBUG(eden_ports,
                     debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
                                t->id, t->eden.epid));
        removeTSOfromProcess(t);

      if (RtsFlags.ParFlags.ParStats.Full &&
          !RtsFlags.ParFlags.ParStats.Suppressed)
        DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);

      //  t->par only contains statistics: left out for now...
                   debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
                              t->id,t,t->par.sparkname));

#endif // PARALLEL_HASKELL

      // Check whether the thread that just completed was a bound
      // thread, and if so return with the result.
      //
      // There is an assumption here that all thread completion goes
      // through this point; we need to make sure that if a thread
      // ends up in the ThreadKilled state, that it stays on the run
      // queue so it can be dealt with here.

        if (t->bound != task) {
#if !defined(THREADED_RTS)
            // Must be a bound thread that is not the topmost one.  Leave
            // it on the run queue until the stack has unwound to the
            // point where we can deal with this.  Leaving it on the run
            // queue also ensures that the garbage collector knows about
            // this thread and its return value (it gets dropped from the
            // all_threads list so there's no other way to find it).
            appendToRunQueue(cap,t);

            // this cannot happen in the threaded RTS, because a
            // bound thread can only be run by the appropriate Task.
            barf("finished bound thread that isn't mine");

        ASSERT(task->tso == t);

        if (t->what_next == ThreadComplete) {
                // NOTE: return val is tso->sp[1] (see StgStartup.hc)
                *(task->ret) = (StgClosure *)task->tso->sp[1];
            task->stat = Success;

                *(task->ret) = NULL;
                task->stat = Interrupted;
                task->stat = Killed;

        removeThreadLabel((StgWord)task->tso->id);
        return rtsTrue; // tells schedule() to return

/* -----------------------------------------------------------------------------
 * Perform a heap census, if PROFILING
 * -------------------------------------------------------------------------- */

scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
#if defined(PROFILING)
    // When we have +RTS -i0 and we're heap profiling, do a census at
    // every GC.  This lets us get repeatable runs for debugging.
    if (performHeapProfile ||
        (RtsFlags.ProfFlags.profileInterval==0 &&
         RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
        GarbageCollect(GetRoots, rtsTrue);
        performHeapProfile = rtsFalse;
        return rtsTrue;  // true <=> we already GC'd

/* -----------------------------------------------------------------------------
 * Perform a garbage collection if necessary
 * -------------------------------------------------------------------------- */

scheduleDoGC( Capability *cap, Task *task USED_IF_SMP, rtsBool force_major )
    static volatile StgWord waiting_for_gc;
    rtsBool was_waiting;

    // In order to GC, there must be no threads running Haskell code.
    // Therefore, the GC thread needs to hold *all* the capabilities,
    // and release them after the GC has completed.
    //
    // This seems to be the simplest way: previous attempts involved
    // making all the threads with capabilities give up their
    // capabilities and sleep except for the *last* one, which
    // actually did the GC.  But it's quite hard to arrange for all
    // the other tasks to sleep and stay asleep.

    was_waiting = cas(&waiting_for_gc, 0, 1);
            IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
            yieldCapability(&cap,task);
        } while (waiting_for_gc);
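
    /* cas() above is an atomic compare-and-swap; a sketch of its
     * contract (the real definition lives elsewhere in the RTS):
     *
     *     // atomically: if (*p == old) *p = new; return the previous *p
     *     StgWord cas(StgVolatilePtr p, StgWord old, StgWord new);
     *
     * So was_waiting != 0 means another task won the race and is
     * already organising this GC; we just yield until it is over.
     */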
    for (i=0; i < n_capabilities; i++) {
        IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities));
        if (cap != &capabilities[i]) {
            Capability *pcap = &capabilities[i];
            // we better hope this task doesn't get migrated to
            // another Capability while we're waiting for this one.
            // It won't, because load balancing happens while we have
            // all the Capabilities, but even so it's a slightly
            // unsavoury invariant.
            waitForReturnCapability(&pcap, task);
            if (pcap != &capabilities[i]) {
                barf("scheduleDoGC: got the wrong capability");

    waiting_for_gc = rtsFalse;

    /* Kick any transactions which are invalid back to their
     * atomically frames.  When next scheduled they will try to
     * commit, this commit will fail and they will retry.
     */
        for (t = all_threads; t != END_TSO_QUEUE; t = next) {
            if (t->what_next == ThreadRelocated) {
                next = t->global_link;
                if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
                    if (!stmValidateNestOfTransactions (t -> trec)) {
                        IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));

                        // strip the stack back to the
                        // ATOMICALLY_FRAME, aborting the (nested)
                        // transaction, and saving the stack of any
                        // partially-evaluated thunks on the heap.
                        raiseAsync_(cap, t, NULL, rtsTrue, NULL);

                        ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);

    // so this happens periodically:
    scheduleCheckBlackHoles(cap);

    IF_DEBUG(scheduler, printAllThreads());

    /* everybody back, start the GC.
     * Could do it in this thread, or signal a condition var
     * to do it in another thread.  Either way, we need to
     * broadcast on gc_pending_cond afterward.
     */
#if defined(THREADED_RTS)
    IF_DEBUG(scheduler,sched_belch("doing GC"));
    GarbageCollect(GetRoots, force_major);

    // release our stash of capabilities.
    for (i = 0; i < n_capabilities; i++) {
        if (cap != &capabilities[i]) {
            task->cap = &capabilities[i];
            releaseCapability(&capabilities[i]);

    /* add a ContinueThread event to continue execution of current thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              t, (StgClosure*)NULL, (rtsSpark*)NULL);
               debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");

/* ---------------------------------------------------------------------------
 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
 * used by Control.Concurrent for error checking.
 * ------------------------------------------------------------------------- */

rtsSupportsBoundThreads(void)
#if defined(THREADED_RTS)

/* ---------------------------------------------------------------------------
 * isThreadBound(tso): check whether tso is bound to an OS thread.
 * ------------------------------------------------------------------------- */

isThreadBound(StgTSO* tso USED_IF_THREADS)
#if defined(THREADED_RTS)
  return (tso->bound != NULL);

/* ---------------------------------------------------------------------------
 * Singleton fork(). Do not copy any running threads.
 * ------------------------------------------------------------------------- */

#if !defined(mingw32_HOST_OS) && !defined(SMP)
#define FORKPROCESS_PRIMOP_SUPPORTED

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
deleteThreadImmediately(Capability *cap, StgTSO *tso);

forkProcess(HsStablePtr *entry
#ifndef FORKPROCESS_PRIMOP_SUPPORTED
#ifdef FORKPROCESS_PRIMOP_SUPPORTED

    IF_DEBUG(scheduler,sched_belch("forking!"));

    // ToDo: for SMP, we should probably acquire *all* the capabilities
    if (pid) { // parent

        // just return the pid

        // delete all threads
        cap->run_queue_hd = END_TSO_QUEUE;
        cap->run_queue_tl = END_TSO_QUEUE;

        for (t = all_threads; t != END_TSO_QUEUE; t = next) {
            // don't allow threads to catch the ThreadKilled exception
            deleteThreadImmediately(cap,t);

        // wipe the task list
        ACQUIRE_LOCK(&sched_mutex);
        for (task = all_tasks; task != NULL; task=task->all_link) {
            if (task != cap->running_task) discardTask(task);
        RELEASE_LOCK(&sched_mutex);

        cap->suspended_ccalling_tasks = NULL;

#if defined(THREADED_RTS)
        // wipe our spare workers list.
        cap->spare_workers = NULL;
        cap->returning_tasks_hd = NULL;
        cap->returning_tasks_tl = NULL;

        cap = rts_evalStableIO(cap, entry, NULL);  // run the action
        rts_checkSchedStatus("forkProcess",cap);

        hs_exit();                      // clean up and exit
        stg_exit(EXIT_SUCCESS);

#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
    barf("forkProcess#: primop not supported on this platform, sorry!\n");

/* ---------------------------------------------------------------------------
 * Delete the threads on the run queue of the current capability.
 * ------------------------------------------------------------------------- */

deleteRunQueue (Capability *cap)
    for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
        ASSERT(t->what_next != ThreadRelocated);
        deleteThread(cap, t);

/* startThread and insertThread are now in GranSim.c -- HWL */

/* -----------------------------------------------------------------------------
   Managing the suspended_ccalling_tasks list.
   Locks required: sched_mutex
   -------------------------------------------------------------------------- */

suspendTask (Capability *cap, Task *task)
    ASSERT(task->next == NULL && task->prev == NULL);

    task->next = cap->suspended_ccalling_tasks;
    if (cap->suspended_ccalling_tasks) {
        cap->suspended_ccalling_tasks->prev = task;
    cap->suspended_ccalling_tasks = task;

recoverSuspendedTask (Capability *cap, Task *task)
        task->prev->next = task->next;
        ASSERT(cap->suspended_ccalling_tasks == task);
        cap->suspended_ccalling_tasks = task->next;
        task->next->prev = task->prev;
    task->next = task->prev = NULL;
2151 /* ---------------------------------------------------------------------------
2152 * Suspending & resuming Haskell threads.
2154 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2155 * its capability before calling the C function. This allows another
2156 * task to pick up the capability and carry on running Haskell
2157 * threads. It also means that if the C call blocks, it won't lock
2160 * The Haskell thread making the C call is put to sleep for the
2161 * duration of the call, on the suspended_ccalling_tasks queue. We
2162 * give out a token to the task, which it can use to resume the thread
2163 * on return from the C function.
2164 * ------------------------------------------------------------------------- */
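/* Calling-convention sketch (illustrative; the real sequence is emitted by
 * the code generator, not written by hand): a safe foreign call expands to
 * roughly
 *
 *     token = suspendThread(&cap->r);  // release the Capability
 *     result = c_fn(args);             // may block without stalling the RTS
 *     reg = resumeThread(token);       // reacquire a Capability and continue
 */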
2167 suspendThread (StgRegTable *reg)
2170 int saved_errno = errno;
2174 /* assume that reg points to the StgRegTable part of a Capability.
2176 cap = regTableToCapability(reg);
2178 task = cap->running_task;
2179 tso = cap->r.rCurrentTSO;
2182 sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2184 // XXX this might not be necessary --SDM
2185 tso->what_next = ThreadRunGHC;
2187 threadPaused(cap,tso);
2189 if(tso->blocked_exceptions == NULL) {
2190 tso->why_blocked = BlockedOnCCall;
2191 tso->blocked_exceptions = END_TSO_QUEUE;
2193 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2196 // Hand back capability
2197 task->suspended_tso = tso;
2199 ACQUIRE_LOCK(&cap->lock);
2201 suspendTask(cap,task);
2202 cap->in_haskell = rtsFalse;
2203 releaseCapability_(cap);
2205 RELEASE_LOCK(&cap->lock);
2207 #if defined(THREADED_RTS)
2208 /* Preparing to leave the RTS, so ensure there's a native thread/task
2209 waiting to take over.
2211 IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2214 errno = saved_errno;
2219 resumeThread (void *task_)
2223 int saved_errno = errno;
2227 // Wait for permission to re-enter the RTS with the result.
2228 waitForReturnCapability(&cap,task);
2229 // we might be on a different capability now... but if so, our
2230 // entry on the suspended_ccalling_tasks list will also have been
2233 // Remove the thread from the suspended list
2234 recoverSuspendedTask(cap,task);
2236 tso = task->suspended_tso;
2237 task->suspended_tso = NULL;
2238 tso->link = END_TSO_QUEUE;
2239 IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2241 if (tso->why_blocked == BlockedOnCCall) {
2242 awakenBlockedQueue(cap,tso->blocked_exceptions);
2243 tso->blocked_exceptions = NULL;
2246 /* Reset blocking status */
2247 tso->why_blocked = NotBlocked;
2249 cap->r.rCurrentTSO = tso;
2250 cap->in_haskell = rtsTrue;
2251 errno = saved_errno;
2253 /* We might have GC'd, mark the TSO dirty again */
2259 /* ---------------------------------------------------------------------------
2260 * Comparing Thread ids.
2262 * This is used from STG land in the implementation of the
2263 * instances of Eq/Ord for ThreadIds.
2264 * ------------------------------------------------------------------------ */
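/* Usage sketch (illustrative): the Eq/Ord instances map the result onto an
 * Ordering, roughly
 *
 *     r = cmp_thread((StgPtr)tso1, (StgPtr)tso2);
 *     // r < 0 => LT, r == 0 => EQ, r > 0 => GT
 */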
2267 cmp_thread(StgPtr tso1, StgPtr tso2)
2269 StgThreadID id1 = ((StgTSO *)tso1)->id;
2270 StgThreadID id2 = ((StgTSO *)tso2)->id;
2272 if (id1 < id2) return (-1);
2273 if (id1 > id2) return 1;
2277 /* ---------------------------------------------------------------------------
2278 * Fetching the ThreadID from an StgTSO.
2280 * This is used in the implementation of Show for ThreadIds.
2281 * ------------------------------------------------------------------------ */
2283 rts_getThreadId(StgPtr tso)
2285 return ((StgTSO *)tso)->id;
2290 labelThread(StgPtr tso, char *label)
2295 /* Caveat: Once set, you can only set the thread name to "" */
2296 len = strlen(label)+1;
2297 buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2298 strncpy(buf,label,len);
2299 /* Update will free the old memory for us */
2300 updateThreadLabel(((StgTSO *)tso)->id,buf);
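/* Usage sketch (illustrative label): from C one might tag the current
 * thread for debugging output with
 *
 *     labelThread((StgPtr)cap->r.rCurrentTSO, "my-worker");
 */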
2304 /* ---------------------------------------------------------------------------
2305 Create a new thread.
2307 The new thread starts with the given stack size. Before the
2308 scheduler can run, however, this thread needs to have a closure
2309 (and possibly some arguments) pushed on its stack. See
2310 pushClosure() in Schedule.h.
2312 createGenThread() and createIOThread() (in SchedAPI.h) are
2313 convenient packaged versions of this function.
2315 currently pri (priority) is only used in a GRAN setup -- HWL
2316 ------------------------------------------------------------------------ */
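/* Usage sketch (illustrative, non-GranSim; roughly mirrors the
 * createIOThread() helper in SchedAPI.h, and io_action is a hypothetical
 * closure): create a TSO, push the closure to run plus its return
 * convention, then hand it to the scheduler: */
#if 0
    StgTSO *t = createThread(cap, RtsFlags.GcFlags.initialStkSize);
    pushClosure(t, (W_)&stg_ap_v_info);   // return convention for an IO action
    pushClosure(t, (W_)io_action);        // hypothetical closure to run
    scheduleThread(cap, t);
#endif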
2318 /* currently pri (priority) is only used in a GRAN setup -- HWL */
2320 createThread(nat size, StgInt pri)
2323 createThread(Capability *cap, nat size)
2329 /* sched_mutex is *not* required */
2331 /* First check whether we should create a thread at all */
2332 #if defined(PARALLEL_HASKELL)
2333 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2334 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2336 debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2337 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2338 return END_TSO_QUEUE;
2344 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2347 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2349 /* catch ridiculously small stack sizes */
2350 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2351 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2354 stack_size = size - TSO_STRUCT_SIZEW;
2356 tso = (StgTSO *)allocateLocal(cap, size);
2357 TICK_ALLOC_TSO(stack_size, 0);
2359 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2361 SET_GRAN_HDR(tso, ThisPE);
2364 // Always start with the compiled code evaluator
2365 tso->what_next = ThreadRunGHC;
2367 tso->why_blocked = NotBlocked;
2368 tso->blocked_exceptions = NULL;
2369 tso->flags = TSO_DIRTY;
2371 tso->saved_errno = 0;
2374 tso->stack_size = stack_size;
2375 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
2377 tso->sp = (P_)&(tso->stack) + stack_size;
2379 tso->trec = NO_TREC;
2382 tso->prof.CCCS = CCS_MAIN;
2385 /* put a stop frame on the stack */
2386 tso->sp -= sizeofW(StgStopFrame);
2387 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2388 tso->link = END_TSO_QUEUE;
2392 /* uses more flexible routine in GranSim */
2393 insertThread(tso, CurrentProc);
2395 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2401 if (RtsFlags.GranFlags.GranSimStats.Full)
2402 DumpGranEvent(GR_START,tso);
2403 #elif defined(PARALLEL_HASKELL)
2404 if (RtsFlags.ParFlags.ParStats.Full)
2405 DumpGranEvent(GR_STARTQ,tso);
2406 /* HACK to avoid SCHEDULE
2410 /* Link the new thread on the global thread list.
2412 ACQUIRE_LOCK(&sched_mutex);
2413 tso->id = next_thread_id++; // while we have the mutex
2414 tso->global_link = all_threads;
2416 RELEASE_LOCK(&sched_mutex);
2419 tso->dist.priority = MandatoryPriority; //by default that is...
2423 tso->gran.pri = pri;
2425 tso->gran.magic = TSO_MAGIC; // debugging only
2427 tso->gran.sparkname = 0;
2428 tso->gran.startedat = CURRENT_TIME;
2429 tso->gran.exported = 0;
2430 tso->gran.basicblocks = 0;
2431 tso->gran.allocs = 0;
2432 tso->gran.exectime = 0;
2433 tso->gran.fetchtime = 0;
2434 tso->gran.fetchcount = 0;
2435 tso->gran.blocktime = 0;
2436 tso->gran.blockcount = 0;
2437 tso->gran.blockedat = 0;
2438 tso->gran.globalsparks = 0;
2439 tso->gran.localsparks = 0;
2440 if (RtsFlags.GranFlags.Light)
2441 tso->gran.clock = Now; /* local clock */
2443 tso->gran.clock = 0;
2445 IF_DEBUG(gran,printTSO(tso));
2446 #elif defined(PARALLEL_HASKELL)
2448 tso->par.magic = TSO_MAGIC; // debugging only
2450 tso->par.sparkname = 0;
2451 tso->par.startedat = CURRENT_TIME;
2452 tso->par.exported = 0;
2453 tso->par.basicblocks = 0;
2454 tso->par.allocs = 0;
2455 tso->par.exectime = 0;
2456 tso->par.fetchtime = 0;
2457 tso->par.fetchcount = 0;
2458 tso->par.blocktime = 0;
2459 tso->par.blockcount = 0;
2460 tso->par.blockedat = 0;
2461 tso->par.globalsparks = 0;
2462 tso->par.localsparks = 0;
2466 globalGranStats.tot_threads_created++;
2467 globalGranStats.threads_created_on_PE[CurrentProc]++;
2468 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2469 globalGranStats.tot_sq_probes++;
2470 #elif defined(PARALLEL_HASKELL)
2471 // collect parallel global statistics (currently done together with GC stats)
2472 if (RtsFlags.ParFlags.ParStats.Global &&
2473 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2474 //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
2475 globalParStats.tot_threads_created++;
2481 sched_belch("==__ schedule: Created TSO %d (%p);",
2482 CurrentProc, tso, tso->id));
2483 #elif defined(PARALLEL_HASKELL)
2484 IF_PAR_DEBUG(verbose,
2485 sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2486 (long)tso->id, tso, advisory_thread_count));
2488 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
2489 (long)tso->id, (long)tso->stack_size));
2496 all parallel thread creation calls should fall through the following routine.
2499 createThreadFromSpark(rtsSpark spark)
2501 ASSERT(spark != (rtsSpark)NULL);
2502 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2503 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
2505 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2506 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2507 return END_TSO_QUEUE;
2511 tso = createThread(RtsFlags.GcFlags.initialStkSize);
2512 if (tso==END_TSO_QUEUE)
2513 barf("createSparkThread: Cannot create TSO");
2515 tso->priority = AdvisoryPriority;
2517 pushClosure(tso,spark);
2519 advisory_thread_count++; // JB: TAKE CARE OF THIS COUNTER! BUGGY
2526 Turn a spark into a thread.
2527 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2531 activateSpark (rtsSpark spark)
2535 tso = createSparkThread(spark);
2536 if (RtsFlags.ParFlags.ParStats.Full) {
2537 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2538 IF_PAR_DEBUG(verbose,
2539 debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2540 (StgClosure *)spark, info_type((StgClosure *)spark)));
2542 // ToDo: fwd info on local/global spark to thread -- HWL
2543 // tso->gran.exported = spark->exported;
2544 // tso->gran.locked = !spark->global;
2545 // tso->gran.sparkname = spark->name;
2551 /* ---------------------------------------------------------------------------
2554 * scheduleThread puts a thread on the end of the runnable queue.
2555 * This will usually be done immediately after a thread is created.
2556 * The caller of scheduleThread must create the thread using e.g.
2557 * createThread and push an appropriate closure
2558 * on this thread's stack before the scheduler is invoked.
2559 * ------------------------------------------------------------------------ */
2562 scheduleThread(Capability *cap, StgTSO *tso)
2564 // The thread goes at the *end* of the run-queue, to avoid possible
2565 // starvation of any threads already on the queue.
2566 appendToRunQueue(cap,tso);
2570 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2574 // We already created/initialised the Task
2575 task = cap->running_task;
2577 // This TSO is now a bound thread; make the Task and TSO
2578 // point to each other.
2583 task->stat = NoStatus;
2585 appendToRunQueue(cap,tso);
2587 IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2590 /* GranSim specific init */
2591 CurrentTSO = m->tso; // the TSO to run
2592 procStatus[MainProc] = Busy; // status of main PE
2593 CurrentProc = MainProc; // PE to run it on
2596 cap = schedule(cap,task);
2598 ASSERT(task->stat != NoStatus);
2599 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2601 IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2605 /* ----------------------------------------------------------------------------
2607 * ------------------------------------------------------------------------- */
2609 #if defined(THREADED_RTS)
2611 workerStart(Task *task)
2615 // See startWorkerTask().
2616 ACQUIRE_LOCK(&task->lock);
2618 RELEASE_LOCK(&task->lock);
2620 // set the thread-local pointer to the Task:
2623 // schedule() runs without a lock.
2624 cap = schedule(cap,task);
2626 // On exit from schedule(), we have a Capability.
2627 releaseCapability(cap);
2632 /* ---------------------------------------------------------------------------
2635 * Initialise the scheduler. This resets all the queues - if the
2636 * queues contained any threads, they'll be garbage collected at the
2639 * ------------------------------------------------------------------------ */
2646 for (i=0; i < MAX_PROC; i++) {
2647 run_queue_hds[i] = END_TSO_QUEUE;
2648 run_queue_tls[i] = END_TSO_QUEUE;
2649 blocked_queue_hds[i] = END_TSO_QUEUE;
2650 blocked_queue_tls[i] = END_TSO_QUEUE;
2651 ccalling_threadss[i] = END_TSO_QUEUE;
2652 blackhole_queue[i] = END_TSO_QUEUE;
2653 sleeping_queue = END_TSO_QUEUE;
2655 #elif !defined(THREADED_RTS)
2656 blocked_queue_hd = END_TSO_QUEUE;
2657 blocked_queue_tl = END_TSO_QUEUE;
2658 sleeping_queue = END_TSO_QUEUE;
2661 blackhole_queue = END_TSO_QUEUE;
2662 all_threads = END_TSO_QUEUE;
2667 RtsFlags.ConcFlags.ctxtSwitchTicks =
2668 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2670 #if defined(THREADED_RTS)
2671 /* Initialise the mutex and condition variables used by
2673 initMutex(&sched_mutex);
2676 ACQUIRE_LOCK(&sched_mutex);
2678 /* A capability holds the state a native thread needs in
2679 * order to execute STG code. At least one capability is
2680 * floating around (only SMP builds have more than one).
2686 #if defined(SMP) || defined(PARALLEL_HASKELL)
2692 * Eagerly start one worker to run each Capability, except for
2693 * Capability 0. The idea is that we're probably going to start a
2694 * bound thread on Capability 0 pretty soon, so we don't want a
2695 * worker task hogging it.
2700 for (i = 1; i < n_capabilities; i++) {
2701 cap = &capabilities[i];
2702 ACQUIRE_LOCK(&cap->lock);
2703 startWorkerTask(cap, workerStart);
2704 RELEASE_LOCK(&cap->lock);
2709 RELEASE_LOCK(&sched_mutex);
2713 exitScheduler( void )
2715 interrupted = rtsTrue;
2716 shutting_down_scheduler = rtsTrue;
2718 #if defined(THREADED_RTS)
2723 ACQUIRE_LOCK(&sched_mutex);
2724 task = newBoundTask();
2725 RELEASE_LOCK(&sched_mutex);
2727 for (i = 0; i < n_capabilities; i++) {
2728 shutdownCapability(&capabilities[i], task);
2730 boundTaskExiting(task);
2736 /* ---------------------------------------------------------------------------
2737 Where are the roots that we know about?
2739 - all the threads on the runnable queue
2740 - all the threads on the blocked queue
2741 - all the threads on the sleeping queue
2742 - all the threads currently executing a _ccall_GC
2743 - all the "main threads"
2745 ------------------------------------------------------------------------ */
2747 /* This has to be protected either by the scheduler monitor, or by the
2748 garbage collection monitor (probably the latter).
2753 GetRoots( evac_fn evac )
2760 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2761 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2762 evac((StgClosure **)&run_queue_hds[i]);
2763 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2764 evac((StgClosure **)&run_queue_tls[i]);
2766 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2767 evac((StgClosure **)&blocked_queue_hds[i]);
2768 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2769 evac((StgClosure **)&blocked_queue_tls[i]);
2770 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2771 evac((StgClosure **)&ccalling_threadss[i]);
2778 for (i = 0; i < n_capabilities; i++) {
2779 cap = &capabilities[i];
2780 evac((StgClosure **)&cap->run_queue_hd);
2781 evac((StgClosure **)&cap->run_queue_tl);
2783 for (task = cap->suspended_ccalling_tasks; task != NULL;
2785 evac((StgClosure **)&task->suspended_tso);
2789 #if !defined(THREADED_RTS)
2790 evac((StgClosure **)&blocked_queue_hd);
2791 evac((StgClosure **)&blocked_queue_tl);
2792 evac((StgClosure **)&sleeping_queue);
2796 evac((StgClosure **)&blackhole_queue);
2798 #if defined(SMP) || defined(PARALLEL_HASKELL) || defined(GRAN)
2799 markSparkQueue(evac);
2802 #if defined(RTS_USER_SIGNALS)
2803 // mark the signal handlers (signals should be already blocked)
2804 markSignalHandlers(evac);
2808 /* -----------------------------------------------------------------------------
2811 This is the interface to the garbage collector from Haskell land.
2812 We provide this so that external C code can allocate and garbage
2813 collect when called from Haskell via _ccall_GC.
2815 It might be useful to provide an interface whereby the programmer
2816 can specify more roots (ToDo).
2818 This needs to be protected by the GC condition variable above. KH.
2819 -------------------------------------------------------------------------- */
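/* Usage sketch (illustrative; my_extra_root and my_get_roots are
 * hypothetical): a client with its own roots supplies a function that
 * evacuates each of them; AllRoots() below then chains it after GetRoots(): */
#if 0
static StgClosure *my_extra_root;

static void
my_get_roots (evac_fn evac)
{
    evac(&my_extra_root);
}

    /* ... */
    performGCWithRoots(my_get_roots);
#endif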
2821 static void (*extra_roots)(evac_fn);
2827 // ToDo: we have to grab all the capabilities here.
2828 errorBelch("performGC not supported in threaded RTS (yet)");
2829 stg_exit(EXIT_FAILURE);
2831 /* Obligated to hold this lock upon entry */
2832 GarbageCollect(GetRoots,rtsFalse);
2836 performMajorGC(void)
2839 errorBelch("performMayjorGC not supported in threaded RTS (yet)");
2840 stg_exit(EXIT_FAILURE);
2842 GarbageCollect(GetRoots,rtsTrue);
2846 AllRoots(evac_fn evac)
2848 GetRoots(evac); // the scheduler's roots
2849 extra_roots(evac); // the user's roots
2853 performGCWithRoots(void (*get_roots)(evac_fn))
2856 errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2857 stg_exit(EXIT_FAILURE);
2859 extra_roots = get_roots;
2860 GarbageCollect(AllRoots,rtsFalse);
2863 /* -----------------------------------------------------------------------------
2866 If the thread has reached its maximum stack size, then raise the
2867 StackOverflow exception in the offending thread. Otherwise
2868 relocate the TSO into a larger chunk of memory and adjust its stack
2870 -------------------------------------------------------------------------- */
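/* Worked example (illustrative numbers): a thread with a 4096-word stack
 * that overflows below its maximum is given min(2*4096, max_stack_size)
 * words; the block round-up below may then raise this slightly so that the
 * new TSO occupies a whole number of blocks. */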
2873 threadStackOverflow(Capability *cap, StgTSO *tso)
2875 nat new_stack_size, stack_words;
2880 IF_DEBUG(sanity,checkTSO(tso));
2881 if (tso->stack_size >= tso->max_stack_size) {
2884 debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2885 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2886 /* If we're debugging, just print out the top of the stack */
2887 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2890 /* Send this thread the StackOverflow exception */
2891 raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2895 /* Try to double the current stack size. If that takes us over the
2896 * maximum stack size for this thread, then use the maximum instead.
2897 * Finally round up so the TSO ends up as a whole number of blocks.
2899 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2900 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2901 TSO_STRUCT_SIZE)/sizeof(W_);
2902 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2903 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2905 IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2907 dest = (StgTSO *)allocate(new_tso_size);
2908 TICK_ALLOC_TSO(new_stack_size,0);
2910 /* copy the TSO block and the old stack into the new area */
2911 memcpy(dest,tso,TSO_STRUCT_SIZE);
2912 stack_words = tso->stack + tso->stack_size - tso->sp;
2913 new_sp = (P_)dest + new_tso_size - stack_words;
2914 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2916 /* relocate the stack pointers... */
2918 dest->stack_size = new_stack_size;
2920 /* Mark the old TSO as relocated. We have to check for relocated
2921 * TSOs in the garbage collector and any primops that deal with TSOs.
2923 * It's important to set the sp value to just beyond the end
2924 * of the stack, so we don't attempt to scavenge any part of the
2927 tso->what_next = ThreadRelocated;
2929 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2930 tso->why_blocked = NotBlocked;
2932 IF_PAR_DEBUG(verbose,
2933 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2934 tso->id, tso, tso->stack_size);
2935 /* If we're debugging, just print out the top of the stack */
2936 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2939 IF_DEBUG(sanity,checkTSO(tso));
2941 IF_DEBUG(scheduler,printTSO(dest));
2947 /* ---------------------------------------------------------------------------
2948 Wake up a queue that was blocked on some resource.
2949 ------------------------------------------------------------------------ */
2953 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2956 #elif defined(PARALLEL_HASKELL)
2958 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2960 /* write RESUME events to log file and
2961 update blocked and fetch time (depending on type of the orig closure) */
2962 if (RtsFlags.ParFlags.ParStats.Full) {
2963 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2964 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2965 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2966 if (emptyRunQueue())
2967 emitSchedule = rtsTrue;
2969 switch (get_itbl(node)->type) {
2971 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2976 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2983 barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2990 StgBlockingQueueElement *
2991 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2994 PEs node_loc, tso_loc;
2996 node_loc = where_is(node); // should be lifted out of loop
2997 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2998 tso_loc = where_is((StgClosure *)tso);
2999 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3000 /* !fake_fetch => "TSO is on CurrentProc" is the same as IS_LOCAL_TO */
3001 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3002 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3003 // insertThread(tso, node_loc);
3004 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3006 tso, node, (rtsSpark*)NULL);
3007 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
3010 } else { // TSO is remote (actually should be FMBQ)
3011 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3012 RtsFlags.GranFlags.Costs.gunblocktime +
3013 RtsFlags.GranFlags.Costs.latency;
3014 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3016 tso, node, (rtsSpark*)NULL);
3017 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
3020 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3022 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3023 (node_loc==tso_loc ? "Local" : "Global"),
3024 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3025 tso->block_info.closure = NULL;
3026 IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n",
3029 #elif defined(PARALLEL_HASKELL)
3030 StgBlockingQueueElement *
3031 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3033 StgBlockingQueueElement *next;
3035 switch (get_itbl(bqe)->type) {
3037 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3038 /* if it's a TSO just push it onto the run_queue */
3040 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3041 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
3043 unblockCount(bqe, node);
3044 /* reset blocking status after dumping event */
3045 ((StgTSO *)bqe)->why_blocked = NotBlocked;
3049 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3051 bqe->link = (StgBlockingQueueElement *)PendingFetches;
3052 PendingFetches = (StgBlockedFetch *)bqe;
3056 /* can ignore this case in a non-debugging setup;
3057 see comments on RBHSave closures above */
3059 /* check that the closure is an RBHSave closure */
3060 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3061 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3062 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3066 barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3067 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
3071 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3077 unblockOne(Capability *cap, StgTSO *tso)
3081 ASSERT(get_itbl(tso)->type == TSO);
3082 ASSERT(tso->why_blocked != NotBlocked);
3083 tso->why_blocked = NotBlocked;
3085 tso->link = END_TSO_QUEUE;
3087 // We might have just migrated this TSO to our Capability:
3089 tso->bound->cap = cap;
3092 appendToRunQueue(cap,tso);
3094 // we're holding a newly woken thread, make sure we context switch
3095 // quickly so we can migrate it if necessary.
3097 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3104 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3106 StgBlockingQueueElement *bqe;
3111 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3112 node, CurrentProc, CurrentTime[CurrentProc],
3113 CurrentTSO->id, CurrentTSO));
3115 node_loc = where_is(node);
3117 ASSERT(q == END_BQ_QUEUE ||
3118 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
3119 get_itbl(q)->type == CONSTR); // closure (type constructor)
3120 ASSERT(is_unique(node));
3122 /* FAKE FETCH: magically copy the node to the tso's proc;
3123 no Fetch necessary because in reality the node should not have been
3124 moved to the other PE in the first place
3126 if (CurrentProc!=node_loc) {
3128 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3129 node, node_loc, CurrentProc, CurrentTSO->id,
3130 // CurrentTSO, where_is(CurrentTSO),
3131 node->header.gran.procs));
3132 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3134 debugBelch("## new bitmask of node %p is %#x\n",
3135 node, node->header.gran.procs));
3136 if (RtsFlags.GranFlags.GranSimStats.Global) {
3137 globalGranStats.tot_fake_fetches++;
3142 // ToDo: check: ASSERT(CurrentProc==node_loc);
3143 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3146 bqe points to the current element in the queue
3147 next points to the next element in the queue
3149 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
3150 //tso_loc = where_is(tso);
3152 bqe = unblockOne(bqe, node);
3155 /* if this is the BQ of an RBH, we have to put back the info ripped out of
3156 the closure to make room for the anchor of the BQ */
3157 if (bqe!=END_BQ_QUEUE) {
3158 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3160 ASSERT((info_ptr==&RBH_Save_0_info) ||
3161 (info_ptr==&RBH_Save_1_info) ||
3162 (info_ptr==&RBH_Save_2_info));
3164 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3165 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3166 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3169 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3170 node, info_type(node)));
3173 /* statistics gathering */
3174 if (RtsFlags.GranFlags.GranSimStats.Global) {
3175 // globalGranStats.tot_bq_processing_time += bq_processing_time;
3176 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
3177 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
3178 globalGranStats.tot_awbq++; // total no. of bqs awakened
3181 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3182 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3184 #elif defined(PARALLEL_HASKELL)
3186 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3188 StgBlockingQueueElement *bqe;
3190 IF_PAR_DEBUG(verbose,
3191 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3195 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3196 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so let's just return. RFP (BUG?)\n"));
3201 ASSERT(q == END_BQ_QUEUE ||
3202 get_itbl(q)->type == TSO ||
3203 get_itbl(q)->type == BLOCKED_FETCH ||
3204 get_itbl(q)->type == CONSTR);
3207 while (get_itbl(bqe)->type==TSO ||
3208 get_itbl(bqe)->type==BLOCKED_FETCH) {
3209 bqe = unblockOne(bqe, node);
3213 #else /* !GRAN && !PARALLEL_HASKELL */
3216 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3218 if (tso == NULL) return; // hack; see bug #1235728, and comments in
3220 while (tso != END_TSO_QUEUE) {
3221 tso = unblockOne(cap,tso);
3226 /* ---------------------------------------------------------------------------
3228 - usually called inside a signal handler so it mustn't do anything fancy.
3229 ------------------------------------------------------------------------ */
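/* Usage sketch (illustrative): a console interrupt handler need only do
 *
 *     static void handle_interrupt (int sig)
 *     {
 *         interruptStgRts();   // just sets flags; safe in a signal handler
 *     }
 */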
3232 interruptStgRts(void)
3236 #if defined(THREADED_RTS)
3237 prodAllCapabilities();
3241 /* -----------------------------------------------------------------------------
3244 This is for use when we raise an exception in another thread, which
3246 This has nothing to do with the UnblockThread event in GranSim. -- HWL
3247 -------------------------------------------------------------------------- */
3249 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3251 NB: only the type of the blocking queue is different in GranSim and GUM;
3252 the operations on the queue elements are the same.
3253 Long live polymorphism!
3255 Locks: sched_mutex is held upon entry and exit.
3259 unblockThread(Capability *cap, StgTSO *tso)
3261 StgBlockingQueueElement *t, **last;
3263 switch (tso->why_blocked) {
3266 return; /* not blocked */
3269 // Be careful: nothing to do here! We tell the scheduler that the thread
3270 // is runnable and we leave it to the stack-walking code to abort the
3271 // transaction while unwinding the stack. We should perhaps have a debugging
3272 // test to make sure that this really happens and that the 'zombie' transaction
3273 // does not get committed.
3277 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3279 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3280 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3282 last = (StgBlockingQueueElement **)&mvar->head;
3283 for (t = (StgBlockingQueueElement *)mvar->head;
3285 last = &t->link, last_tso = t, t = t->link) {
3286 if (t == (StgBlockingQueueElement *)tso) {
3287 *last = (StgBlockingQueueElement *)tso->link;
3288 if (mvar->tail == tso) {
3289 mvar->tail = (StgTSO *)last_tso;
3294 barf("unblockThread (MVAR): TSO not found");
3297 case BlockedOnBlackHole:
3298 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3300 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3302 last = &bq->blocking_queue;
3303 for (t = bq->blocking_queue;
3305 last = &t->link, t = t->link) {
3306 if (t == (StgBlockingQueueElement *)tso) {
3307 *last = (StgBlockingQueueElement *)tso->link;
3311 barf("unblockThread (BLACKHOLE): TSO not found");
3314 case BlockedOnException:
3316 StgTSO *target = tso->block_info.tso;
3318 ASSERT(get_itbl(target)->type == TSO);
3320 if (target->what_next == ThreadRelocated) {
3321 target = target->link;
3322 ASSERT(get_itbl(target)->type == TSO);
3325 ASSERT(target->blocked_exceptions != NULL);
3327 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3328 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
3330 last = &t->link, t = t->link) {
3331 ASSERT(get_itbl(t)->type == TSO);
3332 if (t == (StgBlockingQueueElement *)tso) {
3333 *last = (StgBlockingQueueElement *)tso->link;
3337 barf("unblockThread (Exception): TSO not found");
3341 case BlockedOnWrite:
3342 #if defined(mingw32_HOST_OS)
3343 case BlockedOnDoProc:
3346 /* take TSO off blocked_queue */
3347 StgBlockingQueueElement *prev = NULL;
3348 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
3349 prev = t, t = t->link) {
3350 if (t == (StgBlockingQueueElement *)tso) {
3352 blocked_queue_hd = (StgTSO *)t->link;
3353 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3354 blocked_queue_tl = END_TSO_QUEUE;
3357 prev->link = t->link;
3358 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3359 blocked_queue_tl = (StgTSO *)prev;
3362 #if defined(mingw32_HOST_OS)
3363 /* (Cooperatively) signal that the worker thread should abort
3366 abandonWorkRequest(tso->block_info.async_result->reqID);
3371 barf("unblockThread (I/O): TSO not found");
3374 case BlockedOnDelay:
3376 /* take TSO off sleeping_queue */
3377 StgBlockingQueueElement *prev = NULL;
3378 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
3379 prev = t, t = t->link) {
3380 if (t == (StgBlockingQueueElement *)tso) {
3382 sleeping_queue = (StgTSO *)t->link;
3384 prev->link = t->link;
3389 barf("unblockThread (delay): TSO not found");
3393 barf("unblockThread");
3397 tso->link = END_TSO_QUEUE;
3398 tso->why_blocked = NotBlocked;
3399 tso->block_info.closure = NULL;
3400 pushOnRunQueue(cap,tso);
3404 unblockThread(Capability *cap, StgTSO *tso)
3408 /* To avoid locking unnecessarily. */
3409 if (tso->why_blocked == NotBlocked) {
3413 switch (tso->why_blocked) {
3416 // Be careful: nothing to do here! We tell the scheduler that the thread
3417 // is runnable and we leave it to the stack-walking code to abort the
3418 // transaction while unwinding the stack. We should perhaps have a debugging
3419 // test to make sure that this really happens and that the 'zombie' transaction
3420 // does not get committed.
3424 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3426 StgTSO *last_tso = END_TSO_QUEUE;
3427 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3430 for (t = mvar->head; t != END_TSO_QUEUE;
3431 last = &t->link, last_tso = t, t = t->link) {
3434 if (mvar->tail == tso) {
3435 mvar->tail = last_tso;
3440 barf("unblockThread (MVAR): TSO not found");
3443 case BlockedOnBlackHole:
3445 last = &blackhole_queue;
3446 for (t = blackhole_queue; t != END_TSO_QUEUE;
3447 last = &t->link, t = t->link) {
3453 barf("unblockThread (BLACKHOLE): TSO not found");
3456 case BlockedOnException:
3458 StgTSO *target = tso->block_info.tso;
3460 ASSERT(get_itbl(target)->type == TSO);
3462 while (target->what_next == ThreadRelocated) {
3463 target = target->link;
3464 ASSERT(get_itbl(target)->type == TSO);
3467 ASSERT(target->blocked_exceptions != NULL);
3469 last = &target->blocked_exceptions;
3470 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
3471 last = &t->link, t = t->link) {
3472 ASSERT(get_itbl(t)->type == TSO);
3478 barf("unblockThread (Exception): TSO not found");
3481 #if !defined(THREADED_RTS)
3483 case BlockedOnWrite:
3484 #if defined(mingw32_HOST_OS)
3485 case BlockedOnDoProc:
3488 StgTSO *prev = NULL;
3489 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
3490 prev = t, t = t->link) {
3493 blocked_queue_hd = t->link;
3494 if (blocked_queue_tl == t) {
3495 blocked_queue_tl = END_TSO_QUEUE;
3498 prev->link = t->link;
3499 if (blocked_queue_tl == t) {
3500 blocked_queue_tl = prev;
3503 #if defined(mingw32_HOST_OS)
3504 /* (Cooperatively) signal that the worker thread should abort
3507 abandonWorkRequest(tso->block_info.async_result->reqID);
3512 barf("unblockThread (I/O): TSO not found");
3515 case BlockedOnDelay:
3517 StgTSO *prev = NULL;
3518 for (t = sleeping_queue; t != END_TSO_QUEUE;
3519 prev = t, t = t->link) {
3522 sleeping_queue = t->link;
3524 prev->link = t->link;
3529 barf("unblockThread (delay): TSO not found");
3534 barf("unblockThread");
3538 tso->link = END_TSO_QUEUE;
3539 tso->why_blocked = NotBlocked;
3540 tso->block_info.closure = NULL;
3541 appendToRunQueue(cap,tso);
3545 /* -----------------------------------------------------------------------------
3548 * Check the blackhole_queue for threads that can be woken up. We do
3549 * this periodically: before every GC, and whenever the run queue is
3552 * An elegant solution might be to just wake up all the blocked
3553 * threads with awakenBlockedQueue occasionally: they'll go back to
3554 * sleep again if the object is still a BLACKHOLE. Unfortunately this
3555 * doesn't give us a way to tell whether we've actually managed to
3556 * wake up any threads, so we would be busy-waiting.
3558 * -------------------------------------------------------------------------- */
3561 checkBlackHoles (Capability *cap)
3564 rtsBool any_woke_up = rtsFalse;
3567 // blackhole_queue is global:
3568 ASSERT_LOCK_HELD(&sched_mutex);
3570 IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3572 // ASSUMES: sched_mutex
3573 prev = &blackhole_queue;
3574 t = blackhole_queue;
3575 while (t != END_TSO_QUEUE) {
3576 ASSERT(t->why_blocked == BlockedOnBlackHole);
3577 type = get_itbl(t->block_info.closure)->type;
3578 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3579 IF_DEBUG(sanity,checkTSO(t));
3580 t = unblockOne(cap, t);
3581 // urk, the threads migrate to the current capability
3582 // here, but we'd like to keep them on the original one.
3584 any_woke_up = rtsTrue;
3594 /* -----------------------------------------------------------------------------
3597 * The following function implements the magic for raising an
3598 * asynchronous exception in an existing thread.
3600 * We first remove the thread from any queue on which it might be
3601 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
3603 * We strip the stack down to the innermost CATCH_FRAME, building
3604 * thunks in the heap for all the active computations, so they can
3605 * be restarted if necessary. When we reach a CATCH_FRAME, we build
3606 * an application of the handler to the exception, and push it on
3607 * the top of the stack.
3609 * How exactly do we save all the active computations? We create an
3610 * AP_STACK for every UpdateFrame on the stack. Entering one of these
3611 * AP_STACKs pushes everything from the corresponding update frame
3612 * upwards onto the stack. Actually, it pushes everything up to the
3613 * next update frame plus a pointer to the next AP_STACK object.
3614 * Entering the next AP_STACK object pushes more onto the stack until we
3615 * reach the last AP_STACK object - at which point the stack should look
3616 * exactly as it did when we killed the TSO and we can continue
3617 * execution by entering the closure on top of the stack.
3619 * We can also kill a thread entirely - this happens if either (a) the
3620 * exception passed to raiseAsync is NULL, or (b) there's no
3621 * CATCH_FRAME on the stack. In either case, we strip the entire
3622 * stack and replace the thread with a zombie.
3624 * ToDo: in SMP mode, this function is only safe if either (a) we hold
3625 * all the Capabilities (eg. in GC), or (b) we own the Capability that
3626 * the TSO is currently blocked on or on the run queue of.
3628 * -------------------------------------------------------------------------- */
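/* Usage sketch (grounded in this file): threadStackOverflow() above delivers
 * a catchable exception via
 *
 *     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
 *
 * whereas deleteThread() below passes a NULL exception to strip the whole
 * stack and kill the thread outright. */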
3631 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3633 raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3637 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3639 raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3643 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
3644 rtsBool stop_at_atomically, StgPtr stop_here)
3646 StgRetInfoTable *info;
3650 // Thread already dead?
3651 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3656 sched_belch("raising exception in thread %ld.", (long)tso->id));
3658 // Remove it from any blocking queues
3659 unblockThread(cap,tso);
3661 // mark it dirty; we're about to change its stack.
3666 // The stack freezing code assumes there's a closure pointer on
3667 // the top of the stack, so we have to arrange that this is the case...
3669 if (sp[0] == (W_)&stg_enter_info) {
3673 sp[0] = (W_)&stg_dummy_ret_closure;
3677 while (stop_here == NULL || frame < stop_here) {
3679 // 1. Let the top of the stack be the "current closure"
3681 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3684 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3685 // current closure applied to the chunk of stack up to (but not
3686 // including) the update frame. This closure becomes the "current
3687 // closure". Go back to step 2.
3689 // 4. If it's a CATCH_FRAME, then leave the exception handler on
3690 // top of the stack applied to the exception.
3692 // 5. If it's a STOP_FRAME, then kill the thread.
3694 // NB: if we pass an ATOMICALLY_FRAME then abort the associated
3697 info = get_ret_itbl((StgClosure *)frame);
3699 switch (info->i.type) {
3706 // First build an AP_STACK consisting of the stack chunk above the
3707 // current update frame, with the top word on the stack as the
3710 words = frame - sp - 1;
3711 ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3714 ap->fun = (StgClosure *)sp[0];
3716 for(i=0; i < (nat)words; ++i) {
3717 ap->payload[i] = (StgClosure *)*sp++;
3720 SET_HDR(ap,&stg_AP_STACK_info,
3721 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
3722 TICK_ALLOC_UP_THK(words+1,0);
3725 debugBelch("sched: Updating ");
3726 printPtr((P_)((StgUpdateFrame *)frame)->updatee);
3727 debugBelch(" with ");
3728 printObj((StgClosure *)ap);
3731 // Replace the updatee with an indirection
3733 // Warning: if we're in a loop, more than one update frame on
3734 // the stack may point to the same object. Be careful not to
3735 // overwrite an IND_OLDGEN in this case, because we'll screw
3736 // up the mutable lists. To be on the safe side, don't
3737 // overwrite any kind of indirection at all. See also
3738 // threadSqueezeStack in GC.c, where we have to make a similar
3741 if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3742 // revert the black hole
3743 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3746 sp += sizeofW(StgUpdateFrame) - 1;
3747 sp[0] = (W_)ap; // push onto stack
3749 continue; //no need to bump frame
3753 // We've stripped the entire stack, the thread is now dead.
3754 tso->what_next = ThreadKilled;
3755 tso->sp = frame + sizeofW(StgStopFrame);
3759 // If we find a CATCH_FRAME, and we've got an exception to raise,
3760 // then build the THUNK raise(exception), and leave it on
3761 // top of the CATCH_FRAME ready to enter.
3765 StgCatchFrame *cf = (StgCatchFrame *)frame;
3769 if (exception == NULL) break;
3771 // we've got an exception to raise, so let's pass it to the
3772 // handler in this frame.
3774 raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3775 TICK_ALLOC_SE_THK(1,0);
3776 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3777 raise->payload[0] = exception;
3779 // throw away the stack from Sp up to the CATCH_FRAME.
3783 /* Ensure that async exceptions are blocked now, so we don't get
3784 * a surprise exception before we get around to executing the
3787 if (tso->blocked_exceptions == NULL) {
3788 tso->blocked_exceptions = END_TSO_QUEUE;
3791 /* Put the newly-built THUNK on top of the stack, ready to execute
3792 * when the thread restarts.
3795 sp[-1] = (W_)&stg_enter_info;
3797 tso->what_next = ThreadRunGHC;
3798 IF_DEBUG(sanity, checkTSO(tso));
3802 case ATOMICALLY_FRAME:
3803 if (stop_at_atomically) {
3804 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3805 stmCondemnTransaction(cap, tso -> trec);
3809 // R1 is not a register: the return convention for IO in
3810 // this case puts the return value on the stack, so we
3811 // need to set up the stack to return to the atomically
3812 // frame properly...
3813 tso->sp = frame - 2;
3814 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3815 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3817 tso->what_next = ThreadRunGHC;
3820 // Not stop_at_atomically... fall through and abort the
3823 case CATCH_RETRY_FRAME:
3824 // If we find an ATOMICALLY_FRAME then we abort the
3825 // current transaction and propagate the exception. In
3826 // this case (unlike ordinary exceptions) we do not care
3827 // whether the transaction is valid or not because its
3828 // possible validity cannot have caused the exception
3829 // and will not be visible after the abort.
3831 debugBelch("Found atomically block delivering async exception\n"));
3832 StgTRecHeader *trec = tso -> trec;
3833 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3834 stmAbortTransaction(cap, trec);
3835 tso -> trec = outer;
3842 // move on to the next stack frame
3843 frame += stack_frame_sizeW((StgClosure *)frame);
3846 // if we got here, then we stopped at stop_here
3847 ASSERT(stop_here != NULL);
3850 /* -----------------------------------------------------------------------------
3853 This is used for interruption (^C) and forking, and corresponds to
3854 raising an exception but without letting the thread catch the
3856 -------------------------------------------------------------------------- */
3859 deleteThread (Capability *cap, StgTSO *tso)
3861 if (tso->why_blocked != BlockedOnCCall &&
3862 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3863 raiseAsync(cap,tso,NULL);
3867 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3869 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3870 { // for forkProcess only:
3871 // delete thread without giving it a chance to catch the KillThread exception
3873 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3877 if (tso->why_blocked != BlockedOnCCall &&
3878 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3879 unblockThread(cap,tso);
3882 tso->what_next = ThreadKilled;
3886 /* -----------------------------------------------------------------------------
3887 raiseExceptionHelper
3889 This function is called by the raise# primitive, just so that we can
3890 move some of the tricky bits of raising an exception from C-- into
3891 C. Who knows, it might be a useful, re-usable thing here too.
3892 -------------------------------------------------------------------------- */
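/* Usage sketch (illustrative): the C-- code for raise# calls this roughly as
 *
 *     frame_type = raiseExceptionHelper(BaseReg, CurrentTSO, exception);
 *
 * and then dispatches on the returned frame type (e.g. ATOMICALLY_FRAME or
 * CATCH_STM_FRAME below) to transfer control to the handler. */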
3895 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3897 Capability *cap = regTableToCapability(reg);
3898 StgThunk *raise_closure = NULL;
3900 StgRetInfoTable *info;
3902 // This closure represents the expression 'raise# E' where E
3903 // is the exception raised. It is used to overwrite all the
3904 // thunks which are currently under evaluation.
3908 // LDV profiling: stg_raise_info has THUNK as its closure
3909 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3910 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
3911 // 1 does not cause any problem unless profiling is performed.
3912 // However, when LDV profiling goes on, we need to linearly scan
3913 // the small object pool, where raise_closure is stored, so we should
3914 // use MIN_UPD_SIZE.
3916 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3917 // sizeofW(StgClosure)+1);
3921 // Walk up the stack, looking for the catch frame. On the way,
3922 // we update any closures pointed to from update frames with the
3923 // raise closure that we just built.
3927 info = get_ret_itbl((StgClosure *)p);
3928 next = p + stack_frame_sizeW((StgClosure *)p);
3929 switch (info->i.type) {
3932 // Only create raise_closure if we need to.
3933 if (raise_closure == NULL) {
3935 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3936 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3937 raise_closure->payload[0] = exception;
3939 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3943 case ATOMICALLY_FRAME:
3944 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3946 return ATOMICALLY_FRAME;
3952 case CATCH_STM_FRAME:
3953 IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3955 return CATCH_STM_FRAME;
3961 case CATCH_RETRY_FRAME:
3970 /* -----------------------------------------------------------------------------
3971 findRetryFrameHelper
3973 This function is called by the retry# primitive. It traverses the stack
3974 leaving tso->sp referring to the frame which should handle the retry.
3976 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3977 or an ATOMICALLY_FRAME (if the retry# reaches the top level).
3979 We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3980 despite the similar implementation.
3982 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3983 not be created within memory transactions.
3984 -------------------------------------------------------------------------- */
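/* Usage sketch (illustrative): the retry# primop does roughly
 *
 *     frame_type = findRetryFrameHelper(CurrentTSO);
 *     if (frame_type == CATCH_RETRY_FRAME) {
 *         // retry came from inside an orElse#: run the alternative
 *     } else {
 *         // ATOMICALLY_FRAME: wait on the transaction's TVars
 *     }
 */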
3987 findRetryFrameHelper (StgTSO *tso)
3990 StgRetInfoTable *info;
3994 info = get_ret_itbl((StgClosure *)p);
3995 next = p + stack_frame_sizeW((StgClosure *)p);
3996 switch (info->i.type) {
3998 case ATOMICALLY_FRAME:
3999 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retry\n", p));
4001 return ATOMICALLY_FRAME;
4003 case CATCH_RETRY_FRAME:
4004 IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retry\n", p));
4006 return CATCH_RETRY_FRAME;
4008 case CATCH_STM_FRAME:
4010 ASSERT(info->i.type != CATCH_FRAME);
4011 ASSERT(info->i.type != STOP_FRAME);
4018 /* -----------------------------------------------------------------------------
4019 resurrectThreads is called after garbage collection on the list of
4020 threads found to be garbage. Each of these threads will be woken
4021 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4022 on an MVar, or NonTermination if the thread was blocked on a Black
4025 Locks: assumes we hold *all* the capabilities.
4026 -------------------------------------------------------------------------- */
4029 resurrectThreads (StgTSO *threads)
4034 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4035 next = tso->global_link;
4036 tso->global_link = all_threads;
4038 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4040 // Wake up a bound thread on the Capability it was last on,
4041 // or on last_free_capability otherwise.
4043 cap = tso->bound->cap;
4045 cap = last_free_capability;
4048 switch (tso->why_blocked) {
4050 case BlockedOnException:
4051 /* Called by GC - sched_mutex lock is currently held. */
4052 raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4054 case BlockedOnBlackHole:
4055 raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4058 raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4061 /* This might happen if the thread was blocked on a black hole
4062 * belonging to a thread that we've just woken up (raiseAsync
4063 * can wake up threads, remember...).
4067 barf("resurrectThreads: thread blocked in a strange way");
4072 /* ----------------------------------------------------------------------------
4073 * Debugging: why is a thread blocked
4074 * [Also provides useful information when debugging threaded programs
4075 * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4076 ------------------------------------------------------------------------- */
4080 printThreadBlockage(StgTSO *tso)
4082 switch (tso->why_blocked) {
4084 debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4086 case BlockedOnWrite:
4087 debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4089 #if defined(mingw32_HOST_OS)
4090 case BlockedOnDoProc:
4091 debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4094 case BlockedOnDelay:
4095 debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4098 debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4100 case BlockedOnException:
4101 debugBelch("is blocked on delivering an exception to thread %d",
4102 tso->block_info.tso->id);
4104 case BlockedOnBlackHole:
4105 debugBelch("is blocked on a black hole");
4108 debugBelch("is not blocked");
4110 #if defined(PARALLEL_HASKELL)
4112 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4113 tso->block_info.closure, info_type(tso->block_info.closure));
4115 case BlockedOnGA_NoSend:
4116 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4117 tso->block_info.closure, info_type(tso->block_info.closure));
4120 case BlockedOnCCall:
4121 debugBelch("is blocked on an external call");
4123 case BlockedOnCCall_NoUnblockExc:
4124 debugBelch("is blocked on an external call (exceptions were already blocked)");
4127 debugBelch("is blocked on an STM operation");
4130 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4131 tso->why_blocked, tso->id, tso);
4136 printThreadStatus(StgTSO *t)
4138 debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4140 void *label = lookupThreadLabel(t->id);
4141 if (label) debugBelch("[\"%s\"] ",(char *)label);
4143 if (t->what_next == ThreadRelocated) {
4144 debugBelch("has been relocated...\n");
4146 switch (t->what_next) {
4148 debugBelch("has been killed");
4150 case ThreadComplete:
4151 debugBelch("has completed");
4154 printThreadBlockage(t);
4161 printAllThreads(void)
4168 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4169 ullong_format_string(TIME_ON_PROC(CurrentProc),
4170 time_string, rtsFalse/*no commas!*/);
4172 debugBelch("all threads at [%s]:\n", time_string);
4173 # elif defined(PARALLEL_HASKELL)
4174 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4175 ullong_format_string(CURRENT_TIME,
4176 time_string, rtsFalse/*no commas!*/);
4178 debugBelch("all threads at [%s]:\n", time_string);
4180 debugBelch("all threads:\n");
4183 for (i = 0; i < n_capabilities; i++) {
4184 cap = &capabilities[i];
4185 debugBelch("threads on capability %d:\n", cap->no);
4186 for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4187 printThreadStatus(t);
4191 debugBelch("other threads:\n");
4192 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4193 if (t->why_blocked != NotBlocked) {
4194 printThreadStatus(t);
4196 if (t->what_next == ThreadRelocated) {
4199 next = t->global_link;
4206 printThreadQueue(StgTSO *t)
4209 for (; t != END_TSO_QUEUE; t = t->link) {
4210 printThreadStatus(t);
4213 debugBelch("%d threads on queue\n", i);
/*
   Print a whole blocking queue attached to node (debugging only).
*/
# if defined(PARALLEL_HASKELL)
void print_bqe (StgBlockingQueueElement *bqe);   // defined below

void
print_bq (StgClosure *node)
{
  StgBlockingQueueElement *bqe;

  debugBelch("## BQ of closure %p (%s): ",
	     node, info_type(node));

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
	 get_itbl(node)->type == FETCH_ME_BQ ||
	 get_itbl(node)->type == RBH ||
	 get_itbl(node)->type == MVAR);

  ASSERT(node!=(StgClosure*)NULL);  // sanity check

  print_bqe(((StgBlockingQueue*)node)->blocking_queue);
}
/*
   Print a whole blocking queue starting with the element bqe.
*/
void
print_bqe (StgBlockingQueueElement *bqe)
{
  rtsBool end;

  /*
     NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
     the loop update below therefore tests the *current* element before
     advancing, so that a terminating CONSTR is still printed on its final
     iteration.
  */
  for (end = (bqe==END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
       bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||
	   get_itbl(bqe)->type == BLOCKED_FETCH ||
	   get_itbl(bqe)->type == CONSTR);
    /* only BQs of an RBH end with an RBH_Save closure */
    //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

    switch (get_itbl(bqe)->type) {
    case TSO:
      debugBelch(" TSO %u (%p),",
	      ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
      break;
    case BLOCKED_FETCH:
      debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
	      ((StgBlockedFetch *)bqe)->node,
	      ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
	      ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
	      ((StgBlockedFetch *)bqe)->ga.weight);
      break;
    case CONSTR:
      debugBelch(" %s (IP %p),",
	      (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
	       get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
	       get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
	       "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
	   info_type((StgClosure *)bqe)); // , node, info_type(node));
    }
  }
  debugBelch("\n");
}
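
/* Illustrative use (parallel RTS only, e.g. from gdb):
 *
 *   (gdb) call print_bq(node)
 *   ## BQ of closure 0x2b10000 (BLACKHOLE_BQ):  TSO 5 (0x2a98800),
 *
 * Closure address, type, and TSO id above are invented for illustration.
 */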
# elif defined(GRAN)
void
print_bq (StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  PEs node_loc, tso_loc;
  rtsBool end;

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
	 get_itbl(node)->type == FETCH_ME_BQ ||
	 get_itbl(node)->type == RBH);

  ASSERT(node!=(StgClosure*)NULL);  // sanity check
  node_loc = where_is(node);

  debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
	     node, info_type(node), node_loc);

  /*
     NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
  */
  for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||
	   get_itbl(bqe)->type == CONSTR);
    /* only BQs of an RBH end with an RBH_Save closure */
    ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

    tso_loc = where_is((StgClosure *)bqe);
    switch (get_itbl(bqe)->type) {
    case TSO:
      debugBelch(" TSO %d (%p) on [PE %d],",
	      ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
      break;
    case CONSTR:
      debugBelch(" %s (IP %p),",
	      (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
	       get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
	       get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
	       "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue of %p (%s)",
	   info_type((StgClosure *)bqe), node, info_type(node));
    }
  }
  debugBelch("\n");
}
# endif /* PARALLEL_HASKELL || GRAN */
#if defined(PARALLEL_HASKELL)
/* Count the threads on the run queue (debugging only). */
static nat
run_queue_len(void)
{
    nat i;
    StgTSO *tso;

    for (i=0, tso=run_queue_hd;
	 tso != END_TSO_QUEUE;
	 i++, tso=tso->link)
	{ /* nothing */ }

    return i;
}
#endif

void
sched_belch(char *s, ...)
{
    va_list ap;
    va_start(ap,s);
#ifdef THREADED_RTS
    debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
#elif defined(PARALLEL_HASKELL)
    debugBelch("== ");
#else
    debugBelch("sched: ");
#endif
    vdebugBelch(s, ap);
    debugBelch("\n");
    va_end(ap);
}