1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.192 2004/02/27 16:16:31 simonmar Exp $
4 * (c) The GHC Team, 1998-2003
8 * Different GHC ways use this scheduler quite differently (see comments below)
9 * Here is the global picture:
11 * WAY Name CPP flag What's it for
12 * --------------------------------------
13 * mp GUM PAR Parallel execution on a distrib. memory machine
14 * s SMP SMP Parallel execution on a shared memory machine
15 * mg GranSim GRAN Simulation of parallel execution
16 * md GUM/GdH DIST Distributed execution (based on GUM)
18 * --------------------------------------------------------------------------*/
21 * Version with support for distributed memory parallelism aka GUM (WAY=mp):
23 The main scheduling loop in GUM iterates until a finish message is received.
24 In that case a global flag @receivedFinish@ is set and this instance of
25 the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
26 for the handling of incoming messages, such as PP_FINISH.
27 Note that in the parallel case we have a system manager that coordinates
28 different PEs, each of which is running one instance of the RTS.
29 See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
30 From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
32 * Version with support for simulating parallel execution aka GranSim (WAY=mg):
34 The main scheduling code in GranSim is quite different from that in std
35 (concurrent) Haskell: while concurrent Haskell just iterates over the
36 threads in the runnable queue, GranSim is event driven, i.e. it iterates
37 over the events in the global event queue. -- HWL
40 #include "PosixSource.h"
47 #include "StgStartup.h"
49 #define COMPILING_SCHEDULER
51 #include "StgMiscClosures.h"
53 #include "Interpreter.h"
54 #include "Exception.h"
61 #include "ThreadLabels.h"
63 #include "Proftimer.h"
66 #if defined(GRAN) || defined(PAR)
67 # include "GranSimRts.h"
69 # include "ParallelRts.h"
70 # include "Parallel.h"
71 # include "ParallelDebug.h"
76 #include "Capability.h"
77 #include "OSThreads.h"
80 #ifdef HAVE_SYS_TYPES_H
81 #include <sys/types.h>
96 #define USED_IN_THREADED_RTS
98 #define USED_IN_THREADED_RTS STG_UNUSED
101 #ifdef RTS_SUPPORTS_THREADS
102 #define USED_WHEN_RTS_SUPPORTS_THREADS
104 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
107 /* Main thread queue.
108 * Locks required: sched_mutex.
110 StgMainThread *main_threads = NULL;
113 * Locks required: sched_mutex.
117 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
118 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
121 In GranSim we have a runnable and a blocked queue for each processor.
122 In order to minimise code changes new arrays run_queue_hds/tls
123 are created. run_queue_hd is then a short cut (macro) for
124 run_queue_hds[CurrentProc] (see GranSim.h).
127 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
128 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
129 StgTSO *ccalling_threadss[MAX_PROC];
130 /* We use the same global list of threads (all_threads) in GranSim as in
131 the std RTS (i.e. we are cheating). However, we don't use this list in
132 the GranSim specific code at the moment (so we are only potentially
137 StgTSO *run_queue_hd = NULL;
138 StgTSO *run_queue_tl = NULL;
139 StgTSO *blocked_queue_hd = NULL;
140 StgTSO *blocked_queue_tl = NULL;
141 StgTSO *sleeping_queue = NULL; /* perhaps replace with a hash table? */
145 /* Linked list of all threads.
146 * Used for detecting garbage collected threads.
148 StgTSO *all_threads = NULL;
150 /* When a thread performs a safe C call (_ccall_GC, using old
151 * terminology), it gets put on the suspended_ccalling_threads
152 * list. Used by the garbage collector.
154 static StgTSO *suspended_ccalling_threads;
156 static StgTSO *threadStackOverflow(StgTSO *tso);
158 /* KH: The following two flags are shared memory locations. There is no need
159 to lock them, since they are only unset at the end of a scheduler
163 /* flag set by signal handler to precipitate a context switch */
164 nat context_switch = 0;
166 /* if this flag is set as well, give up execution */
167 rtsBool interrupted = rtsFalse;
169 /* Next thread ID to allocate.
170 * Locks required: thread_id_mutex
172 static StgThreadID next_thread_id = 1;
175 * Pointers to the state of the current thread.
176 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
177 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
180 /* The smallest stack size that makes any sense is:
181 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
182 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
183 * + 1 (the closure to enter)
185 * + 1 (spare slot req'd by stg_ap_v_ret)
187 * A thread with this stack will bomb immediately with a stack
188 * overflow, which will increase its stack size.
191 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
198 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
199 * exists - earlier gccs apparently didn't.
204 static rtsBool ready_to_gc;
207 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
208 * in an MT setting, needed to signal that a worker thread shouldn't hang around
209 * in the scheduler when it is out of work.
211 static rtsBool shutting_down_scheduler = rtsFalse;
213 void addToBlockedQueue ( StgTSO *tso );
215 static void schedule ( StgMainThread *mainThread, Capability *initialCapability );
216 void interruptStgRts ( void );
218 static void detectBlackHoles ( void );
220 #if defined(RTS_SUPPORTS_THREADS)
221 /* ToDo: carefully document the invariants that go together
222 * with these synchronisation objects.
224 Mutex sched_mutex = INIT_MUTEX_VAR;
225 Mutex term_mutex = INIT_MUTEX_VAR;
227 #endif /* RTS_SUPPORTS_THREADS */
231 rtsTime TimeOfLastYield;
232 rtsBool emitSchedule = rtsTrue;
236 static char *whatNext_strs[] = {
246 StgTSO * createSparkThread(rtsSpark spark);
247 StgTSO * activateSpark (rtsSpark spark);
250 /* ----------------------------------------------------------------------------
252 * ------------------------------------------------------------------------- */
254 #if defined(RTS_SUPPORTS_THREADS)
255 static rtsBool startingWorkerThread = rtsFalse;
257 static void taskStart(void);
261 ACQUIRE_LOCK(&sched_mutex);
263 RELEASE_LOCK(&sched_mutex);
267 startSchedulerTaskIfNecessary(void)
269 if(run_queue_hd != END_TSO_QUEUE
270 || blocked_queue_hd != END_TSO_QUEUE
271 || sleeping_queue != END_TSO_QUEUE)
273 if(!startingWorkerThread)
274 { // we don't want to start another worker thread
275 // just because the last one hasn't yet reached the
276 // "waiting for capability" state
277 startingWorkerThread = rtsTrue;
278 startTask(taskStart);
284 /* ---------------------------------------------------------------------------
285 Main scheduling loop.
287 We use round-robin scheduling, each thread returning to the
288 scheduler loop when one of these conditions is detected:
291 * timer expires (thread yields)
296 Locking notes: we acquire the scheduler lock once at the beginning
297 of the scheduler loop, and release it when
299 * running a thread, or
300 * waiting for work, or
301 * waiting for a GC to complete.
304 In a GranSim setup this loop iterates over the global event queue.
305 This revolves around the global event queue, which determines what
306 to do next. Therefore, it's more complicated than either the
307 concurrent or the parallel (GUM) setup.
310 GUM iterates over incoming messages.
311 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
312 and sends out a fish whenever it has nothing to do; in-between
313 doing the actual reductions (shared code below) it processes the
314 incoming messages and deals with delayed operations
315 (see PendingFetches).
316 This is not the ugliest code you could imagine, but it's bloody close.
318 ------------------------------------------------------------------------ */
320 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
321 Capability *initialCapability )
325 StgThreadReturnCode ret;
333 rtsBool receivedFinish = rtsFalse;
335 nat tp_size, sp_size; // stats only
338 rtsBool was_interrupted = rtsFalse;
339 StgTSOWhatNext prev_what_next;
341 // Pre-condition: sched_mutex is held.
342 // We might have a capability, passed in as initialCapability.
343 cap = initialCapability;
345 #if defined(RTS_SUPPORTS_THREADS)
347 // in the threaded case, the capability is either passed in via the
348 // initialCapability parameter, or initialized inside the scheduler
352 sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
353 mainThread, initialCapability);
356 // simply initialise it in the non-threaded case
357 grabCapability(&cap);
361 /* set up first event to get things going */
362 /* ToDo: assign costs for system setup and init MainTSO ! */
363 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
365 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
368 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
369 G_TSO(CurrentTSO, 5));
371 if (RtsFlags.GranFlags.Light) {
372 /* Save current time; GranSim Light only */
373 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
376 event = get_next_event();
378 while (event!=(rtsEvent*)NULL) {
379 /* Choose the processor with the next event */
380 CurrentProc = event->proc;
381 CurrentTSO = event->tso;
385 while (!receivedFinish) { /* set by processMessages */
386 /* when receiving PP_FINISH message */
388 #else // everything except GRAN and PAR
394 IF_DEBUG(scheduler, printAllThreads());
396 #if defined(RTS_SUPPORTS_THREADS)
397 // Yield the capability to higher-priority tasks if necessary.
400 yieldCapability(&cap);
403 // If we do not currently hold a capability, we wait for one
406 waitForCapability(&sched_mutex, &cap,
407 mainThread ? &mainThread->bound_thread_cond : NULL);
410 // We now have a capability...
414 // If we're interrupted (the user pressed ^C, or some other
415 // termination condition occurred), kill all the currently running
419 IF_DEBUG(scheduler, sched_belch("interrupted"));
420 interrupted = rtsFalse;
421 was_interrupted = rtsTrue;
422 #if defined(RTS_SUPPORTS_THREADS)
423 // In the threaded RTS, deadlock detection doesn't work,
424 // so just exit right away.
425 prog_belch("interrupted");
426 releaseCapability(cap);
427 RELEASE_LOCK(&sched_mutex);
428 shutdownHaskellAndExit(EXIT_SUCCESS);
435 // Go through the list of main threads and wake up any
436 // clients whose computations have finished. ToDo: this
437 // should be done more efficiently without a linear scan
438 // of the main threads list, somehow...
440 #if defined(RTS_SUPPORTS_THREADS)
442 StgMainThread *m, **prev;
443 prev = &main_threads;
444 for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
445 if (m->tso->what_next == ThreadComplete
446 || m->tso->what_next == ThreadKilled) {
447 if (m == mainThread) {
448 if (m->tso->what_next == ThreadComplete) {
450 // NOTE: return val is tso->sp[1]
451 // (see StgStartup.hc)
452 *(m->ret) = (StgClosure *)m->tso->sp[1];
459 if (was_interrupted) {
460 m->stat = Interrupted;
467 removeThreadLabel((StgWord)m->tso->id);
469 releaseCapability(cap);
472 // The current OS thread can not handle the fact that
473 // the Haskell thread "m" has ended. "m" is bound;
474 // the scheduler loop in its bound OS thread has to
475 // return, so let's pass the capability directly to
477 passCapability(&m->bound_thread_cond);
484 #else /* not threaded */
487 /* in GUM do this only on the Main PE */
490 /* If our main thread has finished or been killed, return.
493 StgMainThread *m = main_threads;
494 if (m->tso->what_next == ThreadComplete
495 || m->tso->what_next == ThreadKilled) {
497 removeThreadLabel((StgWord)m->tso->id);
499 main_threads = main_threads->link;
500 if (m->tso->what_next == ThreadComplete) {
501 // We finished successfully, fill in the return value
502 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
503 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
507 if (m->ret) { *(m->ret) = NULL; };
508 if (was_interrupted) {
509 m->stat = Interrupted;
520 #if defined(RTS_USER_SIGNALS)
521 // check for signals each time around the scheduler
522 if (signals_pending()) {
523 RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
524 startSignalHandlers();
525 ACQUIRE_LOCK(&sched_mutex);
530 // Check whether any waiting threads need to be woken up. If the
531 // run queue is empty, and there are no other tasks running, we
532 // can wait indefinitely for something to happen.
534 if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
535 #if defined(RTS_SUPPORTS_THREADS)
540 awaitEvent( EMPTY_RUN_QUEUE() );
542 // we can be interrupted while waiting for I/O...
543 if (interrupted) continue;
546 * Detect deadlock: when we have no threads to run, there are no
547 * threads waiting on I/O or sleeping, and all the other tasks are
548 * waiting for work, we must have a deadlock of some description.
550 * We first try to find threads blocked on themselves (ie. black
551 * holes), and generate NonTermination exceptions where necessary.
553 * If no threads are black holed, we have a deadlock situation, so
554 * inform all the main threads.
556 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
557 if ( EMPTY_THREAD_QUEUES() )
559 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
560 // Garbage collection can release some new threads due to
561 // either (a) finalizers or (b) threads resurrected because
562 // they are about to be sent BlockedOnDeadMVar. Any threads
563 // thus released will be immediately runnable.
564 GarbageCollect(GetRoots,rtsTrue);
566 if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
569 sched_belch("still deadlocked, checking for black holes..."));
572 if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
574 #if defined(RTS_USER_SIGNALS)
575 /* If we have user-installed signal handlers, then wait
576 * for signals to arrive rather than bombing out with a
579 if ( anyUserHandlers() ) {
581 sched_belch("still deadlocked, waiting for signals..."));
585 // we might be interrupted...
586 if (interrupted) { continue; }
588 if (signals_pending()) {
589 RELEASE_LOCK(&sched_mutex);
590 startSignalHandlers();
591 ACQUIRE_LOCK(&sched_mutex);
593 ASSERT(!EMPTY_RUN_QUEUE());
598 /* Probably a real deadlock. Send the current main thread the
599 * Deadlock exception (or in the SMP build, send *all* main
600 * threads the deadlock exception, since none of them can make
606 switch (m->tso->why_blocked) {
607 case BlockedOnBlackHole:
608 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
610 case BlockedOnException:
612 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
615 barf("deadlock: main thread blocked in a strange way");
621 #elif defined(RTS_SUPPORTS_THREADS)
622 // ToDo: add deadlock detection in threaded RTS
624 // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
627 #if defined(RTS_SUPPORTS_THREADS)
628 if ( EMPTY_RUN_QUEUE() ) {
629 continue; // nothing to do
634 if (RtsFlags.GranFlags.Light)
635 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
637 /* adjust time based on time-stamp */
638 if (event->time > CurrentTime[CurrentProc] &&
639 event->evttype != ContinueThread)
640 CurrentTime[CurrentProc] = event->time;
642 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
643 if (!RtsFlags.GranFlags.Light)
646 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
648 /* main event dispatcher in GranSim */
649 switch (event->evttype) {
650 /* Should just be continuing execution */
652 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
653 /* ToDo: check assertion
654 ASSERT(run_queue_hd != (StgTSO*)NULL &&
655 run_queue_hd != END_TSO_QUEUE);
657 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
658 if (!RtsFlags.GranFlags.DoAsyncFetch &&
659 procStatus[CurrentProc]==Fetching) {
660 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
661 CurrentTSO->id, CurrentTSO, CurrentProc);
664 /* Ignore ContinueThreads for completed threads */
665 if (CurrentTSO->what_next == ThreadComplete) {
666 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
667 CurrentTSO->id, CurrentTSO, CurrentProc);
670 /* Ignore ContinueThreads for threads that are being migrated */
671 if (PROCS(CurrentTSO)==Nowhere) {
672 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
673 CurrentTSO->id, CurrentTSO, CurrentProc);
676 /* The thread should be at the beginning of the run queue */
677 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
678 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
679 CurrentTSO->id, CurrentTSO, CurrentProc);
680 break; // run the thread anyway
683 new_event(proc, proc, CurrentTime[proc],
685 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
687 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
688 break; // now actually run the thread; DaH Qu'vam yImuHbej
691 do_the_fetchnode(event);
692 goto next_thread; /* handle next event in event queue */
695 do_the_globalblock(event);
696 goto next_thread; /* handle next event in event queue */
699 do_the_fetchreply(event);
700 goto next_thread; /* handle next event in event queue */
702 case UnblockThread: /* Move from the blocked queue to the tail of */
703 do_the_unblock(event);
704 goto next_thread; /* handle next event in event queue */
706 case ResumeThread: /* Move from the blocked queue to the tail of */
707 /* the runnable queue ( i.e. Qu' SImqa'lu') */
708 event->tso->gran.blocktime +=
709 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
710 do_the_startthread(event);
711 goto next_thread; /* handle next event in event queue */
714 do_the_startthread(event);
715 goto next_thread; /* handle next event in event queue */
718 do_the_movethread(event);
719 goto next_thread; /* handle next event in event queue */
722 do_the_movespark(event);
723 goto next_thread; /* handle next event in event queue */
726 do_the_findwork(event);
727 goto next_thread; /* handle next event in event queue */
730 barf("Illegal event type %u\n", event->evttype);
733 /* This point was scheduler_loop in the old RTS */
735 IF_DEBUG(gran, belch("GRAN: after main switch"));
737 TimeOfLastEvent = CurrentTime[CurrentProc];
738 TimeOfNextEvent = get_time_of_next_event();
739 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
740 // CurrentTSO = ThreadQueueHd;
742 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
745 if (RtsFlags.GranFlags.Light)
746 GranSimLight_leave_system(event, &ActiveTSO);
748 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
751 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
753 /* in a GranSim setup the TSO stays on the run queue */
755 /* Take a thread from the run queue. */
756 POP_RUN_QUEUE(t); // take_off_run_queue(t);
759 fprintf(stderr, "GRAN: About to run current thread, which is\n");
762 context_switch = 0; // turned on via GranYield, checking events and time slice
765 DumpGranEvent(GR_SCHEDULE, t));
767 procStatus[CurrentProc] = Busy;
770 if (PendingFetches != END_BF_QUEUE) {
774 /* ToDo: perhaps merge with spark activation above */
775 /* check whether we have local work and send requests if we have none */
776 if (EMPTY_RUN_QUEUE()) { /* no runnable threads */
777 /* :-[ no local threads => look out for local sparks */
778 /* the spark pool for the current PE */
779 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
780 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
781 pool->hd < pool->tl) {
783 * ToDo: add GC code check that we really have enough heap afterwards!!
785 * If we're here (no runnable threads) and we have pending
786 * sparks, we must have a space problem. Get enough space
787 * to turn one of those pending sparks into a
791 spark = findSpark(rtsFalse); /* get a spark */
792 if (spark != (rtsSpark) NULL) {
793 tso = activateSpark(spark); /* turn the spark into a thread */
794 IF_PAR_DEBUG(schedule,
795 belch("==== schedule: Created TSO %d (%p); %d threads active",
796 tso->id, tso, advisory_thread_count));
798 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
799 belch("==^^ failed to activate spark");
801 } /* otherwise fall through & pick-up new tso */
803 IF_PAR_DEBUG(verbose,
804 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
805 spark_queue_len(pool)));
810 /* If we still have no work we need to send a FISH to get a spark
813 if (EMPTY_RUN_QUEUE()) {
814 /* =8-[ no local sparks => look for work on other PEs */
816 * We really have absolutely no work. Send out a fish
817 * (there may be some out there already), and wait for
818 * something to arrive. We clearly can't run any threads
819 * until a SCHEDULE or RESUME arrives, and so that's what
820 * we're hoping to see. (Of course, we still have to
821 * respond to other types of messages.)
823 TIME now = msTime() /*CURRENT_TIME*/;
824 IF_PAR_DEBUG(verbose,
825 belch("-- now=%ld", now));
826 IF_PAR_DEBUG(verbose,
827 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
828 (last_fish_arrived_at!=0 &&
829 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
830 belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
831 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
832 last_fish_arrived_at,
833 RtsFlags.ParFlags.fishDelay, now);
836 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
837 (last_fish_arrived_at==0 ||
838 (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
839 /* outstandingFishes is set in sendFish, processFish;
840 avoid flooding system with fishes via delay */
842 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
845 // Global statistics: count no. of fishes
846 if (RtsFlags.ParFlags.ParStats.Global &&
847 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
848 globalParStats.tot_fish_mess++;
852 receivedFinish = processMessages();
855 } else if (PacketsWaiting()) { /* Look for incoming messages */
856 receivedFinish = processMessages();
859 /* Now we are sure that we have some work available */
860 ASSERT(run_queue_hd != END_TSO_QUEUE);
862 /* Take a thread from the run queue, if we have work */
863 POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
864 IF_DEBUG(sanity,checkTSO(t));
866 /* ToDo: write something to the log-file
867 if (RTSflags.ParFlags.granSimStats && !sameThread)
868 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
872 /* the spark pool for the current PE */
873 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
876 belch("--=^ %d threads, %d sparks on [%#x]",
877 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
880 if (0 && RtsFlags.ParFlags.ParStats.Full &&
881 t && LastTSO && t->id != LastTSO->id &&
882 LastTSO->why_blocked == NotBlocked &&
883 LastTSO->what_next != ThreadComplete) {
884 // if previously scheduled TSO not blocked we have to record the context switch
885 DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
886 GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
889 if (RtsFlags.ParFlags.ParStats.Full &&
890 (emitSchedule /* forced emit */ ||
891 (t && LastTSO && t->id != LastTSO->id))) {
893 we are running a different TSO, so write a schedule event to log file
894 NB: If we use fair scheduling we also have to write a deschedule
895 event for LastTSO; with unfair scheduling we know that the
896 previous tso has blocked whenever we switch to another tso, so
897 we don't need it in GUM for now
899 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
900 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
901 emitSchedule = rtsFalse;
905 #else /* !GRAN && !PAR */
907 // grab a thread from the run queue
908 ASSERT(run_queue_hd != END_TSO_QUEUE);
911 // Sanity check the thread we're about to run. This can be
912 // expensive if there is lots of thread switching going on...
913 IF_DEBUG(sanity,checkTSO(t));
919 for(m = main_threads; m; m = m->link)
930 sched_belch("### Running thread %d in bound thread", t->id));
931 // yes, the Haskell thread is bound to the current native thread
936 sched_belch("### thread %d bound to another OS thread", t->id));
937 // no, bound to a different Haskell thread: pass to that thread
938 PUSH_ON_RUN_QUEUE(t);
939 passCapability(&m->bound_thread_cond);
945 if(mainThread != NULL)
946 // The thread we want to run is bound.
949 sched_belch("### this OS thread cannot run thread %d", t->id));
950 // no, the current native thread is bound to a different
951 // Haskell thread, so pass it to any worker thread
952 PUSH_ON_RUN_QUEUE(t);
953 passCapabilityToWorker();
960 cap->r.rCurrentTSO = t;
962 /* context switches are now initiated by the timer signal, unless
963 * the user specified "context switch as often as possible", with
966 if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
967 && (run_queue_hd != END_TSO_QUEUE
968 || blocked_queue_hd != END_TSO_QUEUE
969 || sleeping_queue != END_TSO_QUEUE)))
976 RELEASE_LOCK(&sched_mutex);
978 IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
979 t->id, whatNext_strs[t->what_next]));
982 startHeapProfTimer();
985 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
986 /* Run the current thread
988 prev_what_next = t->what_next;
989 switch (prev_what_next) {
992 /* Thread already finished, return to scheduler. */
993 ret = ThreadFinished;
996 errno = t->saved_errno;
997 ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
998 t->saved_errno = errno;
1000 case ThreadInterpret:
1001 ret = interpretBCO(cap);
1004 barf("schedule: invalid what_next field");
1006 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1008 /* Costs for the scheduler are assigned to CCS_SYSTEM */
1010 stopHeapProfTimer();
1014 ACQUIRE_LOCK(&sched_mutex);
1016 #ifdef RTS_SUPPORTS_THREADS
1017 IF_DEBUG(scheduler,fprintf(stderr,"sched (task %p): ", osThreadId()););
1018 #elif !defined(GRAN) && !defined(PAR)
1019 IF_DEBUG(scheduler,fprintf(stderr,"sched: "););
1021 t = cap->r.rCurrentTSO;
1024 /* HACK 675: if the last thread didn't yield, make sure to print a
1025 SCHEDULE event to the log file when StgRunning the next thread, even
1026 if it is the same one as before */
1028 TimeOfLastYield = CURRENT_TIME;
1034 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1035 globalGranStats.tot_heapover++;
1037 globalParStats.tot_heapover++;
1040 // did the task ask for a large block?
1041 if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1042 // if so, get one and push it on the front of the nursery.
1046 blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1048 IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)",
1049 t->id, whatNext_strs[t->what_next], blocks));
1051 // don't do this if it would push us over the
1052 // alloc_blocks_lim limit; we'll GC first.
1053 if (alloc_blocks + blocks < alloc_blocks_lim) {
1055 alloc_blocks += blocks;
1056 bd = allocGroup( blocks );
1058 // link the new group into the list
1059 bd->link = cap->r.rCurrentNursery;
1060 bd->u.back = cap->r.rCurrentNursery->u.back;
1061 if (cap->r.rCurrentNursery->u.back != NULL) {
1062 cap->r.rCurrentNursery->u.back->link = bd;
1064 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1065 g0s0->blocks == cap->r.rNursery);
1066 cap->r.rNursery = g0s0->blocks = bd;
1068 cap->r.rCurrentNursery->u.back = bd;
1070 // initialise it as a nursery block. We initialise the
1071 // step, gen_no, and flags field of *every* sub-block in
1072 // this large block, because this is easier than making
1073 // sure that we always find the block head of a large
1074 // block whenever we call Bdescr() (eg. evacuate() and
1075 // isAlive() in the GC would both have to do this, at
1079 for (x = bd; x < bd + blocks; x++) {
1086 // don't forget to update the block count in g0s0.
1087 g0s0->n_blocks += blocks;
1088 // This assert can be a killer if the app is doing lots
1089 // of large block allocations.
1090 ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1092 // now update the nursery to point to the new block
1093 cap->r.rCurrentNursery = bd;
1095 // we might be unlucky and have another thread get on the
1096 // run queue before us and steal the large block, but in that
1097 // case the thread will just end up requesting another large
1099 PUSH_ON_RUN_QUEUE(t);
1104 /* make all the running tasks block on a condition variable,
1105 * maybe set context_switch and wait till they all pile in,
1106 * then have them wait on a GC condition variable.
1108 IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow",
1109 t->id, whatNext_strs[t->what_next]));
1112 ASSERT(!is_on_queue(t,CurrentProc));
1114 /* Currently we emit a DESCHEDULE event before GC in GUM.
1115 ToDo: either add separate event to distinguish SYSTEM time from rest
1116 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1117 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1118 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1119 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1120 emitSchedule = rtsTrue;
1124 ready_to_gc = rtsTrue;
1125 context_switch = 1; /* stop other threads ASAP */
1126 PUSH_ON_RUN_QUEUE(t);
1127 /* actual GC is done at the end of the while loop */
1133 DumpGranEvent(GR_DESCHEDULE, t));
1134 globalGranStats.tot_stackover++;
1137 // DumpGranEvent(GR_DESCHEDULE, t);
1138 globalParStats.tot_stackover++;
1140 IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow",
1141 t->id, whatNext_strs[t->what_next]));
1142 /* just adjust the stack for this thread, then pop it back
1148 /* enlarge the stack */
1149 StgTSO *new_t = threadStackOverflow(t);
1151 /* This TSO has moved, so update any pointers to it from the
1152 * main thread stack. It better not be on any other queues...
1153 * (it shouldn't be).
1155 for (m = main_threads; m != NULL; m = m->link) {
1160 threadPaused(new_t);
1161 PUSH_ON_RUN_QUEUE(new_t);
1165 case ThreadYielding:
1168 DumpGranEvent(GR_DESCHEDULE, t));
1169 globalGranStats.tot_yields++;
1172 // DumpGranEvent(GR_DESCHEDULE, t);
1173 globalParStats.tot_yields++;
1175 /* put the thread back on the run queue. Then, if we're ready to
1176 * GC, check whether this is the last task to stop. If so, wake
1177 * up the GC thread. getThread will block during a GC until the
1181 if (t->what_next != prev_what_next) {
1182 belch("--<< thread %ld (%s) stopped to switch evaluators",
1183 t->id, whatNext_strs[t->what_next]);
1185 belch("--<< thread %ld (%s) stopped, yielding",
1186 t->id, whatNext_strs[t->what_next]);
1191 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1193 ASSERT(t->link == END_TSO_QUEUE);
1195 // Shortcut if we're just switching evaluators: don't bother
1196 // doing stack squeezing (which can be expensive), just run the
1198 if (t->what_next != prev_what_next) {
1205 ASSERT(!is_on_queue(t,CurrentProc));
1208 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1209 checkThreadQsSanity(rtsTrue));
1213 if (RtsFlags.ParFlags.doFairScheduling) {
1214 /* this does round-robin scheduling; good for concurrency */
1215 APPEND_TO_RUN_QUEUE(t);
1217 /* this does unfair scheduling; good for parallelism */
1218 PUSH_ON_RUN_QUEUE(t);
1221 // this does round-robin scheduling; good for concurrency
1222 APPEND_TO_RUN_QUEUE(t);
1226 /* add a ContinueThread event to actually process the thread */
1227 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1229 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1231 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1240 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1241 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1242 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1244 // ??? needed; should emit block before
1246 DumpGranEvent(GR_DESCHEDULE, t));
1247 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1250 ASSERT(procStatus[CurrentProc]==Busy ||
1251 ((procStatus[CurrentProc]==Fetching) &&
1252 (t->block_info.closure!=(StgClosure*)NULL)));
1253 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1254 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1255 procStatus[CurrentProc]==Fetching))
1256 procStatus[CurrentProc] = Idle;
1260 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1261 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1264 if (t->block_info.closure!=(StgClosure*)NULL)
1265 print_bq(t->block_info.closure));
1267 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1270 /* whatever we schedule next, we must log that schedule */
1271 emitSchedule = rtsTrue;
1274 /* don't need to do anything. Either the thread is blocked on
1275 * I/O, in which case we'll have called addToBlockedQueue
1276 * previously, or it's blocked on an MVar or Blackhole, in which
1277 * case it'll be on the relevant queue already.
1280 fprintf(stderr, "--<< thread %d (%s) stopped: ",
1281 t->id, whatNext_strs[t->what_next]);
1282 printThreadBlockage(t);
1283 fprintf(stderr, "\n"));
1286 /* Only for dumping event to log file
1287 ToDo: do I need this in GranSim, too?
1294 case ThreadFinished:
1295 /* Need to check whether this was a main thread, and if so, signal
1296 * the task that started it with the return value. If we have no
1297 * more main threads, we probably need to stop all the tasks until
1300 /* We also end up here if the thread kills itself with an
1301 * uncaught exception, see Exception.hc.
1303 IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished",
1304 t->id, whatNext_strs[t->what_next]));
1306 endThread(t, CurrentProc); // clean-up the thread
1308 /* For now all are advisory -- HWL */
1309 //if(t->priority==AdvisoryPriority) ??
1310 advisory_thread_count--;
1313 if(t->dist.priority==RevalPriority)
1317 if (RtsFlags.ParFlags.ParStats.Full &&
1318 !RtsFlags.ParFlags.ParStats.Suppressed)
1319 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1324 barf("schedule: invalid thread return code %d", (int)ret);
1328 // When we have +RTS -i0 and we're heap profiling, do a census at
1329 // every GC. This lets us get repeatable runs for debugging.
1330 if (performHeapProfile ||
1331 (RtsFlags.ProfFlags.profileInterval==0 &&
1332 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1333 GarbageCollect(GetRoots, rtsTrue);
1335 performHeapProfile = rtsFalse;
1336 ready_to_gc = rtsFalse; // we already GC'd
1341 /* everybody back, start the GC.
1342 * Could do it in this thread, or signal a condition var
1343 * to do it in another thread. Either way, we need to
1344 * broadcast on gc_pending_cond afterward.
1346 #if defined(RTS_SUPPORTS_THREADS)
1347 IF_DEBUG(scheduler,sched_belch("doing GC"));
1349 GarbageCollect(GetRoots,rtsFalse);
1350 ready_to_gc = rtsFalse;
1352 /* add a ContinueThread event to continue execution of current thread */
1353 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1355 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1357 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1365 IF_GRAN_DEBUG(unused,
1366 print_eventq(EventHd));
1368 event = get_next_event();
1371 /* ToDo: wait for next message to arrive rather than busy wait */
1374 } /* end of while(1) */
1376 IF_PAR_DEBUG(verbose,
1377 belch("== Leaving schedule() after having received Finish"));
1380 /* ---------------------------------------------------------------------------
1381 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1382 * used by Control.Concurrent for error checking.
1383 * ------------------------------------------------------------------------- */
/* NOTE(review): the body of rtsSupportsBoundThreads() is elided from this
 * listing; presumably it returns rtsTrue iff the threaded RTS is built --
 * confirm against the full source. */
1386 rtsSupportsBoundThreads(void)
1395 /* ---------------------------------------------------------------------------
1396 * isThreadBound(tso): check whether tso is bound to an OS thread.
1397 * ------------------------------------------------------------------------- */
1400 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
/* Scan the main-thread list; the comparison against tso and the return
 * statements are on elided lines. */
1404 for(m = main_threads; m; m = m->link)
1413 /* ---------------------------------------------------------------------------
1414 * Singleton fork(). Do not copy any running threads.
1415 * ------------------------------------------------------------------------- */
/* Forward declaration; the definition is not visible in this excerpt. */
1418 deleteThreadImmediately(StgTSO *tso);
/* forkProcess: implements the forkProcess# primop.
 * Parent: returns the child's pid (return line elided).
 * Child: deletes every Haskell thread and main-thread record (no Haskell
 * thread survives the fork), resets the task manager and worker wakeup
 * pipe, runs the given IO action via rts_evalStableIO, then shuts down
 * with hs_exit().  Not implemented on mingw32 (barfs).
 * NOTE(review): this listing is elided -- the fork() call itself, braces
 * and frees are on missing lines. */
1421 forkProcess(HsStablePtr *entry)
1423 #ifndef mingw32_TARGET_OS
1429 IF_DEBUG(scheduler,sched_belch("forking!"));
1430 rts_lock(); // This not only acquires sched_mutex, it also
1431 // makes sure that no other threads are running
1435 if (pid) { /* parent */
1437 /* just return the pid */
1441 } else { /* child */
1444 // delete all threads
1445 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
/* Walk the global thread list and kill each TSO without running handlers. */
1447 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1450 // don't allow threads to catch the ThreadKilled exception
1451 deleteThreadImmediately(t);
1454 // wipe the main thread list
1455 while((m = main_threads) != NULL) {
1456 main_threads = m->link;
1458 closeCondition(&m->bound_thread_cond);
1463 #ifdef RTS_SUPPORTS_THREADS
1464 resetTaskManagerAfterFork(); // tell startTask() and friends that
1465 startingWorkerThread = rtsFalse; // we have no worker threads any more
1466 resetWorkerWakeupPipeAfterFork();
1469 rc = rts_evalStableIO(entry, NULL); // run the action
1470 rts_checkSchedStatus("forkProcess",rc);
1474 hs_exit(); // clean up and exit
1478 barf("forkProcess#: primop not implemented for mingw32, sorry!\n");
1480 #endif /* mingw32 */
1483 /* ---------------------------------------------------------------------------
1484 * deleteAllThreads(): kill all the live threads.
1486 * This is used when we catch a user interrupt (^C), before performing
1487 * any necessary cleanups and running finalizers.
1489 * Locks: sched_mutex held.
1490 * ------------------------------------------------------------------------- */
1493 deleteAllThreads ( void )
1496 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
/* Walk the global thread list; capture the successor first because the
 * per-thread delete call (on an elided line) may unlink the TSO. */
1497 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1498 next = t->global_link;
/* With every thread deleted, reset all scheduler queues to empty. */
1501 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1502 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1503 sleeping_queue = END_TSO_QUEUE;
1506 /* startThread and insertThread are now in GranSim.c -- HWL */
1509 /* ---------------------------------------------------------------------------
1510 * Suspending & resuming Haskell threads.
1512 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1513 * its capability before calling the C function. This allows another
1514 * task to pick up the capability and carry on running Haskell
1515 * threads. It also means that if the C call blocks, it won't lock
1518 * The Haskell thread making the C call is put to sleep for the
1519 * duration of the call, on the susepended_ccalling_threads queue. We
1520 * give out a token to the task, which it can use to resume the thread
1521 * on return from the C function.
1522 * ------------------------------------------------------------------------- */
/* suspendThread: park the current TSO on suspended_ccalling_threads and
 * release the capability for the duration of a safe foreign call.  The
 * token handed back to the caller (return line elided) is the thread id.
 * errno is saved/restored so RTS bookkeeping does not clobber the foreign
 * caller's errno. */
1525 suspendThread( StgRegTable *reg,
1534 int saved_errno = errno;
1536 /* assume that *reg is a pointer to the StgRegTable part
/* Recover the enclosing Capability: the StgRegTable sits immediately
 * after the StgFunTable at the start of a Capability. */
1539 cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1541 ACQUIRE_LOCK(&sched_mutex);
1544 sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1546 // XXX this might not be necessary --SDM
1547 cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1549 threadPaused(cap->r.rCurrentTSO);
/* Push the TSO onto the suspended-ccalling list. */
1550 cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1551 suspended_ccalling_threads = cap->r.rCurrentTSO;
1553 #if defined(RTS_SUPPORTS_THREADS)
/* blocked_exceptions == NULL means async exceptions are NOT blocked:
 * mark the thread interruptible (BlockedOnCCall).  Otherwise use the
 * NoUnblockExc variant (the intervening else/brace lines are elided). */
1554 if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
1556 cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1557 cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1561 cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1565 /* Use the thread ID as the token; it should be unique */
1566 tok = cap->r.rCurrentTSO->id;
1568 /* Hand back capability */
1569 releaseCapability(cap);
1571 #if defined(RTS_SUPPORTS_THREADS)
1572 /* Preparing to leave the RTS, so ensure there's a native thread/task
1573 waiting to take over.
1575 IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1578 /* Other threads _might_ be available for execution; signal this */
1580 RELEASE_LOCK(&sched_mutex);
1582 errno = saved_errno;
/* resumeThread: re-enter the RTS after a safe foreign call.  tok is the
 * thread id handed out by suspendThread; the matching TSO is unlinked
 * from suspended_ccalling_threads, unblocked, and reinstalled as the
 * capability's current TSO.  Barfs if no suspended thread matches.
 * Returns (on an elided line) the reg table for the resumed thread. */
1587 resumeThread( StgInt tok,
1588 rtsBool concCall STG_UNUSED )
1590 StgTSO *tso, **prev;
1592 int saved_errno = errno;
1594 #if defined(RTS_SUPPORTS_THREADS)
1595 /* Wait for permission to re-enter the RTS with the result. */
1596 ACQUIRE_LOCK(&sched_mutex);
1597 waitForReturnCapability(&sched_mutex, &cap);
1599 IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1601 grabCapability(&cap);
1604 /* Remove the thread off of the suspended list */
1605 prev = &suspended_ccalling_threads;
1606 for (tso = suspended_ccalling_threads;
1607 tso != END_TSO_QUEUE;
1608 prev = &tso->link, tso = tso->link) {
1609 if (tso->id == (StgThreadID)tok) {
1614 if (tso == END_TSO_QUEUE) {
1615 barf("resumeThread: thread not found");
1617 tso->link = END_TSO_QUEUE;
1619 #if defined(RTS_SUPPORTS_THREADS)
/* If the thread was interruptible during the call, release any exceptions
 * that were queued against it while it was outside the RTS. */
1620 if(tso->why_blocked == BlockedOnCCall)
1622 awakenBlockedQueueNoLock(tso->blocked_exceptions);
1623 tso->blocked_exceptions = NULL;
1627 /* Reset blocking status */
1628 tso->why_blocked = NotBlocked;
1630 cap->r.rCurrentTSO = tso;
1631 RELEASE_LOCK(&sched_mutex);
1632 errno = saved_errno;
1637 /* ---------------------------------------------------------------------------
1639 * ------------------------------------------------------------------------ */
/* Forward declaration; unblockThread is defined later in the file. */
1640 static void unblockThread(StgTSO *tso);
1642 /* ---------------------------------------------------------------------------
1643 * Comparing Thread ids.
1645 * This is used from STG land in the implementation of the
1646 * instances of Eq/Ord for ThreadIds.
1647 * ------------------------------------------------------------------------ */
1650 cmp_thread(StgPtr tso1, StgPtr tso2)
1652 StgThreadID id1 = ((StgTSO *)tso1)->id;
1653 StgThreadID id2 = ((StgTSO *)tso2)->id;
/* Three-way comparison on thread ids; the equal case (return 0) is on an
 * elided line below. */
1655 if (id1 < id2) return (-1);
1656 if (id1 > id2) return 1;
1660 /* ---------------------------------------------------------------------------
1661 * Fetching the ThreadID from an StgTSO.
1663 * This is used in the implementation of Show for ThreadIds.
1664 * ------------------------------------------------------------------------ */
/* tso arrives as an opaque StgPtr from Haskell; just read its id field. */
1666 rts_getThreadId(StgPtr tso)
1668 return ((StgTSO *)tso)->id;
/* labelThread: attach a user-supplied debug label to a thread.  The label
 * is copied into an RTS-allocated buffer whose ownership passes to the
 * thread-label table. */
1673 labelThread(StgPtr tso, char *label)
1678 /* Caveat: Once set, you can only set the thread name to "" */
1679 len = strlen(label)+1;
1680 buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
/* len includes the terminating NUL, so this strncpy copies it too. */
1681 strncpy(buf,label,len);
1682 /* Update will free the old memory for us */
1683 updateThreadLabel(((StgTSO *)tso)->id,buf);
1687 /* ---------------------------------------------------------------------------
1688 Create a new thread.
1690 The new thread starts with the given stack size. Before the
1691 scheduler can run, however, this thread needs to have a closure
1692 (and possibly some arguments) pushed on its stack. See
1693 pushClosure() in Schedule.h.
1695 createGenThread() and createIOThread() (in SchedAPI.h) are
1696 convenient packaged versions of this function.
1698 currently pri (priority) is only used in a GRAN setup -- HWL
1699 ------------------------------------------------------------------------ */
1701 /* currently pri (priority) is only used in a GRAN setup -- HWL */
/* createThread: allocate and initialise a fresh TSO whose total size is
 * `size` words (stack plus TSO header); GRAN builds also take a priority.
 * Returns the new TSO, or END_TSO_QUEUE when the PAR thread limit is hit.
 * The caller must push a closure on the new stack (see pushClosure) before
 * the thread can be scheduled.
 * NOTE(review): this listing is elided -- braces, #if lines and some
 * statements are missing between the numbered lines. */
1703 createThread(nat size, StgInt pri)
1706 createThread(nat size)
1713 /* First check whether we should create a thread at all */
1715 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1716 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1718 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1719 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1720 return END_TSO_QUEUE;
1726 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1729 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1731 /* catch ridiculously small stack sizes */
1732 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1733 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1736 stack_size = size - TSO_STRUCT_SIZEW;
1738 tso = (StgTSO *)allocate(size);
1739 TICK_ALLOC_TSO(stack_size, 0);
1741 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1743 SET_GRAN_HDR(tso, ThisPE);
1746 // Always start with the compiled code evaluator
1747 tso->what_next = ThreadRunGHC;
1749 tso->id = next_thread_id++;
1750 tso->why_blocked = NotBlocked;
1751 tso->blocked_exceptions = NULL;
1753 tso->saved_errno = 0;
1755 tso->stack_size = stack_size;
1756 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
/* sp starts at the TOP of the stack area; the stack grows downwards. */
1758 tso->sp = (P_)&(tso->stack) + stack_size;
1761 tso->prof.CCCS = CCS_MAIN;
1764 /* put a stop frame on the stack */
1765 tso->sp -= sizeofW(StgStopFrame);
1766 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1769 tso->link = END_TSO_QUEUE;
1770 /* uses more flexible routine in GranSim */
1771 insertThread(tso, CurrentProc);
1773 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1779 if (RtsFlags.GranFlags.GranSimStats.Full)
1780 DumpGranEvent(GR_START,tso);
1782 if (RtsFlags.ParFlags.ParStats.Full)
1783 DumpGranEvent(GR_STARTQ,tso);
1784 /* HACk to avoid SCHEDULE
1788 /* Link the new thread on the global thread list.
1790 tso->global_link = all_threads;
1794 tso->dist.priority = MandatoryPriority; //by default that is...
/* GRAN-only per-thread simulation statistics, all zeroed at birth. */
1798 tso->gran.pri = pri;
1800 tso->gran.magic = TSO_MAGIC; // debugging only
1802 tso->gran.sparkname = 0;
1803 tso->gran.startedat = CURRENT_TIME;
1804 tso->gran.exported = 0;
1805 tso->gran.basicblocks = 0;
1806 tso->gran.allocs = 0;
1807 tso->gran.exectime = 0;
1808 tso->gran.fetchtime = 0;
1809 tso->gran.fetchcount = 0;
1810 tso->gran.blocktime = 0;
1811 tso->gran.blockcount = 0;
1812 tso->gran.blockedat = 0;
1813 tso->gran.globalsparks = 0;
1814 tso->gran.localsparks = 0;
1815 if (RtsFlags.GranFlags.Light)
1816 tso->gran.clock = Now; /* local clock */
1818 tso->gran.clock = 0;
1820 IF_DEBUG(gran,printTSO(tso));
/* PAR-only per-thread statistics, mirror of the gran block above. */
1823 tso->par.magic = TSO_MAGIC; // debugging only
1825 tso->par.sparkname = 0;
1826 tso->par.startedat = CURRENT_TIME;
1827 tso->par.exported = 0;
1828 tso->par.basicblocks = 0;
1829 tso->par.allocs = 0;
1830 tso->par.exectime = 0;
1831 tso->par.fetchtime = 0;
1832 tso->par.fetchcount = 0;
1833 tso->par.blocktime = 0;
1834 tso->par.blockcount = 0;
1835 tso->par.blockedat = 0;
1836 tso->par.globalsparks = 0;
1837 tso->par.localsparks = 0;
1841 globalGranStats.tot_threads_created++;
1842 globalGranStats.threads_created_on_PE[CurrentProc]++;
1843 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1844 globalGranStats.tot_sq_probes++;
1846 // collect parallel global statistics (currently done together with GC stats)
1847 if (RtsFlags.ParFlags.ParStats.Global &&
1848 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1849 //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime());
1850 globalParStats.tot_threads_created++;
/* NOTE(review): the format string below has two conversions but three
 * arguments (CurrentProc, tso, tso->id); looks like a stale GRAN debug
 * message -- verify against the full source before trusting its output. */
1856 belch("==__ schedule: Created TSO %d (%p);",
1857 CurrentProc, tso, tso->id));
1859 IF_PAR_DEBUG(verbose,
1860 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1861 tso->id, tso, advisory_thread_count));
1863 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1864 tso->id, tso->stack_size));
1871 all parallel thread creation calls should fall through the following routine.
/* createSparkThread: turn a spark (a closure to evaluate) into a runnable
 * TSO.  Barfs once advisory_thread_count reaches maxThreads; otherwise
 * creates a thread with the default initial stack size, pushes the spark
 * closure on its stack and puts it at the head of the run queue. */
1874 createSparkThread(rtsSpark spark)
1876 ASSERT(spark != (rtsSpark)NULL);
1877 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
1879 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1880 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1881 return END_TSO_QUEUE;
1885 tso = createThread(RtsFlags.GcFlags.initialStkSize);
1886 if (tso==END_TSO_QUEUE)
1887 barf("createSparkThread: Cannot create TSO");
1889 tso->priority = AdvisoryPriority;
1891 pushClosure(tso,spark);
1892 PUSH_ON_RUN_QUEUE(tso);
1893 advisory_thread_count++;
1900 Turn a spark into a thread.
1901 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
/* activateSpark: wrapper around createSparkThread that also emits the
 * verbose parallel-stats trace when Full stats are enabled.  The new
 * TSO is returned on an elided line. */
1905 activateSpark (rtsSpark spark)
1909 tso = createSparkThread(spark);
1910 if (RtsFlags.ParFlags.ParStats.Full) {
1911 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1912 IF_PAR_DEBUG(verbose,
1913 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1914 (StgClosure *)spark, info_type((StgClosure *)spark)));
1916 // ToDo: fwd info on local/global spark to thread -- HWL
1917 // tso->gran.exported = spark->exported;
1918 // tso->gran.locked = !spark->global;
1919 // tso->gran.sparkname = spark->name;
/* Forward declaration; waitThread_ is defined further down the file. */
1925 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1926 Capability *initialCapability
1930 /* ---------------------------------------------------------------------------
1933 * scheduleThread puts a thread on the head of the runnable queue.
1934 * This will usually be done immediately after a thread is created.
1935 * The caller of scheduleThread must create the thread using e.g.
1936 * createThread and push an appropriate closure
1937 * on this thread's stack before the scheduler is invoked.
1938 * ------------------------------------------------------------------------ */
1940 static void scheduleThread_ (StgTSO* tso);
/* Lock-free worker: caller must already hold sched_mutex. */
1943 scheduleThread_(StgTSO *tso)
1945 // Precondition: sched_mutex must be held.
1946 PUSH_ON_RUN_QUEUE(tso);
/* Public entry point: takes sched_mutex around scheduleThread_. */
1951 scheduleThread(StgTSO* tso)
1953 ACQUIRE_LOCK(&sched_mutex);
1954 scheduleThread_(tso);
1955 RELEASE_LOCK(&sched_mutex);
1958 #if defined(RTS_SUPPORTS_THREADS)
/* One-slot cache of a Condition, to avoid re-initialising a condition
 * variable on every bound-thread call-in; waitThread_ puts the condition
 * back into this cache when the main thread finishes. */
1959 static Condition bound_cond_cache;
1960 static int bound_cond_cache_full = 0;
/* scheduleWaitThread: create an StgMainThread record for tso, link it on
 * main_threads, put tso on the run queue, and block in waitThread_ until
 * the thread completes.  The StgMainThread field assignments (tso, ret,
 * stat) are on elided lines. */
1965 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1966 Capability *initialCapability)
1968 // Precondition: sched_mutex must be held
1971 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1975 #if defined(RTS_SUPPORTS_THREADS)
1976 // Allocating a new condition for each thread is expensive, so we
1977 // cache one. This is a pretty feeble hack, but it helps speed up
1978 // consecutive call-ins quite a bit.
1979 if (bound_cond_cache_full) {
1980 m->bound_thread_cond = bound_cond_cache;
1981 bound_cond_cache_full = 0;
1983 initCondition(&m->bound_thread_cond);
1987 /* Put the thread on the main-threads list prior to scheduling the TSO.
1988 Failure to do so introduces a race condition in the MT case (as
1989 identified by Wolfgang Thaller), whereby the new task/OS thread
1990 created by scheduleThread_() would complete prior to the thread
1991 that spawned it managed to put 'itself' on the main-threads list.
1992 The upshot of it all being that the worker thread wouldn't get to
1993 signal the completion of the its work item for the main thread to
1994 see (==> it got stuck waiting.) -- sof 6/02.
1996 IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
1998 m->link = main_threads;
2001 PUSH_ON_RUN_QUEUE(tso);
2002 // NB. Don't call THREAD_RUNNABLE() here, because the thread is
2003 // bound and only runnable by *this* OS thread, so waking up other
2004 // workers will just slow things down.
2006 return waitThread_(m, initialCapability);
2009 /* ---------------------------------------------------------------------------
2012 * Initialise the scheduler. This resets all the queues - if the
2013 * queues contained any threads, they'll be garbage collected at the
2016 * ------------------------------------------------------------------------ */
/* NOTE(review): the initScheduler() signature itself is on an elided line;
 * the body below resets every scheduler queue to the empty state and sets
 * up locking / tasks. */
2024 for (i=0; i<=MAX_PROC; i++) {
2025 run_queue_hds[i] = END_TSO_QUEUE;
2026 run_queue_tls[i] = END_TSO_QUEUE;
2027 blocked_queue_hds[i] = END_TSO_QUEUE;
2028 blocked_queue_tls[i] = END_TSO_QUEUE;
2029 ccalling_threadss[i] = END_TSO_QUEUE;
2030 sleeping_queue = END_TSO_QUEUE;
2033 run_queue_hd = END_TSO_QUEUE;
2034 run_queue_tl = END_TSO_QUEUE;
2035 blocked_queue_hd = END_TSO_QUEUE;
2036 blocked_queue_tl = END_TSO_QUEUE;
2037 sleeping_queue = END_TSO_QUEUE;
2040 suspended_ccalling_threads = END_TSO_QUEUE;
2042 main_threads = NULL;
2043 all_threads = END_TSO_QUEUE;
/* Derive the context-switch interval in timer ticks from the
 * millisecond-valued RTS flag. */
2048 RtsFlags.ConcFlags.ctxtSwitchTicks =
2049 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2051 #if defined(RTS_SUPPORTS_THREADS)
2052 /* Initialise the mutex and condition variables used by
2054 initMutex(&sched_mutex);
2055 initMutex(&term_mutex);
2058 ACQUIRE_LOCK(&sched_mutex);
2060 /* A capability holds the state a native thread needs in
2061 * order to execute STG code. At least one capability is
2062 * floating around (only SMP builds have more than one).
2066 #if defined(RTS_SUPPORTS_THREADS)
2067 /* start our haskell execution tasks */
2068 startTaskManager(0,taskStart);
2071 #if /* defined(SMP) ||*/ defined(PAR)
2075 RELEASE_LOCK(&sched_mutex);
/* exitScheduler: begin RTS shutdown by flagging the scheduler as shutting
 * down (task-manager teardown lines are elided from this listing). */
2079 exitScheduler( void )
2081 #if defined(RTS_SUPPORTS_THREADS)
2084 shutting_down_scheduler = rtsTrue;
2087 /* ----------------------------------------------------------------------------
2088 Managing the per-task allocation areas.
2090 Each capability comes with an allocation area. These are
2091 fixed-length block lists into which allocation can be done.
2093 ToDo: no support for two-space collection at the moment???
2094 ------------------------------------------------------------------------- */
/* waitThread_: run the scheduler until main thread m completes.
 * Pre/postcondition: sched_mutex held.  The result status comes from
 * m->stat (the return statement is on an elided line).  GRAN builds set
 * up the main TSO / PE bookkeeping before entering schedule(). */
2098 waitThread_(StgMainThread* m, Capability *initialCapability)
2100 SchedulerStatus stat;
2102 // Precondition: sched_mutex must be held.
2103 IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2106 /* GranSim specific init */
2107 CurrentTSO = m->tso; // the TSO to run
2108 procStatus[MainProc] = Busy; // status of main PE
2109 CurrentProc = MainProc; // PE to run it on
2110 schedule(m,initialCapability);
2112 schedule(m,initialCapability);
2113 ASSERT(m->stat != NoStatus);
2118 #if defined(RTS_SUPPORTS_THREADS)
2119 // Free the condition variable, returning it to the cache if possible.
2120 if (!bound_cond_cache_full) {
2121 bound_cond_cache = m->bound_thread_cond;
2122 bound_cond_cache_full = 1;
2124 closeCondition(&m->bound_thread_cond);
2128 IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2131 // Postcondition: sched_mutex still held
2135 /* ---------------------------------------------------------------------------
2136 Where are the roots that we know about?
2138 - all the threads on the runnable queue
2139 - all the threads on the blocked queue
2140 - all the threads on the sleeping queue
2141 - all the thread currently executing a _ccall_GC
2142 - all the "main threads"
2144 ------------------------------------------------------------------------ */
2146 /* This has to be protected either by the scheduler monitor, or by the
2147 garbage collection monitor (probably the latter).
2152 GetRoots( evac_fn evac )
2157 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2158 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2159 evac((StgClosure **)&run_queue_hds[i]);
2160 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2161 evac((StgClosure **)&run_queue_tls[i]);
2163 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2164 evac((StgClosure **)&blocked_queue_hds[i]);
2165 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2166 evac((StgClosure **)&blocked_queue_tls[i]);
2167 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2168 evac((StgClosure **)&ccalling_threads[i]);
2175 if (run_queue_hd != END_TSO_QUEUE) {
2176 ASSERT(run_queue_tl != END_TSO_QUEUE);
2177 evac((StgClosure **)&run_queue_hd);
2178 evac((StgClosure **)&run_queue_tl);
2181 if (blocked_queue_hd != END_TSO_QUEUE) {
2182 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2183 evac((StgClosure **)&blocked_queue_hd);
2184 evac((StgClosure **)&blocked_queue_tl);
2187 if (sleeping_queue != END_TSO_QUEUE) {
2188 evac((StgClosure **)&sleeping_queue);
2192 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2193 evac((StgClosure **)&suspended_ccalling_threads);
2196 #if defined(PAR) || defined(GRAN)
2197 markSparkQueue(evac);
2200 #if defined(RTS_USER_SIGNALS)
2201 // mark the signal handlers (signals should be already blocked)
2202 markSignalHandlers(evac);
2205 // main threads which have completed need to be retained until they
2206 // are dealt with in the main scheduler loop. They won't be
2207 // retained any other way: the GC will drop them from the
2208 // all_threads list, so we have to be careful to treat them as roots
2212 for (m = main_threads; m != NULL; m = m->link) {
2213 switch (m->tso->what_next) {
2214 case ThreadComplete:
2216 evac((StgClosure **)&m->tso);
2225 /* -----------------------------------------------------------------------------
2228 This is the interface to the garbage collector from Haskell land.
2229 We provide this so that external C code can allocate and garbage
2230 collect when called from Haskell via _ccall_GC.
2232 It might be useful to provide an interface whereby the programmer
2233 can specify more roots (ToDo).
2235 This needs to be protected by the GC condition variable above. KH.
2236 -------------------------------------------------------------------------- */
/* Stashed user root-marking callback for performGCWithRoots; read by
 * AllRoots below while sched_mutex is held. */
2238 static void (*extra_roots)(evac_fn);
/* performGC body (signature on an elided line): minor collection. */
2243 /* Obligated to hold this lock upon entry */
2244 ACQUIRE_LOCK(&sched_mutex);
2245 GarbageCollect(GetRoots,rtsFalse);
2246 RELEASE_LOCK(&sched_mutex);
/* performMajorGC: force a full (major) collection. */
2250 performMajorGC(void)
2252 ACQUIRE_LOCK(&sched_mutex);
2253 GarbageCollect(GetRoots,rtsTrue);
2254 RELEASE_LOCK(&sched_mutex);
/* AllRoots: combined root set = scheduler roots + caller-supplied roots. */
2258 AllRoots(evac_fn evac)
2260 GetRoots(evac); // the scheduler's roots
2261 extra_roots(evac); // the user's roots
/* performGCWithRoots: like performGC, but also marks extra roots supplied
 * by the caller via get_roots. */
2265 performGCWithRoots(void (*get_roots)(evac_fn))
2267 ACQUIRE_LOCK(&sched_mutex);
2268 extra_roots = get_roots;
2269 GarbageCollect(AllRoots,rtsFalse);
2270 RELEASE_LOCK(&sched_mutex);
2273 /* -----------------------------------------------------------------------------
2276 If the thread has reached its maximum stack size, then raise the
2277 StackOverflow exception in the offending thread. Otherwise
2278 relocate the TSO into a larger chunk of memory and adjust its stack
2280 -------------------------------------------------------------------------- */
2283 threadStackOverflow(StgTSO *tso)
2285 nat new_stack_size, new_tso_size, stack_words;
2289 IF_DEBUG(sanity,checkTSO(tso));
/* Case 1: already at the per-thread limit -- raise StackOverflow in the
 * thread asynchronously instead of growing the stack. */
2290 if (tso->stack_size >= tso->max_stack_size) {
2293 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2294 tso->id, tso, tso->stack_size, tso->max_stack_size);
2295 /* If we're debugging, just print out the top of the stack */
2296 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2299 /* Send this thread the StackOverflow exception */
2300 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
/* Case 2: grow the stack by relocating the TSO into a larger allocation. */
2304 /* Try to double the current stack size. If that takes us over the
2305 * maximum stack size for this thread, then use the maximum instead.
2306 * Finally round up so the TSO ends up as a whole number of blocks.
2308 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2309 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2310 TSO_STRUCT_SIZE)/sizeof(W_);
2311 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2312 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2314 IF_DEBUG(scheduler, fprintf(stderr,"== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2316 dest = (StgTSO *)allocate(new_tso_size);
2317 TICK_ALLOC_TSO(new_stack_size,0);
2319 /* copy the TSO block and the old stack into the new area */
2320 memcpy(dest,tso,TSO_STRUCT_SIZE);
2321 stack_words = tso->stack + tso->stack_size - tso->sp;
/* The live stack occupies the top of the area: place it at the end of the
 * new, larger TSO so sp keeps its distance from the stack's high end. */
2322 new_sp = (P_)dest + new_tso_size - stack_words;
2323 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2325 /* relocate the stack pointers... */
2327 dest->stack_size = new_stack_size;
2329 /* Mark the old TSO as relocated. We have to check for relocated
2330 * TSOs in the garbage collector and any primops that deal with TSOs.
2332 * It's important to set the sp value to just beyond the end
2333 * of the stack, so we don't attempt to scavenge any part of the
2336 tso->what_next = ThreadRelocated;
2338 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2339 tso->why_blocked = NotBlocked;
2340 dest->mut_link = NULL;
2342 IF_PAR_DEBUG(verbose,
2343 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2344 tso->id, tso, tso->stack_size);
2345 /* If we're debugging, just print out the top of the stack */
2346 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2349 IF_DEBUG(sanity,checkTSO(tso));
2351 IF_DEBUG(scheduler,printTSO(dest));
2357 /* ---------------------------------------------------------------------------
2358 Wake up a queue that was blocked on some resource.
2359 ------------------------------------------------------------------------ */
/* unblockCount: statistics/trace bookkeeping performed when one element of
 * a blocking queue is woken.  Two signatures appear below -- the GRAN /
 * PAR #if lines separating the variants are elided from this listing. */
2363 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2368 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2370 /* write RESUME events to log file and
2371 update blocked and fetch time (depending on type of the orig closure) */
2372 if (RtsFlags.ParFlags.ParStats.Full) {
2373 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2374 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2375 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2376 if (EMPTY_RUN_QUEUE())
2377 emitSchedule = rtsTrue;
/* Charge the wait to fetch time or block time depending on the kind of
 * closure the thread was blocked on (the case labels are elided). */
2379 switch (get_itbl(node)->type) {
2381 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2386 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2393 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
/* unblockOneLocked (GranSim variant): wake one element of a blocking
 * queue.  Local TSOs get a wake-up event on their own PE at local-unblock
 * cost; remote ones are charged pack + global-unblock + latency costs
 * first.  The next queue element is returned on an elided line. */
2400 static StgBlockingQueueElement *
2401 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2404 PEs node_loc, tso_loc;
2406 node_loc = where_is(node); // should be lifted out of loop
2407 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2408 tso_loc = where_is((StgClosure *)tso);
2409 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2410 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2411 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2412 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2413 // insertThread(tso, node_loc);
2414 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2416 tso, node, (rtsSpark*)NULL);
2417 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2420 } else { // TSO is remote (actually should be FMBQ)
2421 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2422 RtsFlags.GranFlags.Costs.gunblocktime +
2423 RtsFlags.GranFlags.Costs.latency;
2424 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2426 tso, node, (rtsSpark*)NULL);
2427 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2430 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2432 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2433 (node_loc==tso_loc ? "Local" : "Global"),
2434 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2435 tso->block_info.closure = NULL;
2436 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
/* unblockOneLocked (GUM/PAR variant): wake one blocking-queue element.
 * TSOs go back on the run queue; BLOCKED_FETCH entries are moved to the
 * PendingFetches list; CONSTR entries must be one of the RBHSave sentinel
 * closures that terminate an RBH queue.  Anything else is a fatal error. */
2440 static StgBlockingQueueElement *
2441 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2443 StgBlockingQueueElement *next;
2445 switch (get_itbl(bqe)->type) {
2447 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2448 /* if it's a TSO just push it onto the run_queue */
2450 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2451 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
2453 unblockCount(bqe, node);
2454 /* reset blocking status after dumping event */
2455 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2459 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2461 bqe->link = (StgBlockingQueueElement *)PendingFetches;
2462 PendingFetches = (StgBlockedFetch *)bqe;
2466 /* can ignore this case in a non-debugging setup;
2467 see comments on RBHSave closures above */
2469 /* check that the closure is an RBHSave closure */
2470 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2471 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2472 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2476 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2477 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2481 IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2485 #else /* !GRAN && !PAR */
/* Standard (non-GRAN, non-PAR) variant: mark `tso` runnable and push it on
 * the run queue.  Caller holds sched_mutex.
 * NOTE(review): the return type line and return statement fall in lines
 * omitted by this extract; presumably returns the next TSO in the queue. */
2487 unblockOneLocked(StgTSO *tso)
2491 ASSERT(get_itbl(tso)->type == TSO);
2492 ASSERT(tso->why_blocked != NotBlocked);
2493 tso->why_blocked = NotBlocked;
2495 PUSH_ON_RUN_QUEUE(tso);
2497 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2502 #if defined(GRAN) || defined(PAR)
/* Locking wrapper (GRAN/PAR): take sched_mutex around unblockOneLocked. */
2503 INLINE_ME StgBlockingQueueElement *
2504 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2506 ACQUIRE_LOCK(&sched_mutex);
2507 bqe = unblockOneLocked(bqe, node);
2508 RELEASE_LOCK(&sched_mutex);
/* Locking wrapper (standard RTS): take sched_mutex around unblockOneLocked. */
2513 unblockOne(StgTSO *tso)
2515 ACQUIRE_LOCK(&sched_mutex);
2516 tso = unblockOneLocked(tso);
2517 RELEASE_LOCK(&sched_mutex);
/* GranSim (GRAN) variant: wake the entire blocking queue `q` attached to
 * closure `node`.  Handles the "fake fetch" case where the node appears to
 * be on a different PE than CurrentProc, and restores the RBHSave info into
 * an RBH's header when the queue ends in a CONSTR.
 * NOTE(review): extract omits some lines; code preserved verbatim. */
2524 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2526 StgBlockingQueueElement *bqe;
2531 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2532 node, CurrentProc, CurrentTime[CurrentProc],
2533 CurrentTSO->id, CurrentTSO));
2535 node_loc = where_is(node);
2537 ASSERT(q == END_BQ_QUEUE ||
2538 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2539 get_itbl(q)->type == CONSTR); // closure (type constructor)
2540 ASSERT(is_unique(node));
2542 /* FAKE FETCH: magically copy the node to the tso's proc;
2543 no Fetch necessary because in reality the node should not have been
2544 moved to the other PE in the first place
2546 if (CurrentProc!=node_loc) {
2548 belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2549 node, node_loc, CurrentProc, CurrentTSO->id,
2550 // CurrentTSO, where_is(CurrentTSO),
2551 node->header.gran.procs));
/* record CurrentProc in the node's PE bitmask, as if it had been fetched */
2552 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2554 belch("## new bitmask of node %p is %#x",
2555 node, node->header.gran.procs));
2556 if (RtsFlags.GranFlags.GranSimStats.Global) {
2557 globalGranStats.tot_fake_fetches++;
2562 // ToDo: check: ASSERT(CurrentProc==node_loc);
/* walk the queue, waking each TSO; loop stops at END_BQ_QUEUE or a CONSTR */
2563 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2566 bqe points to the current element in the queue
2567 next points to the next element in the queue
2569 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2570 //tso_loc = where_is(tso);
2572 bqe = unblockOneLocked(bqe, node);
2575 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2576 the closure to make room for the anchor of the BQ */
2577 if (bqe!=END_BQ_QUEUE) {
2578 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2580 ASSERT((info_ptr==&RBH_Save_0_info) ||
2581 (info_ptr==&RBH_Save_1_info) ||
2582 (info_ptr==&RBH_Save_2_info));
2584 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2585 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2586 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2589 belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2590 node, info_type(node)));
2593 /* statistics gathering */
2594 if (RtsFlags.GranFlags.GranSimStats.Global) {
2595 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2596 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2597 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2598 globalGranStats.tot_awbq++; // total no. of bqs awakened
2601 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2602 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
/* GUM (PAR) variant: wake a whole blocking queue under sched_mutex.
 * Queue elements may be TSOs or BLOCKED_FETCHes; an RBHSave CONSTR (or an
 * empty queue) means there is nothing to wake. */
2606 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2608 StgBlockingQueueElement *bqe;
2610 ACQUIRE_LOCK(&sched_mutex);
2612 IF_PAR_DEBUG(verbose,
2613 belch("##-_ AwBQ for node %p on [%x]: ",
/* early exit: queue already emptied or only an RBHSave CONSTR left */
2617 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2618 IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2623 ASSERT(q == END_BQ_QUEUE ||
2624 get_itbl(q)->type == TSO ||
2625 get_itbl(q)->type == BLOCKED_FETCH ||
2626 get_itbl(q)->type == CONSTR);
2629 while (get_itbl(bqe)->type==TSO ||
2630 get_itbl(bqe)->type==BLOCKED_FETCH) {
2631 bqe = unblockOneLocked(bqe, node);
2633 RELEASE_LOCK(&sched_mutex);
2636 #else /* !GRAN && !PAR */
2638 #ifdef RTS_SUPPORTS_THREADS
/* Wake every TSO on the queue `tso`; caller must already hold sched_mutex
 * (hence "NoLock" — no locking is done here). */
2640 awakenBlockedQueueNoLock(StgTSO *tso)
2642 while (tso != END_TSO_QUEUE) {
2643 tso = unblockOneLocked(tso);
/* Standard variant: wake every TSO on the queue, taking sched_mutex. */
2649 awakenBlockedQueue(StgTSO *tso)
2651 ACQUIRE_LOCK(&sched_mutex);
2652 while (tso != END_TSO_QUEUE) {
2653 tso = unblockOneLocked(tso);
2655 RELEASE_LOCK(&sched_mutex);
2659 /* ---------------------------------------------------------------------------
2661 - usually called inside a signal handler so it mustn't do anything fancy.
2662 ------------------------------------------------------------------------ */
/* Request RTS shutdown from a signal-handler context: must stay simple.
 * With threads, also wake a blocked worker so the interrupt is noticed.
 * NOTE(review): the extract omits lines between 2665 and 2669 (presumably
 * the setting of the global `interrupted` flag — verify in full source). */
2665 interruptStgRts(void)
2669 #ifdef RTS_SUPPORTS_THREADS
2670 wakeBlockedWorkerThread();
2674 /* -----------------------------------------------------------------------------
2677 This is for use when we raise an exception in another thread, which
2679 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2680 -------------------------------------------------------------------------- */
2682 #if defined(GRAN) || defined(PAR)
2684 NB: only the type of the blocking queue is different in GranSim and GUM
2685 the operations on the queue-elements are the same
2686 long live polymorphism!
2688 Locks: sched_mutex is held upon entry and exit.
/* GRAN/PAR variant: remove `tso` from whatever blocking queue it sits on
 * (MVar, black hole, exception-delivery queue, I/O queues, sleeping queue),
 * then mark it runnable and push it on the run queue.  Queues are
 * singly-linked lists of StgBlockingQueueElement; each case unlinks via a
 * pointer-to-link (`last`) or a trailing `prev` pointer.  Caller holds
 * sched_mutex (per the header comment above this function).
 * NOTE(review): extract omits some lines (braces, loop end-conditions,
 * case labels); code preserved verbatim. */
2692 unblockThread(StgTSO *tso)
2694 StgBlockingQueueElement *t, **last;
2696 switch (tso->why_blocked) {
2699 return; /* not blocked */
/* ---- blocked on an MVar: unlink from mvar->head, fix tail if needed ---- */
2702 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2704 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2705 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2707 last = (StgBlockingQueueElement **)&mvar->head;
2708 for (t = (StgBlockingQueueElement *)mvar->head;
2710 last = &t->link, last_tso = t, t = t->link) {
2711 if (t == (StgBlockingQueueElement *)tso) {
2712 *last = (StgBlockingQueueElement *)tso->link;
2713 if (mvar->tail == tso) {
2714 mvar->tail = (StgTSO *)last_tso;
2719 barf("unblockThread (MVAR): TSO not found");
2722 case BlockedOnBlackHole:
/* ---- blocked on a BLACKHOLE_BQ: unlink from its blocking_queue ---- */
2723 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2725 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2727 last = &bq->blocking_queue;
2728 for (t = bq->blocking_queue;
2730 last = &t->link, t = t->link) {
2731 if (t == (StgBlockingQueueElement *)tso) {
2732 *last = (StgBlockingQueueElement *)tso->link;
2736 barf("unblockThread (BLACKHOLE): TSO not found");
2739 case BlockedOnException:
/* ---- blocked delivering an exception: unlink from the target TSO's
 * blocked_exceptions queue (chasing ThreadRelocated forwarding first) ---- */
2741 StgTSO *target = tso->block_info.tso;
2743 ASSERT(get_itbl(target)->type == TSO);
2745 if (target->what_next == ThreadRelocated) {
2746 target = target->link;
2747 ASSERT(get_itbl(target)->type == TSO);
2750 ASSERT(target->blocked_exceptions != NULL);
2752 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2753 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2755 last = &t->link, t = t->link) {
2756 ASSERT(get_itbl(t)->type == TSO);
2757 if (t == (StgBlockingQueueElement *)tso) {
2758 *last = (StgBlockingQueueElement *)tso->link;
2762 barf("unblockThread (Exception): TSO not found");
2766 case BlockedOnWrite:
2767 #if defined(mingw32_TARGET_OS)
2768 case BlockedOnDoProc:
2771 /* take TSO off blocked_queue */
2772 StgBlockingQueueElement *prev = NULL;
2773 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2774 prev = t, t = t->link) {
2775 if (t == (StgBlockingQueueElement *)tso) {
2777 blocked_queue_hd = (StgTSO *)t->link;
2778 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2779 blocked_queue_tl = END_TSO_QUEUE;
2782 prev->link = t->link;
2783 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2784 blocked_queue_tl = (StgTSO *)prev;
2790 barf("unblockThread (I/O): TSO not found");
2793 case BlockedOnDelay:
2795 /* take TSO off sleeping_queue */
2796 StgBlockingQueueElement *prev = NULL;
2797 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
2798 prev = t, t = t->link) {
2799 if (t == (StgBlockingQueueElement *)tso) {
2801 sleeping_queue = (StgTSO *)t->link;
2803 prev->link = t->link;
2808 barf("unblockThread (delay): TSO not found");
2812 barf("unblockThread");
/* common exit: detach the TSO and make it runnable */
2816 tso->link = END_TSO_QUEUE;
2817 tso->why_blocked = NotBlocked;
2818 tso->block_info.closure = NULL;
2819 PUSH_ON_RUN_QUEUE(tso);
/* Standard (non-GRAN, non-PAR) variant of unblockThread: same structure as
 * the parallel version above, but queues link plain StgTSOs.  Removes `tso`
 * from its blocking queue and pushes it on the run queue.
 * NOTE(review): extract omits some lines (braces, unlink statements inside
 * several loops); code preserved verbatim. */
2823 unblockThread(StgTSO *tso)
2827 /* To avoid locking unnecessarily. */
2828 if (tso->why_blocked == NotBlocked) {
2832 switch (tso->why_blocked) {
/* ---- blocked on an MVar ---- */
2835 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2837 StgTSO *last_tso = END_TSO_QUEUE;
2838 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2841 for (t = mvar->head; t != END_TSO_QUEUE;
2842 last = &t->link, last_tso = t, t = t->link) {
2845 if (mvar->tail == tso) {
2846 mvar->tail = last_tso;
2851 barf("unblockThread (MVAR): TSO not found");
2854 case BlockedOnBlackHole:
2855 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2857 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2859 last = &bq->blocking_queue;
2860 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2861 last = &t->link, t = t->link) {
2867 barf("unblockThread (BLACKHOLE): TSO not found");
2870 case BlockedOnException:
2872 StgTSO *target = tso->block_info.tso;
2874 ASSERT(get_itbl(target)->type == TSO);
/* note: `while` here (vs `if` in the parallel version) follows the whole
 * ThreadRelocated forwarding chain */
2876 while (target->what_next == ThreadRelocated) {
2877 target = target->link;
2878 ASSERT(get_itbl(target)->type == TSO);
2881 ASSERT(target->blocked_exceptions != NULL);
2883 last = &target->blocked_exceptions;
2884 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2885 last = &t->link, t = t->link) {
2886 ASSERT(get_itbl(t)->type == TSO);
2892 barf("unblockThread (Exception): TSO not found");
2896 case BlockedOnWrite:
2897 #if defined(mingw32_TARGET_OS)
2898 case BlockedOnDoProc:
2901 StgTSO *prev = NULL;
2902 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2903 prev = t, t = t->link) {
2906 blocked_queue_hd = t->link;
2907 if (blocked_queue_tl == t) {
2908 blocked_queue_tl = END_TSO_QUEUE;
2911 prev->link = t->link;
2912 if (blocked_queue_tl == t) {
2913 blocked_queue_tl = prev;
2919 barf("unblockThread (I/O): TSO not found");
2922 case BlockedOnDelay:
2924 StgTSO *prev = NULL;
2925 for (t = sleeping_queue; t != END_TSO_QUEUE;
2926 prev = t, t = t->link) {
2929 sleeping_queue = t->link;
2931 prev->link = t->link;
2936 barf("unblockThread (delay): TSO not found");
2940 barf("unblockThread");
/* common exit: detach the TSO and make it runnable */
2944 tso->link = END_TSO_QUEUE;
2945 tso->why_blocked = NotBlocked;
2946 tso->block_info.closure = NULL;
2947 PUSH_ON_RUN_QUEUE(tso);
2951 /* -----------------------------------------------------------------------------
2954 * The following function implements the magic for raising an
2955 * asynchronous exception in an existing thread.
2957 * We first remove the thread from any queue on which it might be
2958 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
2960 * We strip the stack down to the innermost CATCH_FRAME, building
2961 * thunks in the heap for all the active computations, so they can
2962 * be restarted if necessary. When we reach a CATCH_FRAME, we build
2963 * an application of the handler to the exception, and push it on
2964 * the top of the stack.
2966 * How exactly do we save all the active computations? We create an
2967 * AP_STACK for every UpdateFrame on the stack. Entering one of these
2968 * AP_STACKs pushes everything from the corresponding update frame
2969 * upwards onto the stack. (Actually, it pushes everything up to the
2970 * next update frame plus a pointer to the next AP_STACK object.
2971 * Entering the next AP_STACK object pushes more onto the stack until we
2972 * reach the last AP_STACK object - at which point the stack should look
2973 * exactly as it did when we killed the TSO and we can continue
2974 * execution by entering the closure on top of the stack.
2976 * We can also kill a thread entirely - this happens if either (a) the
2977 * exception passed to raiseAsync is NULL, or (b) there's no
2978 * CATCH_FRAME on the stack. In either case, we strip the entire
2979 * stack and replace the thread with a zombie.
2981 * Locks: sched_mutex held upon entry and exit.
2983 * -------------------------------------------------------------------------- */
/* Kill a thread: raiseAsync with a NULL exception strips the entire stack
 * and leaves the thread dead (see raiseAsync below). */
2986 deleteThread(StgTSO *tso)
2988 raiseAsync(tso,NULL);
/* Kill a thread without running raiseAsync, so it gets no chance to catch
 * the KillThread exception.  Used only by forkProcess.  Threads blocked in
 * a C call are left alone (they are outside the Haskell stack machinery). */
2992 deleteThreadImmediately(StgTSO *tso)
2993 { // for forkProcess only:
2994 // delete thread without giving it a chance to catch the KillThread exception
2996 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2999 #if defined(RTS_SUPPORTS_THREADS)
3000 if (tso->why_blocked != BlockedOnCCall
3001 && tso->why_blocked != BlockedOnCCall_NoUnblockExc)
3004 tso->what_next = ThreadKilled;
/* Raise an asynchronous exception in `tso` from a context that does NOT
 * already hold sched_mutex: acquires the lock around raiseAsync. */
3008 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3010 /* When raising async exs from contexts where sched_mutex isn't held;
3011 use raiseAsyncWithLock(). */
3012 ACQUIRE_LOCK(&sched_mutex);
3013 raiseAsync(tso,exception);
3014 RELEASE_LOCK(&sched_mutex);
/* Raise an asynchronous exception in `tso` (see the long comment above this
 * function).  Walks the stack from the top: freezes each chunk below an
 * UPDATE_FRAME into an AP_STACK and updates the updatee with it; on a
 * CATCH_FRAME (with a non-NULL exception) builds `raise exception` on top
 * of the stack; on a STOP_FRAME kills the thread.  Passing exception==NULL
 * kills the thread unconditionally.  Caller holds sched_mutex.
 * NOTE(review): extract omits some lines (braces, case labels, the loop
 * header around 3065, Sp adjustments); code preserved verbatim. */
3018 raiseAsync(StgTSO *tso, StgClosure *exception)
3020 StgRetInfoTable *info;
3023 // Thread already dead?
3024 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3029 sched_belch("raising exception in thread %ld.", tso->id));
3031 // Remove it from any blocking queues
3036 // The stack freezing code assumes there's a closure pointer on
3037 // the top of the stack, so we have to arrange that this is the case...
3039 if (sp[0] == (W_)&stg_enter_info) {
3043 sp[0] = (W_)&stg_dummy_ret_closure;
3049 // 1. Let the top of the stack be the "current closure"
3051 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3054 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3055 // current closure applied to the chunk of stack up to (but not
3056 // including) the update frame. This closure becomes the "current
3057 // closure". Go back to step 2.
3059 // 4. If it's a CATCH_FRAME, then leave the exception handler on
3060 // top of the stack applied to the exception.
3062 // 5. If it's a STOP_FRAME, then kill the thread.
3067 info = get_ret_itbl((StgClosure *)frame);
// skip ordinary return frames until the next interesting frame; a
// CATCH_FRAME only stops the walk when we actually have an exception
3069 while (info->i.type != UPDATE_FRAME
3070 && (info->i.type != CATCH_FRAME || exception == NULL)
3071 && info->i.type != STOP_FRAME) {
3072 frame += stack_frame_sizeW((StgClosure *)frame);
3073 info = get_ret_itbl((StgClosure *)frame);
3076 switch (info->i.type) {
3079 // If we find a CATCH_FRAME, and we've got an exception to raise,
3080 // then build the THUNK raise(exception), and leave it on
3081 // top of the CATCH_FRAME ready to enter.
3085 StgCatchFrame *cf = (StgCatchFrame *)frame;
3089 // we've got an exception to raise, so let's pass it to the
3090 // handler in this frame.
3092 raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3093 TICK_ALLOC_SE_THK(1,0);
3094 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3095 raise->payload[0] = exception;
3097 // throw away the stack from Sp up to the CATCH_FRAME.
3101 /* Ensure that async exceptions are blocked now, so we don't get
3102 * a surprise exception before we get around to executing the
3105 if (tso->blocked_exceptions == NULL) {
3106 tso->blocked_exceptions = END_TSO_QUEUE;
3109 /* Put the newly-built THUNK on top of the stack, ready to execute
3110 * when the thread restarts.
3113 sp[-1] = (W_)&stg_enter_info;
3115 tso->what_next = ThreadRunGHC;
3116 IF_DEBUG(sanity, checkTSO(tso));
3125 // First build an AP_STACK consisting of the stack chunk above the
3126 // current update frame, with the top word on the stack as the
3129 words = frame - sp - 1;
3130 ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3133 ap->fun = (StgClosure *)sp[0];
3135 for(i=0; i < (nat)words; ++i) {
3136 ap->payload[i] = (StgClosure *)*sp++;
3139 SET_HDR(ap,&stg_AP_STACK_info,
3140 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
3141 TICK_ALLOC_UP_THK(words+1,0);
3144 fprintf(stderr, "sched: Updating ");
3145 printPtr((P_)((StgUpdateFrame *)frame)->updatee);
3146 fprintf(stderr, " with ");
3147 printObj((StgClosure *)ap);
3150 // Replace the updatee with an indirection - happily
3151 // this will also wake up any threads currently
3152 // waiting on the result.
3154 // Warning: if we're in a loop, more than one update frame on
3155 // the stack may point to the same object. Be careful not to
3156 // overwrite an IND_OLDGEN in this case, because we'll screw
3157 // up the mutable lists. To be on the safe side, don't
3158 // overwrite any kind of indirection at all. See also
3159 // threadSqueezeStack in GC.c, where we have to make a similar
3162 if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3163 // revert the black hole
3164 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3166 sp += sizeofW(StgUpdateFrame) - 1;
3167 sp[0] = (W_)ap; // push onto stack
3172 // We've stripped the entire stack, the thread is now dead.
3173 sp += sizeofW(StgStopFrame);
3174 tso->what_next = ThreadKilled;
3185 /* -----------------------------------------------------------------------------
3186 resurrectThreads is called after garbage collection on the list of
3187 threads found to be garbage. Each of these threads will be woken
3188 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3189 on an MVar, or NonTermination if the thread was blocked on a Black
3192 Locks: sched_mutex isn't held upon entry nor exit.
3193 -------------------------------------------------------------------------- */
/* Wake each garbage thread found by the GC and deliver an exception:
 * BlockedOnDeadMVar for MVar/exception blockage, NonTermination for a
 * black hole.  Re-links each TSO onto all_threads as it goes.  Runs inside
 * the GC, so sched_mutex is already held (see comment at case below). */
3196 resurrectThreads( StgTSO *threads )
3200 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3201 next = tso->global_link;
3202 tso->global_link = all_threads;
3204 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3206 switch (tso->why_blocked) {
3208 case BlockedOnException:
3209 /* Called by GC - sched_mutex lock is currently held. */
3210 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3212 case BlockedOnBlackHole:
3213 raiseAsync(tso,(StgClosure *)NonTermination_closure);
3216 /* This might happen if the thread was blocked on a black hole
3217 * belonging to a thread that we've just woken up (raiseAsync
3218 * can wake up threads, remember...).
3222 barf("resurrectThreads: thread blocked in a strange way");
3227 /* -----------------------------------------------------------------------------
3228 * Blackhole detection: if we reach a deadlock, test whether any
3229 * threads are blocked on themselves. Any threads which are found to
3230 * be self-blocked get sent a NonTermination exception.
3232 * This is only done in a deadlock situation in order to avoid
3233 * performance overhead in the normal case.
3235 * Locks: sched_mutex is held upon entry and exit.
3236 * -------------------------------------------------------------------------- */
/* Deadlock recovery: scan all threads blocked on a black hole; if a
 * thread's own stack contains an update frame whose updatee IS the black
 * hole it is blocked on, the thread is blocked on itself — send it
 * NonTermination.  Called only when the system is deadlocked; sched_mutex
 * is held (per the header comment above). */
3239 detectBlackHoles( void )
3241 StgTSO *tso = all_threads;
3243 StgClosure *blocked_on;
3244 StgRetInfoTable *info;
3246 for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
/* follow ThreadRelocated forwarding to the live TSO */
3248 while (tso->what_next == ThreadRelocated) {
3250 ASSERT(get_itbl(tso)->type == TSO);
3253 if (tso->why_blocked != BlockedOnBlackHole) {
3256 blocked_on = tso->block_info.closure;
3258 frame = (StgClosure *)tso->sp;
3261 info = get_ret_itbl(frame);
3262 switch (info->i.type) {
3264 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3265 /* We are blocking on one of our own computations, so
3266 * send this thread the NonTermination exception.
3269 sched_belch("thread %d is blocked on itself", tso->id));
3270 raiseAsync(tso, (StgClosure *)NonTermination_closure);
3274 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3280 // normal stack frames; do nothing except advance the pointer
// NOTE(review): cast-as-lvalue below is a GCC extension (removed in
// modern GCC); would need `frame = (StgClosure *)((StgPtr)frame + ...)` today
3282 (StgPtr)frame += stack_frame_sizeW(frame);
3289 /* ----------------------------------------------------------------------------
3290 * Debugging: why is a thread blocked
3291 * [Also provides useful information when debugging threaded programs
3292 * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3293 ------------------------------------------------------------------------- */
/* Debug helper: print (to stderr, no trailing newline) a human-readable
 * description of why `tso` is blocked, keyed on tso->why_blocked.  The
 * GA cases apply to the PAR build; the CCall cases to threaded builds. */
3297 printThreadBlockage(StgTSO *tso)
3299 switch (tso->why_blocked) {
3301 fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3303 case BlockedOnWrite:
3304 fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3306 #if defined(mingw32_TARGET_OS)
3307 case BlockedOnDoProc:
3308 fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3311 case BlockedOnDelay:
3312 fprintf(stderr,"is blocked until %d", tso->block_info.target);
3315 fprintf(stderr,"is blocked on an MVar");
3317 case BlockedOnException:
3318 fprintf(stderr,"is blocked on delivering an exception to thread %d",
3319 tso->block_info.tso->id);
3321 case BlockedOnBlackHole:
3322 fprintf(stderr,"is blocked on a black hole");
3325 fprintf(stderr,"is not blocked");
3329 fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3330 tso->block_info.closure, info_type(tso->block_info.closure));
3332 case BlockedOnGA_NoSend:
3333 fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3334 tso->block_info.closure, info_type(tso->block_info.closure));
3337 #if defined(RTS_SUPPORTS_THREADS)
3338 case BlockedOnCCall:
3339 fprintf(stderr,"is blocked on an external call");
3341 case BlockedOnCCall_NoUnblockExc:
3342 fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3346 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3347 tso->why_blocked, tso->id, tso);
/* Debug helper: print the thread's overall status; delegates to
 * printThreadBlockage for live-but-blocked threads. */
3353 printThreadStatus(StgTSO *tso)
3355 switch (tso->what_next) {
3357 fprintf(stderr,"has been killed");
3359 case ThreadComplete:
3360 fprintf(stderr,"has completed");
3363 printThreadBlockage(tso);
/* Debug helper: dump every thread on all_threads with its id, address,
 * optional user label, and status.  The two timestamped headers belong to
 * the GRAN and PAR builds respectively; the plain header to the standard
 * build (the #if lines are among those dropped from this extract). */
3368 printAllThreads(void)
3374 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3375 ullong_format_string(TIME_ON_PROC(CurrentProc),
3376 time_string, rtsFalse/*no commas!*/);
3378 fprintf(stderr, "all threads at [%s]:\n", time_string);
3380 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3381 ullong_format_string(CURRENT_TIME,
3382 time_string, rtsFalse/*no commas!*/);
3384 fprintf(stderr,"all threads at [%s]:\n", time_string);
3386 fprintf(stderr,"all threads:\n");
3389 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3390 fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3391 label = lookupThreadLabel(t->id);
3392 if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3393 printThreadStatus(t);
3394 fprintf(stderr,"\n");
3401 Print a whole blocking queue attached to node (debugging only).
/* GUM (PAR) debug helper: print the blocking queue attached to `node`
 * after checking it is a closure type that can carry one; the actual
 * element printing is delegated to print_bqe. */
3405 print_bq (StgClosure *node)
3407 StgBlockingQueueElement *bqe;
3411 fprintf(stderr,"## BQ of closure %p (%s): ",
3412 node, info_type(node));
3414 /* should cover all closures that may have a blocking queue */
3415 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3416 get_itbl(node)->type == FETCH_ME_BQ ||
3417 get_itbl(node)->type == RBH ||
3418 get_itbl(node)->type == MVAR);
3420 ASSERT(node!=(StgClosure*)NULL); // sanity check
3422 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3426 Print a whole blocking queue starting with the element bqe.
/* GUM (PAR) debug helper: walk a blocking queue starting at `bqe`, printing
 * each element (TSO, BLOCKED_FETCH, or terminating RBHSave CONSTR).  The
 * unusual for-loop bookkeeping stops either at END_BQ_QUEUE or after the
 * CONSTR that terminates an RBH's queue. */
3429 print_bqe (StgBlockingQueueElement *bqe)
3434 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3436 for (end = (bqe==END_BQ_QUEUE);
3437 !end; // iterate until bqe points to a CONSTR
3438 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
3439 bqe = end ? END_BQ_QUEUE : bqe->link) {
3440 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3441 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3442 /* types of closures that may appear in a blocking queue */
3443 ASSERT(get_itbl(bqe)->type == TSO ||
3444 get_itbl(bqe)->type == BLOCKED_FETCH ||
3445 get_itbl(bqe)->type == CONSTR);
3446 /* only BQs of an RBH end with an RBH_Save closure */
3447 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3449 switch (get_itbl(bqe)->type) {
3451 fprintf(stderr," TSO %u (%x),",
3452 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3455 fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3456 ((StgBlockedFetch *)bqe)->node,
3457 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3458 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3459 ((StgBlockedFetch *)bqe)->ga.weight);
3462 fprintf(stderr," %s (IP %p),",
3463 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3464 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3465 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3466 "RBH_Save_?"), get_itbl(bqe));
3469 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3470 info_type((StgClosure *)bqe)); // , node, info_type(node));
3474 fputc('\n', stderr);
3476 # elif defined(GRAN)
/* GranSim (GRAN) debug helper: print the blocking queue of `node`,
 * annotating each TSO with the PE it lives on (via where_is).  Same
 * CONSTR-terminated iteration scheme as the PAR print_bqe above. */
3478 print_bq (StgClosure *node)
3480 StgBlockingQueueElement *bqe;
3481 PEs node_loc, tso_loc;
3484 /* should cover all closures that may have a blocking queue */
3485 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3486 get_itbl(node)->type == FETCH_ME_BQ ||
3487 get_itbl(node)->type == RBH);
3489 ASSERT(node!=(StgClosure*)NULL); // sanity check
3490 node_loc = where_is(node);
3492 fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3493 node, info_type(node), node_loc);
3496 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3498 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3499 !end; // iterate until bqe points to a CONSTR
3500 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3501 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3502 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3503 /* types of closures that may appear in a blocking queue */
3504 ASSERT(get_itbl(bqe)->type == TSO ||
3505 get_itbl(bqe)->type == CONSTR);
3506 /* only BQs of an RBH end with an RBH_Save closure */
3507 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3509 tso_loc = where_is((StgClosure *)bqe);
3510 switch (get_itbl(bqe)->type) {
3512 fprintf(stderr," TSO %d (%p) on [PE %d],",
3513 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3516 fprintf(stderr," %s (IP %p),",
3517 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3518 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3519 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3520 "RBH_Save_?"), get_itbl(bqe));
3523 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3524 info_type((StgClosure *)bqe), node, info_type(node));
3528 fputc('\n', stderr);
3532 Nice and easy: only TSOs on the blocking queue
/* Standard-build debug helper: a blocking queue contains only TSOs, so
 * just walk and print them. */
3535 print_bq (StgClosure *node)
3539 ASSERT(node!=(StgClosure*)NULL); // sanity check
3540 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3541 tso != END_TSO_QUEUE;
3543 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3544 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3545 fprintf(stderr," TSO %d (%p),", tso->id, tso);
3547 fputc('\n', stderr);
/* NOTE(review): fragment of a run-queue debug-print loop whose enclosing
 * function header (orig. lines ~3550-3557) is missing from this extract —
 * presumably it iterates over run_queue_hd printing each TSO.  Verify
 * against the full source before editing. */
3558 for (i=0, tso=run_queue_hd;
3559 tso != END_TSO_QUEUE;
3568 sched_belch(char *s, ...)
3572 #ifdef RTS_SUPPORTS_THREADS
3573 fprintf(stderr, "sched (task %p): ", osThreadId());
3575 fprintf(stderr, "== ");
3577 fprintf(stderr, "sched: ");
3579 vfprintf(stderr, s, ap);
3580 fprintf(stderr, "\n");