1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.104 2001/10/31 10:34:29 simonmar Exp $
4 * (c) The GHC Team, 1998-2000
8 * Different GHC ways use this scheduler quite differently (see comments below)
9 * Here is the global picture:
11 * WAY Name CPP flag What's it for
12 * --------------------------------------
13 * mp GUM PAR Parallel execution on a distributed memory machine
14 * s SMP SMP Parallel execution on a shared memory machine
15 * mg GranSim GRAN Simulation of parallel execution
16 * md GUM/GdH DIST Distributed execution (based on GUM)
17 * --------------------------------------------------------------------------*/
19 //@node Main scheduling code, , ,
20 //@section Main scheduling code
23 * Version with scheduler monitor support for SMPs (WAY=s):
25 This design provides a high-level API to create and schedule threads etc.
26 as documented in the SMP design document.
28 It uses a monitor design controlled by a single mutex to exercise control
29 over accesses to shared data structures, and builds on the Posix threads
32 The majority of state is shared. In order to keep essential per-task state,
33 there is a Capability structure, which contains all the information
34 needed to run a thread: its STG registers, a pointer to its TSO, a
35 nursery etc. During STG execution, a pointer to the capability is
36 kept in a register (BaseReg).
38 In a non-SMP build, there is one global capability, namely MainRegTable.
42 * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44 The main scheduling loop in GUM iterates until a finish message is received.
45 In that case a global flag @receivedFinish@ is set and this instance of
46 the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
47 for the handling of incoming messages, such as PP_FINISH.
48 Note that in the parallel case we have a system manager that coordinates
49 different PEs, each of which are running one instance of the RTS.
50 See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
51 From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53 * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55 The main scheduling code in GranSim is quite different from that in std
56 (concurrent) Haskell: while concurrent Haskell just iterates over the
57 threads in the runnable queue, GranSim is event driven, i.e. it iterates
58 over the events in the global event queue. -- HWL
63 //* Variables and Data structures::
64 //* Main scheduling loop::
65 //* Suspend and Resume::
67 //* Garbage Collextion Routines::
68 //* Blocking Queue Routines::
69 //* Exception Handling Routines::
70 //* Debugging Routines::
74 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
75 //@subsection Includes
77 #include "PosixSource.h"
84 #include "StgStartup.h"
87 #include "StgMiscClosures.h"
89 #include "Interpreter.h"
90 #include "Exception.h"
98 #if defined(GRAN) || defined(PAR)
99 # include "GranSimRts.h"
100 # include "GranSim.h"
101 # include "ParallelRts.h"
102 # include "Parallel.h"
103 # include "ParallelDebug.h"
104 # include "FetchMe.h"
111 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
112 //@subsection Variables and Data structures
116 * These are the threads which clients have requested that we run.
118 * In an SMP build, we might have several concurrent clients all
119 * waiting for results, and each one will wait on a condition variable
120 * until the result is available.
122 * In non-SMP, clients are strictly nested: the first client calls
123 * into the RTS, which might call out again to C with a _ccall_GC, and
124 * eventually re-enter the RTS.
126 * Main threads information is kept in a linked list:
128 //@cindex StgMainThread
129 typedef struct StgMainThread_ {
131 SchedulerStatus stat;
134 pthread_cond_t wakeup;
136 struct StgMainThread_ *link;
139 /* Main thread queue.
140 * Locks required: sched_mutex.
142 static StgMainThread *main_threads;
145 * Locks required: sched_mutex.
149 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
150 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
153 In GranSim we have a runnable and a blocked queue for each processor.
154 In order to minimise code changes, new arrays run_queue_hds/tls
155 are created. run_queue_hd is then a shortcut (macro) for
156 run_queue_hds[CurrentProc] (see GranSim.h).
159 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
160 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
161 StgTSO *ccalling_threadss[MAX_PROC];
162 /* We use the same global list of threads (all_threads) in GranSim as in
163 the std RTS (i.e. we are cheating). However, we don't use this list in
164 the GranSim specific code at the moment (so we are only potentially
169 StgTSO *run_queue_hd, *run_queue_tl;
170 StgTSO *blocked_queue_hd, *blocked_queue_tl;
171 StgTSO *sleeping_queue; /* perhaps replace with a hash table? */
175 /* Linked list of all threads.
176 * Used for detecting garbage collected threads.
180 /* Threads suspended in _ccall_GC.
182 static StgTSO *suspended_ccalling_threads;
184 static void GetRoots(evac_fn);
185 static StgTSO *threadStackOverflow(StgTSO *tso);
187 /* KH: The following two flags are shared memory locations. There is no need
188 to lock them, since they are only unset at the end of a scheduler
192 /* flag set by signal handler to precipitate a context switch */
193 //@cindex context_switch
196 /* if this flag is set as well, give up execution */
197 //@cindex interrupted
200 /* Next thread ID to allocate.
201 * Locks required: sched_mutex
203 //@cindex next_thread_id
204 StgThreadID next_thread_id = 1;
207 * Pointers to the state of the current thread.
208 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
209 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
212 /* The smallest stack size that makes any sense is:
213 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
214 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
215 * + 1 (the realworld token for an IO thread)
216 * + 1 (the closure to enter)
218 * A thread with this stack will bomb immediately with a stack
219 * overflow, which will increase its stack size.
222 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
224 /* Free capability list.
225 * Locks required: sched_mutex.
228 //@cindex free_capabilities
229 //@cindex n_free_capabilities
230 Capability *free_capabilities; /* Available capabilities for running threads */
231 nat n_free_capabilities; /* total number of available capabilities */
233 //@cindex MainRegTable
234 Capability MainRegTable; /* for non-SMP, we have one global capability */
241 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
242 * exists - earlier gccs apparently didn't.
249 /* All our current task ids, saved in case we need to kill them later.
256 void addToBlockedQueue ( StgTSO *tso );
258 static void schedule ( void );
259 void interruptStgRts ( void );
261 static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri );
263 static StgTSO * createThread_ ( nat size, rtsBool have_lock );
266 static void detectBlackHoles ( void );
269 static void sched_belch(char *s, ...);
273 //@cindex sched_mutex
275 //@cindex thread_ready_cond
276 //@cindex gc_pending_cond
277 pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;
278 pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER;
279 pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER;
280 pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER;
287 rtsTime TimeOfLastYield;
288 rtsBool emitSchedule = rtsTrue;
292 char *whatNext_strs[] = {
300 char *threadReturnCode_strs[] = {
301 "HeapOverflow", /* might also be StackOverflow */
310 StgTSO * createSparkThread(rtsSpark spark);
311 StgTSO * activateSpark (rtsSpark spark);
315 * The thread state for the main thread.
316 // ToDo: check whether not needed any more
320 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
321 //@subsection Main scheduling loop
323 /* ---------------------------------------------------------------------------
324 Main scheduling loop.
326 We use round-robin scheduling, each thread returning to the
327 scheduler loop when one of these conditions is detected:
330 * timer expires (thread yields)
335 Locking notes: we acquire the scheduler lock once at the beginning
336 of the scheduler loop, and release it when
338 * running a thread, or
339 * waiting for work, or
340 * waiting for a GC to complete.
343 In a GranSim setup this loop iterates over the global event queue.
344 This revolves around the global event queue, which determines what
345 to do next. Therefore, it's more complicated than either the
346 concurrent or the parallel (GUM) setup.
349 GUM iterates over incoming messages.
350 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
351 and sends out a fish whenever it has nothing to do; in-between
352 doing the actual reductions (shared code below) it processes the
353 incoming messages and deals with delayed operations
354 (see PendingFetches).
355 This is not the ugliest code you could imagine, but it's bloody close.
357 ------------------------------------------------------------------------ */
364 StgThreadReturnCode ret;
372 rtsBool receivedFinish = rtsFalse;
374 nat tp_size, sp_size; // stats only
377 rtsBool was_interrupted = rtsFalse;
379 ACQUIRE_LOCK(&sched_mutex);
383 /* set up first event to get things going */
384 /* ToDo: assign costs for system setup and init MainTSO ! */
385 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
387 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
390 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
391 G_TSO(CurrentTSO, 5));
393 if (RtsFlags.GranFlags.Light) {
394 /* Save current time; GranSim Light only */
395 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
398 event = get_next_event();
400 while (event!=(rtsEvent*)NULL) {
401 /* Choose the processor with the next event */
402 CurrentProc = event->proc;
403 CurrentTSO = event->tso;
407 while (!receivedFinish) { /* set by processMessages */
408 /* when receiving PP_FINISH message */
415 IF_DEBUG(scheduler, printAllThreads());
417 /* If we're interrupted (the user pressed ^C, or some other
418 * termination condition occurred), kill all the currently running
422 IF_DEBUG(scheduler, sched_belch("interrupted"));
424 interrupted = rtsFalse;
425 was_interrupted = rtsTrue;
428 /* Go through the list of main threads and wake up any
429 * clients whose computations have finished. ToDo: this
430 * should be done more efficiently without a linear scan
431 * of the main threads list, somehow...
435 StgMainThread *m, **prev;
436 prev = &main_threads;
437 for (m = main_threads; m != NULL; m = m->link) {
438 switch (m->tso->what_next) {
441 *(m->ret) = (StgClosure *)m->tso->sp[0];
445 pthread_cond_broadcast(&m->wakeup);
448 if (m->ret) *(m->ret) = NULL;
450 if (was_interrupted) {
451 m->stat = Interrupted;
455 pthread_cond_broadcast(&m->wakeup);
465 /* in GUM do this only on the Main PE */
468 /* If our main thread has finished or been killed, return.
471 StgMainThread *m = main_threads;
472 if (m->tso->what_next == ThreadComplete
473 || m->tso->what_next == ThreadKilled) {
474 main_threads = main_threads->link;
475 if (m->tso->what_next == ThreadComplete) {
476 /* we finished successfully, fill in the return value */
477 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
481 if (m->ret) { *(m->ret) = NULL; };
482 if (was_interrupted) {
483 m->stat = Interrupted;
493 /* Top up the run queue from our spark pool. We try to make the
494 * number of threads in the run queue equal to the number of
499 nat n = n_free_capabilities;
500 StgTSO *tso = run_queue_hd;
502 /* Count the run queue */
503 while (n > 0 && tso != END_TSO_QUEUE) {
510 spark = findSpark(rtsFalse);
512 break; /* no more sparks in the pool */
514 /* I'd prefer this to be done in activateSpark -- HWL */
515 /* tricky - it needs to hold the scheduler lock and
516 * not try to re-acquire it -- SDM */
517 createSparkThread(spark);
519 sched_belch("==^^ turning spark of closure %p into a thread",
520 (StgClosure *)spark));
523 /* We need to wake up the other tasks if we just created some
526 if (n_free_capabilities - n > 1) {
527 pthread_cond_signal(&thread_ready_cond);
532 /* check for signals each time around the scheduler */
533 #ifndef mingw32_TARGET_OS
534 if (signals_pending()) {
535 startSignalHandlers();
539 /* Check whether any waiting threads need to be woken up. If the
540 * run queue is empty, and there are no other tasks running, we
541 * can wait indefinitely for something to happen.
542 * ToDo: what if another client comes along & requests another
545 if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
547 (run_queue_hd == END_TSO_QUEUE)
549 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
553 /* we can be interrupted while waiting for I/O... */
554 if (interrupted) continue;
557 * Detect deadlock: when we have no threads to run, there are no
558 * threads waiting on I/O or sleeping, and all the other tasks are
559 * waiting for work, we must have a deadlock of some description.
561 * We first try to find threads blocked on themselves (ie. black
562 * holes), and generate NonTermination exceptions where necessary.
564 * If no threads are black holed, we have a deadlock situation, so
565 * inform all the main threads.
568 if (blocked_queue_hd == END_TSO_QUEUE
569 && run_queue_hd == END_TSO_QUEUE
570 && sleeping_queue == END_TSO_QUEUE
572 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
576 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
577 GarbageCollect(GetRoots,rtsTrue);
578 if (blocked_queue_hd == END_TSO_QUEUE
579 && run_queue_hd == END_TSO_QUEUE
580 && sleeping_queue == END_TSO_QUEUE) {
581 IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
583 if (run_queue_hd == END_TSO_QUEUE) {
584 StgMainThread *m = main_threads;
586 for (; m != NULL; m = m->link) {
587 deleteThread(m->tso);
590 pthread_cond_broadcast(&m->wakeup);
594 deleteThread(m->tso);
597 main_threads = m->link;
604 /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
608 /* If there's a GC pending, don't do anything until it has
612 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
613 pthread_cond_wait(&gc_pending_cond, &sched_mutex);
616 /* block until we've got a thread on the run queue and a free
619 while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
620 IF_DEBUG(scheduler, sched_belch("waiting for work"));
621 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
622 IF_DEBUG(scheduler, sched_belch("work now available"));
628 if (RtsFlags.GranFlags.Light)
629 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
631 /* adjust time based on time-stamp */
632 if (event->time > CurrentTime[CurrentProc] &&
633 event->evttype != ContinueThread)
634 CurrentTime[CurrentProc] = event->time;
636 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
637 if (!RtsFlags.GranFlags.Light)
640 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
642 /* main event dispatcher in GranSim */
643 switch (event->evttype) {
644 /* Should just be continuing execution */
646 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
647 /* ToDo: check assertion
648 ASSERT(run_queue_hd != (StgTSO*)NULL &&
649 run_queue_hd != END_TSO_QUEUE);
651 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
652 if (!RtsFlags.GranFlags.DoAsyncFetch &&
653 procStatus[CurrentProc]==Fetching) {
654 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
655 CurrentTSO->id, CurrentTSO, CurrentProc);
658 /* Ignore ContinueThreads for completed threads */
659 if (CurrentTSO->what_next == ThreadComplete) {
660 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
661 CurrentTSO->id, CurrentTSO, CurrentProc);
664 /* Ignore ContinueThreads for threads that are being migrated */
665 if (PROCS(CurrentTSO)==Nowhere) {
666 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
667 CurrentTSO->id, CurrentTSO, CurrentProc);
670 /* The thread should be at the beginning of the run queue */
671 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
672 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
673 CurrentTSO->id, CurrentTSO, CurrentProc);
674 break; // run the thread anyway
677 new_event(proc, proc, CurrentTime[proc],
679 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
681 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
682 break; // now actually run the thread; DaH Qu'vam yImuHbej
685 do_the_fetchnode(event);
686 goto next_thread; /* handle next event in event queue */
689 do_the_globalblock(event);
690 goto next_thread; /* handle next event in event queue */
693 do_the_fetchreply(event);
694 goto next_thread; /* handle next event in event queue */
696 case UnblockThread: /* Move from the blocked queue to the tail of */
697 do_the_unblock(event);
698 goto next_thread; /* handle next event in event queue */
700 case ResumeThread: /* Move from the blocked queue to the tail of */
701 /* the runnable queue ( i.e. Qu' SImqa'lu') */
702 event->tso->gran.blocktime +=
703 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
704 do_the_startthread(event);
705 goto next_thread; /* handle next event in event queue */
708 do_the_startthread(event);
709 goto next_thread; /* handle next event in event queue */
712 do_the_movethread(event);
713 goto next_thread; /* handle next event in event queue */
716 do_the_movespark(event);
717 goto next_thread; /* handle next event in event queue */
720 do_the_findwork(event);
721 goto next_thread; /* handle next event in event queue */
724 barf("Illegal event type %u\n", event->evttype);
727 /* This point was scheduler_loop in the old RTS */
729 IF_DEBUG(gran, belch("GRAN: after main switch"));
731 TimeOfLastEvent = CurrentTime[CurrentProc];
732 TimeOfNextEvent = get_time_of_next_event();
733 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
734 // CurrentTSO = ThreadQueueHd;
736 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
739 if (RtsFlags.GranFlags.Light)
740 GranSimLight_leave_system(event, &ActiveTSO);
742 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
745 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
747 /* in a GranSim setup the TSO stays on the run queue */
749 /* Take a thread from the run queue. */
750 t = POP_RUN_QUEUE(); // take_off_run_queue(t);
753 fprintf(stderr, "GRAN: About to run current thread, which is\n");
756 context_switch = 0; // turned on via GranYield, checking events and time slice
759 DumpGranEvent(GR_SCHEDULE, t));
761 procStatus[CurrentProc] = Busy;
764 if (PendingFetches != END_BF_QUEUE) {
768 /* ToDo: phps merge with spark activation above */
769 /* check whether we have local work and send requests if we have none */
770 if (EMPTY_RUN_QUEUE()) { /* no runnable threads */
771 /* :-[ no local threads => look out for local sparks */
772 /* the spark pool for the current PE */
773 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
774 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
775 pool->hd < pool->tl) {
777 * ToDo: add GC code check that we really have enough heap afterwards!!
779 * If we're here (no runnable threads) and we have pending
780 * sparks, we must have a space problem. Get enough space
781 * to turn one of those pending sparks into a
785 spark = findSpark(rtsFalse); /* get a spark */
786 if (spark != (rtsSpark) NULL) {
787 tso = activateSpark(spark); /* turn the spark into a thread */
788 IF_PAR_DEBUG(schedule,
789 belch("==== schedule: Created TSO %d (%p); %d threads active",
790 tso->id, tso, advisory_thread_count));
792 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
793 belch("==^^ failed to activate spark");
795 } /* otherwise fall through & pick-up new tso */
797 IF_PAR_DEBUG(verbose,
798 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
799 spark_queue_len(pool)));
804 /* If we still have no work we need to send a FISH to get a spark
807 if (EMPTY_RUN_QUEUE()) {
808 /* =8-[ no local sparks => look for work on other PEs */
810 * We really have absolutely no work. Send out a fish
811 * (there may be some out there already), and wait for
812 * something to arrive. We clearly can't run any threads
813 * until a SCHEDULE or RESUME arrives, and so that's what
814 * we're hoping to see. (Of course, we still have to
815 * respond to other types of messages.)
817 TIME now = msTime() /*CURRENT_TIME*/;
818 IF_PAR_DEBUG(verbose,
819 belch("-- now=%ld", now));
820 IF_PAR_DEBUG(verbose,
821 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
822 (last_fish_arrived_at!=0 &&
823 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
824 belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
825 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
826 last_fish_arrived_at,
827 RtsFlags.ParFlags.fishDelay, now);
830 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
831 (last_fish_arrived_at==0 ||
832 (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
833 /* outstandingFishes is set in sendFish, processFish;
834 avoid flooding system with fishes via delay */
836 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
839 // Global statistics: count no. of fishes
840 if (RtsFlags.ParFlags.ParStats.Global &&
841 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
842 globalParStats.tot_fish_mess++;
846 receivedFinish = processMessages();
849 } else if (PacketsWaiting()) { /* Look for incoming messages */
850 receivedFinish = processMessages();
853 /* Now we are sure that we have some work available */
854 ASSERT(run_queue_hd != END_TSO_QUEUE);
856 /* Take a thread from the run queue, if we have work */
857 t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
858 IF_DEBUG(sanity,checkTSO(t));
860 /* ToDo: write something to the log-file
861 if (RTSflags.ParFlags.granSimStats && !sameThread)
862 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
866 /* the spark pool for the current PE */
867 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
870 belch("--=^ %d threads, %d sparks on [%#x]",
871 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
874 if (0 && RtsFlags.ParFlags.ParStats.Full &&
875 t && LastTSO && t->id != LastTSO->id &&
876 LastTSO->why_blocked == NotBlocked &&
877 LastTSO->what_next != ThreadComplete) {
878 // if previously scheduled TSO not blocked we have to record the context switch
879 DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
880 GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
883 if (RtsFlags.ParFlags.ParStats.Full &&
884 (emitSchedule /* forced emit */ ||
885 (t && LastTSO && t->id != LastTSO->id))) {
887 we are running a different TSO, so write a schedule event to log file
888 NB: If we use fair scheduling we also have to write a deschedule
889 event for LastTSO; with unfair scheduling we know that the
890 previous tso has blocked whenever we switch to another tso, so
891 we don't need it in GUM for now
893 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
894 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
895 emitSchedule = rtsFalse;
899 #else /* !GRAN && !PAR */
901 /* grab a thread from the run queue
903 ASSERT(run_queue_hd != END_TSO_QUEUE);
905 IF_DEBUG(sanity,checkTSO(t));
912 cap = free_capabilities;
913 free_capabilities = cap->link;
914 n_free_capabilities--;
919 cap->rCurrentTSO = t;
921 /* context switches are now initiated by the timer signal, unless
922 * the user specified "context switch as often as possible", with
925 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
926 && (run_queue_hd != END_TSO_QUEUE
927 || blocked_queue_hd != END_TSO_QUEUE
928 || sleeping_queue != END_TSO_QUEUE))
933 RELEASE_LOCK(&sched_mutex);
935 IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...",
936 t->id, t, whatNext_strs[t->what_next]));
938 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
939 /* Run the current thread
941 switch (cap->rCurrentTSO->what_next) {
944 /* Thread already finished, return to scheduler. */
945 ret = ThreadFinished;
948 ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
951 ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
953 case ThreadEnterInterp:
954 ret = interpretBCO(cap);
957 barf("schedule: invalid what_next field");
959 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
961 /* Costs for the scheduler are assigned to CCS_SYSTEM */
966 ACQUIRE_LOCK(&sched_mutex);
969 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
970 #elif !defined(GRAN) && !defined(PAR)
971 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
973 t = cap->rCurrentTSO;
976 /* HACK 675: if the last thread didn't yield, make sure to print a
977 SCHEDULE event to the log file when StgRunning the next thread, even
978 if it is the same one as before */
980 TimeOfLastYield = CURRENT_TIME;
987 DumpGranEvent(GR_DESCHEDULE, t));
988 globalGranStats.tot_heapover++;
991 //DumpGranEvent(GR_DESCHEDULE, t);
992 globalParStats.tot_heapover++;
994 /* make all the running tasks block on a condition variable,
995 * maybe set context_switch and wait till they all pile in,
996 * then have them wait on a GC condition variable.
998 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow",
999 t->id, t, whatNext_strs[t->what_next]));
1002 ASSERT(!is_on_queue(t,CurrentProc));
1004 /* Currently we emit a DESCHEDULE event before GC in GUM.
1005 ToDo: either add separate event to distinguish SYSTEM time from rest
1006 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1007 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1008 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1009 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1010 emitSchedule = rtsTrue;
1014 ready_to_gc = rtsTrue;
1015 context_switch = 1; /* stop other threads ASAP */
1016 PUSH_ON_RUN_QUEUE(t);
1017 /* actual GC is done at the end of the while loop */
1023 DumpGranEvent(GR_DESCHEDULE, t));
1024 globalGranStats.tot_stackover++;
1027 // DumpGranEvent(GR_DESCHEDULE, t);
1028 globalParStats.tot_stackover++;
1030 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow",
1031 t->id, t, whatNext_strs[t->what_next]));
1032 /* just adjust the stack for this thread, then pop it back
1038 /* enlarge the stack */
1039 StgTSO *new_t = threadStackOverflow(t);
1041 /* This TSO has moved, so update any pointers to it from the
1042 * main thread stack. It better not be on any other queues...
1043 * (it shouldn't be).
1045 for (m = main_threads; m != NULL; m = m->link) {
1050 threadPaused(new_t);
1051 PUSH_ON_RUN_QUEUE(new_t);
1055 case ThreadYielding:
1058 DumpGranEvent(GR_DESCHEDULE, t));
1059 globalGranStats.tot_yields++;
1062 // DumpGranEvent(GR_DESCHEDULE, t);
1063 globalParStats.tot_yields++;
1065 /* put the thread back on the run queue. Then, if we're ready to
1066 * GC, check whether this is the last task to stop. If so, wake
1067 * up the GC thread. getThread will block during a GC until the
1071 if (t->what_next == ThreadEnterInterp) {
1072 /* ToDo: or maybe a timer expired when we were in Hugs?
1073 * or maybe someone hit ctrl-C
1075 belch("--<< thread %ld (%p; %s) stopped to switch to Hugs",
1076 t->id, t, whatNext_strs[t->what_next]);
1078 belch("--<< thread %ld (%p; %s) stopped, yielding",
1079 t->id, t, whatNext_strs[t->what_next]);
1086 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1088 ASSERT(t->link == END_TSO_QUEUE);
1090 ASSERT(!is_on_queue(t,CurrentProc));
1093 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1094 checkThreadQsSanity(rtsTrue));
1097 if (RtsFlags.ParFlags.doFairScheduling) {
1098 /* this does round-robin scheduling; good for concurrency */
1099 APPEND_TO_RUN_QUEUE(t);
1101 /* this does unfair scheduling; good for parallelism */
1102 PUSH_ON_RUN_QUEUE(t);
1105 /* this does round-robin scheduling; good for concurrency */
1106 APPEND_TO_RUN_QUEUE(t);
1109 /* add a ContinueThread event to actually process the thread */
1110 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1112 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1114 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1123 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1124 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1125 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1127 // ??? needed; should emit block before
1129 DumpGranEvent(GR_DESCHEDULE, t));
1130 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1133 ASSERT(procStatus[CurrentProc]==Busy ||
1134 ((procStatus[CurrentProc]==Fetching) &&
1135 (t->block_info.closure!=(StgClosure*)NULL)));
1136 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1137 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1138 procStatus[CurrentProc]==Fetching))
1139 procStatus[CurrentProc] = Idle;
1143 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1144 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1147 if (t->block_info.closure!=(StgClosure*)NULL)
1148 print_bq(t->block_info.closure));
1150 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1153 /* whatever we schedule next, we must log that schedule */
1154 emitSchedule = rtsTrue;
1157 /* don't need to do anything. Either the thread is blocked on
1158 * I/O, in which case we'll have called addToBlockedQueue
1159 * previously, or it's blocked on an MVar or Blackhole, in which
1160 * case it'll be on the relevant queue already.
1163 fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1164 printThreadBlockage(t);
1165 fprintf(stderr, "\n"));
1167 /* Only for dumping event to log file
1168 ToDo: do I need this in GranSim, too?
1175 case ThreadFinished:
1176 /* Need to check whether this was a main thread, and if so, signal
1177 * the task that started it with the return value. If we have no
1178 * more main threads, we probably need to stop all the tasks until
1181 /* We also end up here if the thread kills itself with an
1182 * uncaught exception, see Exception.hc.
1184 IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1186 endThread(t, CurrentProc); // clean-up the thread
1188 /* For now all are advisory -- HWL */
1189 //if(t->priority==AdvisoryPriority) ??
1190 advisory_thread_count--;
1193 if(t->dist.priority==RevalPriority)
1197 if (RtsFlags.ParFlags.ParStats.Full &&
1198 !RtsFlags.ParFlags.ParStats.Suppressed)
1199 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1204 barf("schedule: invalid thread return code %d", (int)ret);
1208 cap->link = free_capabilities;
1209 free_capabilities = cap;
1210 n_free_capabilities++;
1214 if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes)
1219 /* everybody back, start the GC.
1220 * Could do it in this thread, or signal a condition var
1221 * to do it in another thread. Either way, we need to
1222 * broadcast on gc_pending_cond afterward.
1225 IF_DEBUG(scheduler,sched_belch("doing GC"));
1227 GarbageCollect(GetRoots,rtsFalse);
1228 ready_to_gc = rtsFalse;
1230 pthread_cond_broadcast(&gc_pending_cond);
1233 /* add a ContinueThread event to continue execution of current thread */
1234 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1236 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1238 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1245 IF_GRAN_DEBUG(unused,
1246 print_eventq(EventHd));
1248 event = get_next_event();
1252 /* ToDo: wait for next message to arrive rather than busy wait */
1257 t = take_off_run_queue(END_TSO_QUEUE);
1260 } /* end of while(1) */
1261 IF_PAR_DEBUG(verbose,
1262 belch("== Leaving schedule() after having received Finish"));
1265 /* ---------------------------------------------------------------------------
1266 * deleteAllThreads(): kill all the live threads.
1268 * This is used when we catch a user interrupt (^C), before performing
1269 * any necessary cleanups and running finalizers.
1270 * ------------------------------------------------------------------------- */
1272 void deleteAllThreads ( void )
1275 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1276 for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1279 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1282 for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1285 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1286 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1287 sleeping_queue = END_TSO_QUEUE;
1290 /* startThread and insertThread are now in GranSim.c -- HWL */
1292 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1293 //@subsection Suspend and Resume
1295 /* ---------------------------------------------------------------------------
1296 * Suspending & resuming Haskell threads.
1298 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1299 * its capability before calling the C function. This allows another
1300 * task to pick up the capability and carry on running Haskell
1301 * threads. It also means that if the C call blocks, it won't lock
1304 * The Haskell thread making the C call is put to sleep for the
1305 * duration of the call, on the suspended_ccalling_threads queue. We
1306 * give out a token to the task, which it can use to resume the thread
1307 * on return from the C function.
1308 * ------------------------------------------------------------------------- */
1311 suspendThread( Capability *cap )
1315 ACQUIRE_LOCK(&sched_mutex);
1318 sched_belch("thread %d did a _ccall_gc", cap->rCurrentTSO->id));
1320 threadPaused(cap->rCurrentTSO);
1321 cap->rCurrentTSO->link = suspended_ccalling_threads;
1322 suspended_ccalling_threads = cap->rCurrentTSO;
1324 /* Use the thread ID as the token; it should be unique */
1325 tok = cap->rCurrentTSO->id;
1328 cap->link = free_capabilities;
1329 free_capabilities = cap;
1330 n_free_capabilities++;
1333 RELEASE_LOCK(&sched_mutex);
1338 resumeThread( StgInt tok )
1340 StgTSO *tso, **prev;
1343 ACQUIRE_LOCK(&sched_mutex);
1345 prev = &suspended_ccalling_threads;
1346 for (tso = suspended_ccalling_threads;
1347 tso != END_TSO_QUEUE;
1348 prev = &tso->link, tso = tso->link) {
1349 if (tso->id == (StgThreadID)tok) {
1354 if (tso == END_TSO_QUEUE) {
1355 barf("resumeThread: thread not found");
1357 tso->link = END_TSO_QUEUE;
1360 while (free_capabilities == NULL) {
1361 IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1362 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1363 IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1365 cap = free_capabilities;
1366 free_capabilities = cap->link;
1367 n_free_capabilities--;
1369 cap = &MainRegTable;
1372 cap->rCurrentTSO = tso;
1374 RELEASE_LOCK(&sched_mutex);
1379 /* ---------------------------------------------------------------------------
1381 * ------------------------------------------------------------------------ */
1382 static void unblockThread(StgTSO *tso);
1384 /* ---------------------------------------------------------------------------
1385 * Comparing Thread ids.
1387 * This is used from STG land in the implementation of the
1388 * instances of Eq/Ord for ThreadIds.
1389 * ------------------------------------------------------------------------ */
1391 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
1393 StgThreadID id1 = tso1->id;
1394 StgThreadID id2 = tso2->id;
1396 if (id1 < id2) return (-1);
1397 if (id1 > id2) return 1;
1401 /* ---------------------------------------------------------------------------
1402 * Fetching the ThreadID from an StgTSO.
1404 * This is used in the implementation of Show for ThreadIds.
1405 * ------------------------------------------------------------------------ */
1406 int rts_getThreadId(const StgTSO *tso)
1411 /* ---------------------------------------------------------------------------
1412 Create a new thread.
1414 The new thread starts with the given stack size. Before the
1415 scheduler can run, however, this thread needs to have a closure
1416 (and possibly some arguments) pushed on its stack. See
1417 pushClosure() in Schedule.h.
1419 createGenThread() and createIOThread() (in SchedAPI.h) are
1420 convenient packaged versions of this function.
1422 currently pri (priority) is only used in a GRAN setup -- HWL
1423 ------------------------------------------------------------------------ */
1424 //@cindex createThread
1426 /* currently pri (priority) is only used in a GRAN setup -- HWL */
/* createThread / createThread_: allocate and initialise a new TSO.
 * The requested size is raised to at least MIN_STACK_WORDS + TSO_STRUCT_SIZEW
 * words and a stop frame is pushed on the fresh stack. have_lock says whether
 * the caller already holds sched_mutex; otherwise the mutex is taken only
 * around the next_thread_id increment. When the advisory thread limit
 * (RtsFlags.ParFlags.maxThreads) has been reached, END_TSO_QUEUE is returned
 * instead of a TSO. pri is only recorded in the GranSim fields (gran.pri). */
1428 createThread(nat stack_size, StgInt pri)
1430 return createThread_(stack_size, rtsFalse, pri);
1434 createThread_(nat size, rtsBool have_lock, StgInt pri)
1438 createThread(nat stack_size)
1440 return createThread_(stack_size, rtsFalse);
1444 createThread_(nat size, rtsBool have_lock)
1451 /* First check whether we should create a thread at all */
1453 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1454 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1456 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1457 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1458 return END_TSO_QUEUE;
1464 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1467 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1469 /* catch ridiculously small stack sizes */
1470 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1471 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1474 stack_size = size - TSO_STRUCT_SIZEW;
1476 tso = (StgTSO *)allocate(size);
1477 TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1479 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1481 SET_GRAN_HDR(tso, ThisPE);
1483 tso->what_next = ThreadEnterGHC;
1485 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1486 * protect the increment operation on next_thread_id.
1487 * In future, we could use an atomic increment instead.
1489 if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1490 tso->id = next_thread_id++;
1491 if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1493 tso->why_blocked = NotBlocked;
1494 tso->blocked_exceptions = NULL;
1496 tso->stack_size = stack_size;
1497 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
1499 tso->sp = (P_)&(tso->stack) + stack_size;
1502 tso->prof.CCCS = CCS_MAIN;
1505 /* put a stop frame on the stack */
1506 tso->sp -= sizeofW(StgStopFrame);
1507 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1508 tso->su = (StgUpdateFrame*)tso->sp;
1512 tso->link = END_TSO_QUEUE;
1513 /* uses more flexible routine in GranSim */
1514 insertThread(tso, CurrentProc);
1516 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1522 if (RtsFlags.GranFlags.GranSimStats.Full)
1523 DumpGranEvent(GR_START,tso);
1525 if (RtsFlags.ParFlags.ParStats.Full)
1526 DumpGranEvent(GR_STARTQ,tso);
1527 /* HACk to avoid SCHEDULE
1531 /* Link the new thread on the global thread list.
1533 tso->global_link = all_threads;
1537 tso->dist.priority = MandatoryPriority; //by default that is...
1541 tso->gran.pri = pri;
1543 tso->gran.magic = TSO_MAGIC; // debugging only
1545 tso->gran.sparkname = 0;
1546 tso->gran.startedat = CURRENT_TIME;
1547 tso->gran.exported = 0;
1548 tso->gran.basicblocks = 0;
1549 tso->gran.allocs = 0;
1550 tso->gran.exectime = 0;
1551 tso->gran.fetchtime = 0;
1552 tso->gran.fetchcount = 0;
1553 tso->gran.blocktime = 0;
1554 tso->gran.blockcount = 0;
1555 tso->gran.blockedat = 0;
1556 tso->gran.globalsparks = 0;
1557 tso->gran.localsparks = 0;
1558 if (RtsFlags.GranFlags.Light)
1559 tso->gran.clock = Now; /* local clock */
1561 tso->gran.clock = 0;
1563 IF_DEBUG(gran,printTSO(tso));
1566 tso->par.magic = TSO_MAGIC; // debugging only
1568 tso->par.sparkname = 0;
1569 tso->par.startedat = CURRENT_TIME;
1570 tso->par.exported = 0;
1571 tso->par.basicblocks = 0;
1572 tso->par.allocs = 0;
1573 tso->par.exectime = 0;
1574 tso->par.fetchtime = 0;
1575 tso->par.fetchcount = 0;
1576 tso->par.blocktime = 0;
1577 tso->par.blockcount = 0;
1578 tso->par.blockedat = 0;
1579 tso->par.globalsparks = 0;
1580 tso->par.localsparks = 0;
1584 globalGranStats.tot_threads_created++;
1585 globalGranStats.threads_created_on_PE[CurrentProc]++;
1586 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1587 globalGranStats.tot_sq_probes++;
1589 // collect parallel global statistics (currently done together with GC stats)
1590 if (RtsFlags.ParFlags.ParStats.Global &&
1591 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1592 //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime());
1593 globalParStats.tot_threads_created++;
1599 belch("==__ schedule: Created TSO %d (%p) on PE %d;",
1600 tso->id, tso, CurrentProc)); /* arguments now match the conversions: id, address, PE */
1602 IF_PAR_DEBUG(verbose,
1603 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1604 tso->id, tso, advisory_thread_count));
1606 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1607 tso->id, tso->stack_size));
1614 all parallel thread creation calls should fall through the following routine.
/* createSparkThread: turn a spark into a runnable thread. Creates a TSO with
 * the default initial stack size (telling createThread_ the caller already
 * holds sched_mutex), pushes the spark closure onto its stack, puts it on the
 * run queue and increments advisory_thread_count. Hitting the advisory thread
 * limit, or failing to create a TSO, is reported via barf.
 * NOTE(review): have_lock is passed as rtsTrue, but the activateSpark ToDo
 * says SMP callers do not yet acquire sched_mutex -- confirm. */
1617 createSparkThread(rtsSpark spark)
1619 ASSERT(spark != (rtsSpark)NULL);
1620 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
1622 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1623 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1624 return END_TSO_QUEUE;
1628 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1629 if (tso==END_TSO_QUEUE)
1630 barf("createSparkThread: Cannot create TSO");
1632 tso->dist.priority = AdvisoryPriority; /* the DIST priority lives in tso->dist, the same field createThread_ sets to MandatoryPriority */
1634 pushClosure(tso,spark);
1635 PUSH_ON_RUN_QUEUE(tso);
1636 advisory_thread_count++;
1643 Turn a spark into a thread.
1644 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1647 //@cindex activateSpark
1649 activateSpark (rtsSpark spark)
1653 tso = createSparkThread(spark);
1654 if (RtsFlags.ParFlags.ParStats.Full) {
1655 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1656 IF_PAR_DEBUG(verbose,
1657 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1658 (StgClosure *)spark, info_type((StgClosure *)spark)));
1660 // ToDo: fwd info on local/global spark to thread -- HWL
1661 // tso->gran.exported = spark->exported;
1662 // tso->gran.locked = !spark->global;
1663 // tso->gran.sparkname = spark->name;
1669 /* ---------------------------------------------------------------------------
1672 * scheduleThread puts a thread on the head of the runnable queue.
1673 * This will usually be done immediately after a thread is created.
1674 * The caller of scheduleThread must create the thread using e.g.
1675 * createThread and push an appropriate closure
1676 * on this thread's stack before the scheduler is invoked.
1677 * ------------------------------------------------------------------------ */
1680 scheduleThread(StgTSO *tso)
1682 if (tso==END_TSO_QUEUE){
1687 ACQUIRE_LOCK(&sched_mutex);
1689 /* Put the new thread on the head of the runnable queue. The caller
1690 * better push an appropriate closure on this thread's stack
1691 * beforehand. In the SMP case, the thread may start running as
1692 * soon as we release the scheduler lock below.
1694 PUSH_ON_RUN_QUEUE(tso);
1698 IF_DEBUG(scheduler,printTSO(tso));
1700 RELEASE_LOCK(&sched_mutex);
1703 /* ---------------------------------------------------------------------------
1706 * Start up Posix threads to run each of the scheduler tasks.
1707 * I believe the task ids are not needed in the system as defined.
1709 * ------------------------------------------------------------------------ */
1711 #if defined(PAR) || defined(SMP)
1713 taskStart(void) /* ( void *arg STG_UNUSED) */
1715 scheduleThread(END_TSO_QUEUE);
1719 /* ---------------------------------------------------------------------------
1722 * Initialise the scheduler. This resets all the queues - if the
1723 * queues contained any threads, they'll be garbage collected at the
1726 * This now calls startTasks(), so should only be called once! KH @ 25/10/99
1727 * ------------------------------------------------------------------------ */
1731 term_handler(int sig STG_UNUSED)
1734 ACQUIRE_LOCK(&term_mutex);
1736 RELEASE_LOCK(&term_mutex);
1741 //@cindex initScheduler
1748 for (i=0; i<=MAX_PROC; i++) { /* NOTE(review): '<=' also touches index MAX_PROC; confirm the per-PE arrays are declared with MAX_PROC+1 entries */
1749 run_queue_hds[i] = END_TSO_QUEUE;
1750 run_queue_tls[i] = END_TSO_QUEUE;
1751 blocked_queue_hds[i] = END_TSO_QUEUE;
1752 blocked_queue_tls[i] = END_TSO_QUEUE;
1753 ccalling_threadss[i] = END_TSO_QUEUE;
1754 sleeping_queue = END_TSO_QUEUE; /* a single global, not indexed per PE; resetting it on every iteration is redundant */
1757 run_queue_hd = END_TSO_QUEUE;
1758 run_queue_tl = END_TSO_QUEUE;
1759 blocked_queue_hd = END_TSO_QUEUE;
1760 blocked_queue_tl = END_TSO_QUEUE;
1761 sleeping_queue = END_TSO_QUEUE;
1764 suspended_ccalling_threads = END_TSO_QUEUE;
1766 main_threads = NULL;
1767 all_threads = END_TSO_QUEUE;
1772 RtsFlags.ConcFlags.ctxtSwitchTicks =
1773 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1775 /* Install the SIGTERM handler (term_handler) */
1778 struct sigaction action,oact;
1780 action.sa_handler = term_handler;
1781 sigemptyset(&action.sa_mask);
1782 action.sa_flags = 0;
1783 if (sigaction(SIGTERM, &action, &oact) != 0) {
1784 barf("can't install TERM handler");
1790 /* Allocate N Capabilities (one per RtsFlags.ParFlags.nNodes) and chain them on free_capabilities */
1793 Capability *cap, *prev;
1796 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1797 cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1801 free_capabilities = cap;
1802 n_free_capabilities = RtsFlags.ParFlags.nNodes;
1804 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1805 n_free_capabilities););
1808 #if defined(SMP) || defined(PAR)
1821 /* make some space for saving all the thread ids */
1822 task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1823 "initScheduler:task_ids");
1825 /* and create all the threads */
1826 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1827 r = pthread_create(&tid,NULL,taskStart,NULL);
1829 barf("startTasks: Can't create new Posix thread");
1831 task_ids[i].id = tid;
1832 task_ids[i].mut_time = 0.0;
1833 task_ids[i].mut_etime = 0.0;
1834 task_ids[i].gc_time = 0.0;
1835 task_ids[i].gc_etime = 0.0;
1836 task_ids[i].elapsedtimestart = elapsedtime();
1837 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
/* exitScheduler: stop the worker tasks recorded in task_ids[0 .. nNodes-1]. */
1843 exitScheduler( void )
1848 /* Don't want to use pthread_cancel, since we'd have to install
1849 * these silly exception handlers (pthread_cleanup_{push,pop}) around
1853 /* Cancel all our tasks. NOTE(review): this contradicts the comment above; presumably only one of the two shutdown strategies (cancel+join vs. SIGTERM) is compiled in -- confirm */
1854 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1855 pthread_cancel(task_ids[i].id);
1858 /* Wait for all the tasks to terminate */
1859 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1860 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n",
1862 pthread_join(task_ids[i].id, NULL);
1866 /* Send 'em all a SIGTERM. That should shut 'em up.
1868 await_death = RtsFlags.ParFlags.nNodes;
1869 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1870 pthread_kill(task_ids[i].id,SIGTERM);
1872 while (await_death > 0) {
1878 /* -----------------------------------------------------------------------------
1879 Managing the per-task allocation areas.
1881 Each capability comes with an allocation area. These are
1882 fixed-length block lists into which allocation can be done.
1884 ToDo: no support for two-space collection at the moment???
1885 -------------------------------------------------------------------------- */
1887 /* -----------------------------------------------------------------------------
1888 * waitThread is the external interface for running a new computation
1889 * and waiting for the result.
1891 * In the non-SMP case, we create a new main thread, push it on the
1892 * main-thread stack, and invoke the scheduler to run it. The
1893 * scheduler will return when the top main thread on the stack has
1894 * completed or died, and fill in the necessary fields of the
1895 * main_thread structure.
1897 * In the SMP case, we create a main thread as before, but we then
1898 * create a new condition variable and sleep on it. When our new
1899 * main thread has completed, we'll be woken up and the status/result
1900 * will be in the main_thread struct.
1901 * -------------------------------------------------------------------------- */
1904 howManyThreadsAvail ( void )
1908 for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1910 for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1912 for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
/* finishAllThreads: run every remaining thread to completion by calling
 * waitThread on the head of the run, blocked and sleeping queues in turn.
 * Waiting on one thread can move others between queues, so the trailing
 * condition re-checks all three queues and the sweep repeats until every
 * queue is empty. */
1918 finishAllThreads ( void )
1921 while (run_queue_hd != END_TSO_QUEUE) {
1922 waitThread ( run_queue_hd, NULL );
1924 while (blocked_queue_hd != END_TSO_QUEUE) {
1925 waitThread ( blocked_queue_hd, NULL );
1927 while (sleeping_queue != END_TSO_QUEUE) {
1928 waitThread ( sleeping_queue, NULL ); /* wait on the sleeping thread itself: the loop above has just left blocked_queue_hd == END_TSO_QUEUE */
1931 (blocked_queue_hd != END_TSO_QUEUE ||
1932 run_queue_hd != END_TSO_QUEUE ||
1933 sleeping_queue != END_TSO_QUEUE);
1937 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1940 SchedulerStatus stat;
1942 ACQUIRE_LOCK(&sched_mutex);
1944 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1950 pthread_cond_init(&m->wakeup, NULL);
1953 m->link = main_threads;
1956 IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n",
1961 pthread_cond_wait(&m->wakeup, &sched_mutex);
1962 } while (m->stat == NoStatus);
1964 /* GranSim specific init */
1965 CurrentTSO = m->tso; // the TSO to run
1966 procStatus[MainProc] = Busy; // status of main PE
1967 CurrentProc = MainProc; // PE to run it on
1972 ASSERT(m->stat != NoStatus);
1978 pthread_cond_destroy(&m->wakeup);
1981 IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n",
1985 RELEASE_LOCK(&sched_mutex);
1990 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1991 //@subsection Run queue code
1995 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1996 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1997 implicit global variable that has to be correct when calling these
2001 /* Put the new thread on the head of the runnable queue.
2002 * The caller of createThread better push an appropriate closure
2003 * on this thread's stack before the scheduler is invoked.
2005 static /* inline */ void
2006 add_to_run_queue(tso)
2009 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2010 tso->link = run_queue_hd;
2012 if (run_queue_tl == END_TSO_QUEUE) {
2017 /* Put the new thread at the end of the runnable queue. */
2018 static /* inline */ void
2019 push_on_run_queue(tso)
2022 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2023 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2024 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2025 if (run_queue_hd == END_TSO_QUEUE) {
2028 run_queue_tl->link = tso;
2034 Should be inlined because it's used very often in schedule. The tso
2035 argument is actually only needed in GranSim, where we want to have the
2036 possibility to schedule *any* TSO on the run queue, irrespective of the
2037 actual ordering. Therefore, if tso is not the nil TSO then we traverse
2038 the run queue and dequeue the tso, adjusting the links in the queue.
2040 //@cindex take_off_run_queue
2041 static /* inline */ StgTSO*
2042 take_off_run_queue(StgTSO *tso) {
2046 Remove {tso} from the group of runnable tasks!
2048 if tso is specified, unlink that tso from the run_queue (doesn't have
2049 to be at the beginning of the queue); GranSim only
2051 if (tso!=END_TSO_QUEUE) {
2052 /* find tso in queue; NOTE(review): if tso is not on the run queue this search stops with t == END_TSO_QUEUE, which the unlinking code below would dereference unless an assertion catches it -- confirm callers only pass queued TSOs */
2053 for (t=run_queue_hd, prev=END_TSO_QUEUE;
2054 t!=END_TSO_QUEUE && t!=tso;
2058 /* now actually dequeue the tso */
2059 if (prev!=END_TSO_QUEUE) {
2060 ASSERT(run_queue_hd!=t);
2061 prev->link = t->link;
2063 /* t is at beginning of thread queue */
2064 ASSERT(run_queue_hd==t);
2065 run_queue_hd = t->link;
2067 /* t is at end of thread queue */
2068 if (t->link==END_TSO_QUEUE) {
2069 ASSERT(t==run_queue_tl);
2070 run_queue_tl = prev; /* prev is END_TSO_QUEUE when t was also the head, leaving the queue empty */
2072 ASSERT(run_queue_tl!=t);
2074 t->link = END_TSO_QUEUE; /* detach the dequeued TSO from the queue */
2076 /* take tso from the beginning of the queue; std concurrent code */
2078 if (t != END_TSO_QUEUE) {
2079 run_queue_hd = t->link;
2080 t->link = END_TSO_QUEUE;
2081 if (run_queue_hd == END_TSO_QUEUE) {
2082 run_queue_tl = END_TSO_QUEUE;
2091 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2092 //@subsection Garbage Collextion Routines
2094 /* ---------------------------------------------------------------------------
2095 Where are the roots that we know about?
2097 - all the threads on the runnable queue
2098 - all the threads on the blocked queue
2099 - all the threads on the sleeping queue
2100 - all the threads currently executing a _ccall_GC
2101 - all the "main threads"
2103 ------------------------------------------------------------------------ */
2105 /* This has to be protected either by the scheduler monitor, or by the
2106 garbage collection monitor (probably the latter).
/* GetRoots: apply evac to every root the scheduler owns: the run and blocked
 * queues (one pair per PE under GranSim, together with the per-PE
 * ccalling_threadss lists), the sleeping queue, the TSO of every main thread,
 * the threads suspended in a _ccall_GC and, under SMP/PAR/GRAN, the spark
 * queues. */
2111 GetRoots(evac_fn evac)
2118 for (i=0; i<=RtsFlags.GranFlags.proc; i++) { /* NOTE(review): '<=' visits proc+1 PEs; confirm the per-PE arrays have an entry at index proc */
2119 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2120 evac((StgClosure **)&run_queue_hds[i]);
2121 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2122 evac((StgClosure **)&run_queue_tls[i]);
2124 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2125 evac((StgClosure **)&blocked_queue_hds[i]);
2126 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2127 evac((StgClosure **)&blocked_queue_tls[i]);
2128 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2129 evac((StgClosure **)&ccalling_threadss[i]); /* evacuate the same per-PE array that was tested on the line above */
2136 if (run_queue_hd != END_TSO_QUEUE) {
2137 ASSERT(run_queue_tl != END_TSO_QUEUE);
2138 evac((StgClosure **)&run_queue_hd);
2139 evac((StgClosure **)&run_queue_tl);
2142 if (blocked_queue_hd != END_TSO_QUEUE) {
2143 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2144 evac((StgClosure **)&blocked_queue_hd);
2145 evac((StgClosure **)&blocked_queue_tl);
2148 if (sleeping_queue != END_TSO_QUEUE) {
2149 evac((StgClosure **)&sleeping_queue);
2153 for (m = main_threads; m != NULL; m = m->link) {
2154 evac((StgClosure **)&m->tso);
2156 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2157 evac((StgClosure **)&suspended_ccalling_threads);
2160 #if defined(SMP) || defined(PAR) || defined(GRAN)
2161 markSparkQueue(evac);
2165 /* -----------------------------------------------------------------------------
2168 This is the interface to the garbage collector from Haskell land.
2169 We provide this so that external C code can allocate and garbage
2170 collect when called from Haskell via _ccall_GC.
2172 It might be useful to provide an interface whereby the programmer
2173 can specify more roots (ToDo).
2175 This needs to be protected by the GC condition variable above. KH.
2176 -------------------------------------------------------------------------- */
2178 void (*extra_roots)(evac_fn);
2183 GarbageCollect(GetRoots,rtsFalse);
2187 performMajorGC(void)
2189 GarbageCollect(GetRoots,rtsTrue);
2193 AllRoots(evac_fn evac)
2195 GetRoots(evac); // the scheduler's roots
2196 extra_roots(evac); // the user's roots
2200 performGCWithRoots(void (*get_roots)(evac_fn))
2202 extra_roots = get_roots;
2203 GarbageCollect(AllRoots,rtsFalse);
2206 /* -----------------------------------------------------------------------------
2209 If the thread has reached its maximum stack size, then raise the
2210 StackOverflow exception in the offending thread. Otherwise
2211 relocate the TSO into a larger chunk of memory and adjust its stack
2213 -------------------------------------------------------------------------- */
2216 threadStackOverflow(StgTSO *tso)
2218 nat new_stack_size, new_tso_size, diff, stack_words;
2222 IF_DEBUG(sanity,checkTSO(tso));
2223 if (tso->stack_size >= tso->max_stack_size) {
2226 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2227 tso->id, tso, tso->stack_size, tso->max_stack_size);
2228 /* If we're debugging, just print out the top of the stack */
2229 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2232 /* Send this thread the StackOverflow exception */
2233 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2237 /* Try to double the current stack size. If that takes us over the
2238 * maximum stack size for this thread, then use the maximum instead.
2239 * Finally round up so the TSO ends up as a whole number of blocks.
2241 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2242 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2243 TSO_STRUCT_SIZE)/sizeof(W_);
2244 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2245 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2247 IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2249 dest = (StgTSO *)allocate(new_tso_size);
2250 TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2252 /* copy the TSO block and the old stack into the new area */
2253 memcpy(dest,tso,TSO_STRUCT_SIZE);
2254 stack_words = tso->stack + tso->stack_size - tso->sp;
2255 new_sp = (P_)dest + new_tso_size - stack_words;
2256 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2258 /* relocate the stack pointers... */
2259 diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2260 dest->su = (StgUpdateFrame *) ((P_)dest->su + diff);
2262 dest->stack_size = new_stack_size;
2264 /* and relocate the update frame list */
2265 relocate_stack(dest, diff);
2267 /* Mark the old TSO as relocated. We have to check for relocated
2268 * TSOs in the garbage collector and any primops that deal with TSOs.
2270 * It's important to set the sp and su values to just beyond the end
2271 * of the stack, so we don't attempt to scavenge any part of the
2274 tso->what_next = ThreadRelocated;
2276 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2277 tso->su = (StgUpdateFrame *)tso->sp;
2278 tso->why_blocked = NotBlocked;
2279 dest->mut_link = NULL;
2281 IF_PAR_DEBUG(verbose,
2282 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2283 tso->id, tso, tso->stack_size);
2284 /* If we're debugging, just print out the top of the stack */
2285 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2288 IF_DEBUG(sanity,checkTSO(tso));
2290 IF_DEBUG(scheduler,printTSO(dest));
2296 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2297 //@subsection Blocking Queue Routines
2299 /* ---------------------------------------------------------------------------
2300 Wake up a queue that was blocked on some resource.
2301 ------------------------------------------------------------------------ */
/* unblockCount: statistics hook for a woken blocked TSO (bqe) that was waiting
 * on node. When full parallel statistics are enabled it logs a GR_RESUMEQ
 * event and sets emitSchedule if the run queue is empty. It adds the time
 * since par.blockedat to the TSO's par.fetchtime or par.blocktime, depending
 * on node's closure type; any other closure type is reported via barf. */
2305 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2310 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2312 /* write RESUME events to log file and
2313 update blocked and fetch time (depending on type of the orig closure) */
2314 if (RtsFlags.ParFlags.ParStats.Full) {
2315 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2316 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2317 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2318 if (EMPTY_RUN_QUEUE())
2319 emitSchedule = rtsTrue;
2321 switch (get_itbl(node)->type) {
2323 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2328 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2335 barf("{unblockCount}Daq Qagh: unexpected closure in blocking queue");
2342 static StgBlockingQueueElement *
2343 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2346 PEs node_loc, tso_loc;
2348 node_loc = where_is(node); // should be lifted out of loop
2349 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2350 tso_loc = where_is((StgClosure *)tso);
2351 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2352 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2353 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2354 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2355 // insertThread(tso, node_loc);
2356 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2358 tso, node, (rtsSpark*)NULL);
2359 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2362 } else { // TSO is remote (actually should be FMBQ)
2363 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2364 RtsFlags.GranFlags.Costs.gunblocktime +
2365 RtsFlags.GranFlags.Costs.latency;
2366 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2368 tso, node, (rtsSpark*)NULL);
2369 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2372 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2374 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2375 (node_loc==tso_loc ? "Local" : "Global"),
2376 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2377 tso->block_info.closure = NULL;
2378 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
2382 static StgBlockingQueueElement *
2383 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2385 StgBlockingQueueElement *next;
2387 switch (get_itbl(bqe)->type) {
2389 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2390 /* if it's a TSO just push it onto the run_queue */
2392 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2393 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
2395 unblockCount(bqe, node);
2396 /* reset blocking status after dumping event */
2397 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2401 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2403 bqe->link = (StgBlockingQueueElement *)PendingFetches;
2404 PendingFetches = (StgBlockedFetch *)bqe;
2408 /* can ignore this case in a non-debugging setup;
2409 see comments on RBHSave closures above */
2411 /* check that the closure is an RBHSave closure */
2412 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2413 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2414 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2418 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2419 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2423 IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2427 #else /* !GRAN && !PAR */
2429 unblockOneLocked(StgTSO *tso)
2433 ASSERT(get_itbl(tso)->type == TSO);
2434 ASSERT(tso->why_blocked != NotBlocked);
2435 tso->why_blocked = NotBlocked;
2437 PUSH_ON_RUN_QUEUE(tso);
2439 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2444 #if defined(GRAN) || defined(PAR)
2445 inline StgBlockingQueueElement *
2446 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2448 ACQUIRE_LOCK(&sched_mutex);
2449 bqe = unblockOneLocked(bqe, node);
2450 RELEASE_LOCK(&sched_mutex);
2455 unblockOne(StgTSO *tso)
2457 ACQUIRE_LOCK(&sched_mutex);
2458 tso = unblockOneLocked(tso);
2459 RELEASE_LOCK(&sched_mutex);
// GranSim version: wake every TSO at the front of node's blocking queue q.
// If the queue then ends in an RBHSave closure (node is an RBH), copy the
// words saved in it back into node. Finally, update the GranSim statistics.
// If node does not live on CurrentProc, this is treated as a "fake fetch":
// CurrentProc is added to node's PE bitmask.
// NOTE(review): this listing omits lines (the #if guard, braces, the
// declarations/initialisation of node_loc, len, info_ptr and bqe, and the
// debug-macro openers); the comments describe only the visible statements.
2466 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2468 StgBlockingQueueElement *bqe;
2473 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2474 node, CurrentProc, CurrentTime[CurrentProc],
2475 CurrentTSO->id, CurrentTSO));
2477 node_loc = where_is(node);
2479 ASSERT(q == END_BQ_QUEUE ||
2480 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2481 get_itbl(q)->type == CONSTR); // closure (type constructor)
2482 ASSERT(is_unique(node));
2484 /* FAKE FETCH: magically copy the node to the tso's proc;
2485 no Fetch necessary because in reality the node should not have been
2486 moved to the other PE in the first place
2488 if (CurrentProc!=node_loc) {
2490 belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2491 node, node_loc, CurrentProc, CurrentTSO->id,
2492 // CurrentTSO, where_is(CurrentTSO),
2493 node->header.gran.procs));
2494 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2496 belch("## new bitmask of node %p is %#x",
2497 node, node->header.gran.procs));
2498 if (RtsFlags.GranFlags.GranSimStats.Global) {
2499 globalGranStats.tot_fake_fetches++;
2504 // ToDo: check: ASSERT(CurrentProc==node_loc);
2505 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2508 bqe points to the current element in the queue
2509 next points to the next element in the queue
2511 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2512 //tso_loc = where_is(tso);
2514 bqe = unblockOneLocked(bqe, node);
2517 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2518 the closure to make room for the anchor of the BQ */
2519 if (bqe!=END_BQ_QUEUE) {
2520 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
// The terminating CONSTR must be one of the three RBH_Save info tables; use
// the same stg_-prefixed names as unblockOneLocked and print_bq.
2522 ASSERT((info_ptr==&stg_RBH_Save_0_info) ||
2523 (info_ptr==&stg_RBH_Save_1_info) ||
2524 (info_ptr==&stg_RBH_Save_2_info));
2526 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2527 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2528 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2531 belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2532 node, info_type(node)));
2535 /* statistics gathering */
2536 if (RtsFlags.GranFlags.GranSimStats.Global) {
2537 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2538 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2539 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2540 globalGranStats.tot_awbq++; // total no. of bqs awakened
2543 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2544 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
// GUM version: wake every TSO and BLOCKED_FETCH at the front of node's
// blocking queue q, stopping at the first element of any other type (e.g. an
// RBHSave CONSTR). sched_mutex is held for the whole traversal.
// NOTE(review): the #elif line, braces and the early-exit statement are not
// part of this listing.
2548 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2550 StgBlockingQueueElement *bqe;
2552 ACQUIRE_LOCK(&sched_mutex);
2554 IF_PAR_DEBUG(verbose,
2555 belch("##-_ AwBQ for node %p on [%x]: ",
2559 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2560 IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
// NOTE(review): sched_mutex is held here (acquired above), so this early exit
// must call RELEASE_LOCK(&sched_mutex) before returning. The exit statement
// itself is not part of this listing - confirm against the full source.
2565 ASSERT(q == END_BQ_QUEUE ||
2566 get_itbl(q)->type == TSO ||
2567 get_itbl(q)->type == BLOCKED_FETCH ||
2568 get_itbl(q)->type == CONSTR);
// unblockOneLocked hands back the next queue element each time round.
2571 while (get_itbl(bqe)->type==TSO ||
2572 get_itbl(bqe)->type==BLOCKED_FETCH) {
2573 bqe = unblockOneLocked(bqe, node);
2575 RELEASE_LOCK(&sched_mutex);
2578 #else /* !GRAN && !PAR */
// Non-GranSim/GUM version: wake every TSO on the queue starting at tso,
// holding sched_mutex for the whole traversal. unblockOneLocked returns the
// next queue element, so the loop ends at END_TSO_QUEUE.
2580 awakenBlockedQueue(StgTSO *tso)
2582 ACQUIRE_LOCK(&sched_mutex);
2583 while (tso != END_TSO_QUEUE) {
2584 tso = unblockOneLocked(tso);
2586 RELEASE_LOCK(&sched_mutex);
2590 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2591 //@subsection Exception Handling Routines
2593 /* ---------------------------------------------------------------------------
2595 - usually called inside a signal handler so it mustn't do anything fancy.
2596 ------------------------------------------------------------------------ */
// Per the header comment, this is usually called from a signal handler, so it
// must not do anything that is unsafe there. NOTE(review): its body is not
// part of this listing; presumably it only sets scheduler flags - confirm.
2599 interruptStgRts(void)
2605 /* -----------------------------------------------------------------------------
2608 This is for use when we raise an exception in another thread, which
2610 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2611 -------------------------------------------------------------------------- */
2613 #if defined(GRAN) || defined(PAR)
2615 NB: only the type of the blocking queue is different in GranSim and GUM
2616 the operations on the queue-elements are the same
2617 long live polymorphism!
// GranSim/GUM version. Find the queue that tso->why_blocked says tso is on and
// unlink tso from it. That queue is one of: an MVar, a BLACKHOLE_BQ, another
// thread's blocked_exceptions list, blocked_queue or sleeping_queue.
// Then reset tso's blocking state and push it onto the run queue.
// sched_mutex is held throughout. Failing to find tso on the expected queue
// is fatal (barf).
// NOTE(review): this listing omits lines (case labels, braces, loop conditions
// and, presumably, a jump to the common tail after each successful unlink);
// the comments describe only the visible statements.
2620 unblockThread(StgTSO *tso)
2622 StgBlockingQueueElement *t, **last;
2624 ACQUIRE_LOCK(&sched_mutex);
2625 switch (tso->why_blocked) {
// Not blocked: nothing to unlink, but the mutex taken above must be released.
2628 RELEASE_LOCK(&sched_mutex); return; /* not blocked */
2631 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2633 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2634 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2636 last = (StgBlockingQueueElement **)&mvar->head;
2637 for (t = (StgBlockingQueueElement *)mvar->head;
2639 last = &t->link, last_tso = t, t = t->link) {
2640 if (t == (StgBlockingQueueElement *)tso) {
2641 *last = (StgBlockingQueueElement *)tso->link;
2642 if (mvar->tail == tso) {
2643 mvar->tail = (StgTSO *)last_tso;
2648 barf("unblockThread (MVAR): TSO not found");
2651 case BlockedOnBlackHole:
2652 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2654 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2656 last = &bq->blocking_queue;
2657 for (t = bq->blocking_queue;
2659 last = &t->link, t = t->link) {
2660 if (t == (StgBlockingQueueElement *)tso) {
2661 *last = (StgBlockingQueueElement *)tso->link;
2665 barf("unblockThread (BLACKHOLE): TSO not found");
2668 case BlockedOnException:
2670 StgTSO *target = tso->block_info.tso;
2672 ASSERT(get_itbl(target)->type == TSO);
// Follow every ThreadRelocated link, not just the first, as the non-GranSim
// version and detectBlackHoles do.
2674 while (target->what_next == ThreadRelocated) {
2675 target = target->link;
2676 ASSERT(get_itbl(target)->type == TSO);
2679 ASSERT(target->blocked_exceptions != NULL);
2681 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2682 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2684 last = &t->link, t = t->link) {
2685 ASSERT(get_itbl(t)->type == TSO);
2686 if (t == (StgBlockingQueueElement *)tso) {
2687 *last = (StgBlockingQueueElement *)tso->link;
2691 barf("unblockThread (Exception): TSO not found");
2695 case BlockedOnWrite:
2697 /* take TSO off blocked_queue */
2698 StgBlockingQueueElement *prev = NULL;
2699 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2700 prev = t, t = t->link) {
2701 if (t == (StgBlockingQueueElement *)tso) {
2703 blocked_queue_hd = (StgTSO *)t->link;
2704 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2705 blocked_queue_tl = END_TSO_QUEUE;
2708 prev->link = t->link;
2709 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2710 blocked_queue_tl = (StgTSO *)prev;
2716 barf("unblockThread (I/O): TSO not found");
2719 case BlockedOnDelay:
2721 /* take TSO off sleeping_queue */
2722 StgBlockingQueueElement *prev = NULL;
2723 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
2724 prev = t, t = t->link) {
2725 if (t == (StgBlockingQueueElement *)tso) {
2727 sleeping_queue = (StgTSO *)t->link;
2729 prev->link = t->link;
// Name the sleeping queue so this failure is distinguishable from the I/O one.
2734 barf("unblockThread (delay): TSO not found");
2738 barf("unblockThread");
2742 tso->link = END_TSO_QUEUE;
2743 tso->why_blocked = NotBlocked;
2744 tso->block_info.closure = NULL;
2745 PUSH_ON_RUN_QUEUE(tso);
2746 RELEASE_LOCK(&sched_mutex);
// Non-GranSim/GUM version. Find the queue that tso->why_blocked says tso is on
// and unlink tso from it. That queue is one of: an MVar, a BLACKHOLE_BQ,
// another thread's blocked_exceptions list, blocked_queue or sleeping_queue.
// Then reset tso's blocking state and push it onto the run queue.
// sched_mutex is held throughout. Failing to find tso on the expected queue
// is fatal (barf).
// NOTE(review): this listing omits lines (the #else, case labels, braces, the
// "found" branches and, presumably, a jump to the common tail); the comments
// describe only the visible statements.
2750 unblockThread(StgTSO *tso)
2754 ACQUIRE_LOCK(&sched_mutex);
2755 switch (tso->why_blocked) {
// Not blocked: nothing to unlink, but the mutex taken above must be released.
2758 RELEASE_LOCK(&sched_mutex); return; /* not blocked */
2761 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2763 StgTSO *last_tso = END_TSO_QUEUE;
2764 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2767 for (t = mvar->head; t != END_TSO_QUEUE;
2768 last = &t->link, last_tso = t, t = t->link) {
2771 if (mvar->tail == tso) {
2772 mvar->tail = last_tso;
2777 barf("unblockThread (MVAR): TSO not found");
2780 case BlockedOnBlackHole:
2781 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2783 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2785 last = &bq->blocking_queue;
2786 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2787 last = &t->link, t = t->link) {
2793 barf("unblockThread (BLACKHOLE): TSO not found");
2796 case BlockedOnException:
2798 StgTSO *target = tso->block_info.tso;
2800 ASSERT(get_itbl(target)->type == TSO);
// The target may have been relocated several times; follow the whole chain.
2802 while (target->what_next == ThreadRelocated) {
2803 target = target->link;
2804 ASSERT(get_itbl(target)->type == TSO);
2807 ASSERT(target->blocked_exceptions != NULL);
2809 last = &target->blocked_exceptions;
2810 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2811 last = &t->link, t = t->link) {
2812 ASSERT(get_itbl(t)->type == TSO);
2818 barf("unblockThread (Exception): TSO not found");
2822 case BlockedOnWrite:
2824 StgTSO *prev = NULL;
2825 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2826 prev = t, t = t->link) {
2829 blocked_queue_hd = t->link;
2830 if (blocked_queue_tl == t) {
2831 blocked_queue_tl = END_TSO_QUEUE;
2834 prev->link = t->link;
2835 if (blocked_queue_tl == t) {
2836 blocked_queue_tl = prev;
2842 barf("unblockThread (I/O): TSO not found");
2845 case BlockedOnDelay:
2847 StgTSO *prev = NULL;
2848 for (t = sleeping_queue; t != END_TSO_QUEUE;
2849 prev = t, t = t->link) {
2852 sleeping_queue = t->link;
2854 prev->link = t->link;
// Name the sleeping queue so this failure is distinguishable from the I/O one.
2859 barf("unblockThread (delay): TSO not found");
2863 barf("unblockThread");
2867 tso->link = END_TSO_QUEUE;
2868 tso->why_blocked = NotBlocked;
2869 tso->block_info.closure = NULL;
2870 PUSH_ON_RUN_QUEUE(tso);
2871 RELEASE_LOCK(&sched_mutex);
2875 /* -----------------------------------------------------------------------------
2878 * The following function implements the magic for raising an
2879 * asynchronous exception in an existing thread.
2881 * We first remove the thread from any queue on which it might be
2882 * blocked: an MVAR, a BLACKHOLE_BQ, another thread's blocked_exceptions queue, the blocked_queue or the sleeping_queue.
2884 * We strip the stack down to the innermost CATCH_FRAME, building
2885 * thunks in the heap for all the active computations, so they can
2886 * be restarted if necessary. When we reach a CATCH_FRAME, we build
2887 * an application of the handler to the exception, and push it on
2888 * the top of the stack.
2890 * How exactly do we save all the active computations? We create an
2891 * AP_UPD for every UpdateFrame on the stack. Entering one of these
2892 * AP_UPDs pushes everything from the corresponding update frame
2893 * upwards onto the stack. (Actually, it pushes everything up to the
2894 * next update frame plus a pointer to the next AP_UPD object.)
2895 * Entering the next AP_UPD object pushes more onto the stack until we
2896 * reach the last AP_UPD object - at which point the stack should look
2897 * exactly as it did when we killed the TSO and we can continue
2898 * execution by entering the closure on top of the stack.
2900 * We can also kill a thread entirely - this happens if either (a) the
2901 * exception passed to raiseAsync is NULL, or (b) there's no
2902 * CATCH_FRAME on the stack. In either case, we strip the entire
2903 * stack and replace the thread with a zombie.
2905 * -------------------------------------------------------------------------- */
// Kill tso outright. raiseAsync with a NULL exception strips the entire stack
// and leaves the thread as a zombie, as described in the comment above.
2908 deleteThread(StgTSO *tso)
2910 raiseAsync(tso,NULL);
// Raise exception asynchronously in tso, or kill tso outright when exception
// is NULL (see the block comment above for the overall algorithm). Starting
// at tso->su, each stack chunk between frames is frozen into a heap closure,
// and the frame below it decides what happens to that closure.
// NOTE(review): this listing omits many lines (braces, the early return for
// dead threads, presumably the unblockThread call, the frame-walking loop
// header, case labels, the su/sp advances between frames); the comments below
// describe only the visible statements.
2914 raiseAsync(StgTSO *tso, StgClosure *exception)
2916 StgUpdateFrame* su = tso->su;
2917 StgPtr sp = tso->sp;
2919 /* Thread already dead? */
2920 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2924 IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2926 /* Remove it from any blocking queues */
2929 /* The stack freezing code assumes there's a closure pointer on
2930 * the top of the stack. This isn't always the case with compiled
2931 * code, so we have to push a dummy closure on the top which just
2932 * returns to the next return address on the stack.
2934 if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2935 *(--sp) = (W_)&stg_dummy_ret_closure;
// Payload size for the closure built below: the words between sp and su,
// minus the top word, which becomes the closure's fun field (ap->fun = sp[0]).
2939 nat words = ((P_)su - (P_)sp) - 1;
2943 /* If we find a CATCH_FRAME, and we've got an exception to raise,
2944 * then build PAP(handler,exception,realworld#), and leave it on
2945 * top of the stack ready to enter.
2947 if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2948 StgCatchFrame *cf = (StgCatchFrame *)su;
2949 /* we've got an exception to raise, so let's pass it to the
2950 * handler in this frame.
// PAP with two payload words: the exception and the realworld token.
2952 ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2953 TICK_ALLOC_UPD_PAP(3,0);
2954 SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
2957 ap->fun = cf->handler; /* :: Exception -> IO a */
2958 ap->payload[0] = exception;
2959 ap->payload[1] = ARG_TAG(0); /* realworld token */
2961 /* throw away the stack from Sp up to and including the
2964 sp = (P_)su + sizeofW(StgCatchFrame) - 1;
2967 /* Restore the blocked/unblocked state for asynchronous exceptions
2968 * at the CATCH_FRAME.
2970 * If exceptions were unblocked at the catch, arrange that they
2971 * are unblocked again after executing the handler by pushing an
2972 * unblockAsyncExceptions_ret stack frame.
2974 if (!cf->exceptions_blocked) {
2975 *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
2978 /* Ensure that async exceptions are blocked when running the handler.
2980 if (tso->blocked_exceptions == NULL) {
2981 tso->blocked_exceptions = END_TSO_QUEUE;
2984 /* Put the newly-built PAP on top of the stack, ready to execute
2985 * when the thread restarts.
2989 tso->what_next = ThreadEnterGHC;
2990 IF_DEBUG(sanity, checkTSO(tso));
2994 /* First build an AP_UPD consisting of the stack chunk above the
2995 * current update frame, with the top word on the stack as the
// Copy the frozen stack chunk into the new closure's payload, advancing sp.
2998 ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3003 ap->fun = (StgClosure *)sp[0];
3005 for(i=0; i < (nat)words; ++i) {
3006 ap->payload[i] = (StgClosure *)*sp++;
// What becomes of the frozen chunk depends on the kind of frame at su.
3009 switch (get_itbl(su)->type) {
// Update frame: the chunk becomes a genuine AP_UPD thunk that replaces the
// updatee and is pushed in place of the popped frame.
3013 SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */);
3014 TICK_ALLOC_UP_THK(words+1,0);
3017 fprintf(stderr, "scheduler: Updating ");
3018 printPtr((P_)su->updatee);
3019 fprintf(stderr, " with ");
3020 printObj((StgClosure *)ap);
3023 /* Replace the updatee with an indirection - happily
3024 * this will also wake up any threads currently
3025 * waiting on the result.
3027 * Warning: if we're in a loop, more than one update frame on
3028 * the stack may point to the same object. Be careful not to
3029 * overwrite an IND_OLDGEN in this case, because we'll screw
3030 * up the mutable lists. To be on the safe side, don't
3031 * overwrite any kind of indirection at all. See also
3032 * threadSqueezeStack in GC.c, where we have to make a similar
3035 if (!closure_IND(su->updatee)) {
3036 UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */
3039 sp += sizeofW(StgUpdateFrame) -1;
3040 sp[0] = (W_)ap; /* push onto stack */
// Catch frame: rebuild the catch on the heap as FUN(catch, ap, handler) and
// push it in place of the popped catch frame.
3046 StgCatchFrame *cf = (StgCatchFrame *)su;
3049 /* We want a PAP, not an AP_UPD. Fortunately, the
3050 * layout's the same.
3052 SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3053 TICK_ALLOC_UPD_PAP(words+1,0);
3055 /* now build o = FUN(catch,ap,handler) */
3056 o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3057 TICK_ALLOC_FUN(2,0);
3058 SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3059 o->payload[0] = (StgClosure *)ap;
3060 o->payload[1] = cf->handler;
3063 fprintf(stderr, "scheduler: Built ");
3064 printObj((StgClosure *)o);
3067 /* pop the old handler and put o on the stack */
3069 sp += sizeofW(StgCatchFrame) - 1;
// Seq frame: rebuild it on the heap as FUN(seq, ap).
3076 StgSeqFrame *sf = (StgSeqFrame *)su;
3079 SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3080 TICK_ALLOC_UPD_PAP(words+1,0);
3082 /* now build o = FUN(seq,ap) */
3083 o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3084 TICK_ALLOC_SE_THK(1,0);
3085 SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3086 o->payload[0] = (StgClosure *)ap;
3089 fprintf(stderr, "scheduler: Built ");
3090 printObj((StgClosure *)o);
3093 /* pop the seq frame and put o on the stack */
3095 sp += sizeofW(StgSeqFrame) - 1;
// Stop frame: there is nothing left to freeze, so the thread is killed and
// the exception is left on top of what remains of the stack.
3101 /* We've stripped the entire stack, the thread is now dead. */
3102 sp += sizeofW(StgStopFrame) - 1;
3103 sp[0] = (W_)exception; /* save the exception */
3104 tso->what_next = ThreadKilled;
3105 tso->su = (StgUpdateFrame *)(sp+1);
3116 /* -----------------------------------------------------------------------------
3117 resurrectThreads is called after garbage collection on the list of
3118 threads found to be garbage. Each of these threads will be woken
3119 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3120 on an MVar, or NonTermination if the thread was blocked on a black hole.
3122 -------------------------------------------------------------------------- */
// Walk the list of threads the GC found to be garbage (linked via
// global_link). Each one is relinked in front of all_threads and sent an
// exception chosen from why_blocked, as described in the comment above.
// NOTE(review): the declarations of tso/next, some case labels (presumably
// BlockedOnMVar and NotBlocked), the breaks/continue and the assignment to
// all_threads itself are not part of this listing.
3125 resurrectThreads( StgTSO *threads )
3129 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3130 next = tso->global_link;
3131 tso->global_link = all_threads;
3133 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3135 switch (tso->why_blocked) {
// A thread stuck delivering an exception gets the same dead-MVar exception.
3137 case BlockedOnException:
3138 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3140 case BlockedOnBlackHole:
3141 raiseAsync(tso,(StgClosure *)NonTermination_closure);
3144 /* This might happen if the thread was blocked on a black hole
3145 * belonging to a thread that we've just woken up (raiseAsync
3146 * can wake up threads, remember...).
3150 barf("resurrectThreads: thread blocked in a strange way");
3155 /* -----------------------------------------------------------------------------
3156 * Blackhole detection: if we reach a deadlock, test whether any
3157 * threads are blocked on themselves. Any threads which are found to
3158 * be self-blocked get sent a NonTermination exception.
3160 * This is only done in a deadlock situation in order to avoid
3161 * performance overhead in the normal case.
3162 * -------------------------------------------------------------------------- */
// For each thread on all_threads that is blocked on a black hole, walk its
// frames from t->su. If an update frame's updatee is the very closure the
// thread is blocked on, the thread is waiting on itself, so NonTermination is
// raised in it.
// NOTE(review): the relocation-following assignment, the skip of non-black-hole
// threads, the case labels and the loop exits for other frame kinds are not
// part of this listing.
3165 detectBlackHoles( void )
3167 StgTSO *t = all_threads;
3168 StgUpdateFrame *frame;
3169 StgClosure *blocked_on;
3171 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3173 while (t->what_next == ThreadRelocated) {
3175 ASSERT(get_itbl(t)->type == TSO);
3178 if (t->why_blocked != BlockedOnBlackHole) {
3182 blocked_on = t->block_info.closure;
3184 for (frame = t->su; ; frame = frame->link) {
3185 switch (get_itbl(frame)->type) {
3188 if (frame->updatee == blocked_on) {
3189 /* We are blocking on one of our own computations, so
3190 * send this thread the NonTermination exception.
3193 sched_belch("thread %d is blocked on itself", t->id));
3194 raiseAsync(t, (StgClosure *)NonTermination_closure);
3215 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3216 //@subsection Debugging Routines
3218 /* -----------------------------------------------------------------------------
3219 Debugging: why is a thread blocked
3220 -------------------------------------------------------------------------- */
// Debugging aid: print to stderr a short phrase describing why tso is blocked,
// chosen by tso->why_blocked. An unrecognised value is fatal (barf).
// NOTE(review): several case labels/breaks (read, MVar, not blocked, the GA
// cases) and any #ifdef guards are not part of this listing. The type of
// block_info.target is not visible either - confirm that %d matches it.
3225 printThreadBlockage(StgTSO *tso)
3227 switch (tso->why_blocked) {
3229 fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3231 case BlockedOnWrite:
3232 fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3234 case BlockedOnDelay:
3235 fprintf(stderr,"is blocked until %d", tso->block_info.target);
3238 fprintf(stderr,"is blocked on an MVar");
3240 case BlockedOnException:
3241 fprintf(stderr,"is blocked on delivering an exception to thread %d",
3242 tso->block_info.tso->id);
3244 case BlockedOnBlackHole:
3245 fprintf(stderr,"is blocked on a black hole");
3248 fprintf(stderr,"is not blocked");
3252 fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3253 tso->block_info.closure, info_type(tso->block_info.closure));
3255 case BlockedOnGA_NoSend:
3256 fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3257 tso->block_info.closure, info_type(tso->block_info.closure));
// tso is a pointer: print it with %p (passing it to %d is undefined behaviour).
3261 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
3262 tso->why_blocked, tso->id, tso);
// Debugging aid: print tso's state to stderr ("has been killed" or
// "has completed"). Otherwise (presumably the default case) describe why it
// is blocked via printThreadBlockage.
// NOTE(review): the ThreadKilled/default case labels and breaks are not part
// of this listing.
3267 printThreadStatus(StgTSO *tso)
3269 switch (tso->what_next) {
3271 fprintf(stderr,"has been killed");
3273 case ThreadComplete:
3274 fprintf(stderr,"has completed");
3277 printThreadBlockage(tso);
// Debugging aid: print a header line, then one line per thread on all_threads
// (its id followed by printThreadStatus) to stderr.
// NOTE(review): three alternative header variants appear below: two
// timestamped (presumably GRAN via TIME_ON_PROC and PAR via CURRENT_TIME)
// and a plain one. The #if/#elif/#else lines selecting between them, and
// the declaration of t, are not part of this listing. node_str appears unused
// in the visible code.
3282 printAllThreads(void)
3287 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3288 ullong_format_string(TIME_ON_PROC(CurrentProc),
3289 time_string, rtsFalse/*no commas!*/);
3291 sched_belch("all threads at [%s]:", time_string);
3293 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3294 ullong_format_string(CURRENT_TIME,
3295 time_string, rtsFalse/*no commas!*/);
3297 sched_belch("all threads at [%s]:", time_string);
3299 sched_belch("all threads:");
3302 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3303 fprintf(stderr, "\tthread %d ", t->id);
3304 printThreadStatus(t);
3305 fprintf(stderr,"\n");
3310 Print a whole blocking queue attached to node (debugging only).
// GUM version: print a header naming node and its closure type, then hand the
// queue elements to print_bqe.
// NOTE(review): the #if guard and braces are not part of this listing.
3315 print_bq (StgClosure *node)
3317 StgBlockingQueueElement *bqe;
// Check node before anything below dereferences it (info_type, get_itbl).
3330 ASSERT(node!=(StgClosure*)NULL); // sanity check
3321 fprintf(stderr,"## BQ of closure %p (%s): ",
3322 node, info_type(node));
3324 /* should cover all closures that may have a blocking queue */
3325 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3326 get_itbl(node)->type == FETCH_ME_BQ ||
3327 get_itbl(node)->type == RBH ||
3328 get_itbl(node)->type == MVAR);
3332 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3336 Print a whole blocking queue starting with the element bqe.
// Print each element (TSO, BLOCKED_FETCH, or a terminating RBH_Save CONSTR)
// to stderr. Iteration stops after printing a CONSTR or the last element,
// then a newline is written. Any other closure type is fatal (barf).
// NOTE(review): the declaration of end and the case labels are not part of
// this listing.
3339 print_bqe (StgBlockingQueueElement *bqe)
3344 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3346 for (end = (bqe==END_BQ_QUEUE);
3347 !end; // iterate until bqe points to a CONSTR
3348 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
3349 bqe = end ? END_BQ_QUEUE : bqe->link) {
3350 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3351 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3352 /* types of closures that may appear in a blocking queue */
3353 ASSERT(get_itbl(bqe)->type == TSO ||
3354 get_itbl(bqe)->type == BLOCKED_FETCH ||
3355 get_itbl(bqe)->type == CONSTR);
3356 /* only BQs of an RBH end with an RBH_Save closure */
3357 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3359 switch (get_itbl(bqe)->type) {
// The TSO address is a pointer: print it with %p, as the GranSim print_bq does.
3361 fprintf(stderr," TSO %u (%p),",
3362 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3365 fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3366 ((StgBlockedFetch *)bqe)->node,
3367 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3368 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3369 ((StgBlockedFetch *)bqe)->ga.weight);
3372 fprintf(stderr," %s (IP %p),",
3373 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3374 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3375 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3376 "RBH_Save_?"), get_itbl(bqe));
3379 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3380 info_type((StgClosure *)bqe)); // , node, info_type(node));
3384 fputc('\n', stderr);
3386 # elif defined(GRAN)
// GranSim version: print node, its closure type and its PE, then every queue
// element with the PE it lives on. Iteration stops after a terminating
// RBH_Save CONSTR or the last element, then a newline is written.
// NOTE(review): braces, the declaration of end and the case labels are not
// part of this listing.
3388 print_bq (StgClosure *node)
3390 StgBlockingQueueElement *bqe;
3391 PEs node_loc, tso_loc;
// Check node before get_itbl/where_is/info_type below dereference it.
3399 ASSERT(node!=(StgClosure*)NULL); // sanity check
3394 /* should cover all closures that may have a blocking queue */
3395 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3396 get_itbl(node)->type == FETCH_ME_BQ ||
3397 get_itbl(node)->type == RBH);
3400 node_loc = where_is(node);
3402 fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3403 node, info_type(node), node_loc);
3406 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3408 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3409 !end; // iterate until bqe points to a CONSTR
3410 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3411 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3412 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3413 /* types of closures that may appear in a blocking queue */
3414 ASSERT(get_itbl(bqe)->type == TSO ||
3415 get_itbl(bqe)->type == CONSTR);
3416 /* only BQs of an RBH end with an RBH_Save closure */
3417 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3419 tso_loc = where_is((StgClosure *)bqe);
3420 switch (get_itbl(bqe)->type) {
3422 fprintf(stderr," TSO %d (%p) on [PE %d],",
3423 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3426 fprintf(stderr," %s (IP %p),",
3427 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3428 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3429 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3430 "RBH_Save_?"), get_itbl(bqe));
3433 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3434 info_type((StgClosure *)bqe), node, info_type(node));
3438 fputc('\n', stderr);
3442 Nice and easy: only TSOs on the blocking queue
// Non-GranSim/GUM version: print the id and address of every TSO on node's
// blocking queue to stderr, followed by a newline.
// NOTE(review): the #else line, the declaration of tso and the loop's
// increment expression are not part of this listing.
3445 print_bq (StgClosure *node)
3449 ASSERT(node!=(StgClosure*)NULL); // sanity check
3450 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3451 tso != END_TSO_QUEUE;
3453 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3454 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3455 fprintf(stderr," TSO %d (%p),", tso->id, tso);
3457 fputc('\n', stderr);
3468 for (i=0, tso=run_queue_hd;
3469 tso != END_TSO_QUEUE;
// Print a printf-style debug message to stderr with a scheduler prefix and a
// trailing newline. Three prefix variants appear below; presumably the SMP
// build prints the task id and another way prints "== ".
// NOTE(review): the #ifdef lines, the va_list declaration and va_start/va_end
// are not part of this listing.
3478 sched_belch(char *s, ...)
// pthread_t is an opaque type; cast it so the argument matches %ld.
3483 fprintf(stderr, "scheduler (task %ld): ", (long)pthread_self());
3485 fprintf(stderr, "== ");
3487 fprintf(stderr, "scheduler: ");
3489 vfprintf(stderr, s, ap);
3490 fprintf(stderr, "\n");
3496 //@node Index, , Debugging Routines, Main scheduling code
3500 //* MainRegTable:: @cindex\s-+MainRegTable
3501 //* StgMainThread:: @cindex\s-+StgMainThread
3502 //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue
3503 //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd
3504 //* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
3505 //* context_switch:: @cindex\s-+context_switch
3506 //* createThread:: @cindex\s-+createThread
3507 //* free_capabilities:: @cindex\s-+free_capabilities
3508 //* gc_pending_cond:: @cindex\s-+gc_pending_cond
3509 //* initScheduler:: @cindex\s-+initScheduler
3510 //* interrupted:: @cindex\s-+interrupted
3511 //* n_free_capabilities:: @cindex\s-+n_free_capabilities
3512 //* next_thread_id:: @cindex\s-+next_thread_id
3513 //* print_bq:: @cindex\s-+print_bq
3514 //* run_queue_hd:: @cindex\s-+run_queue_hd
3515 //* run_queue_tl:: @cindex\s-+run_queue_tl
3516 //* sched_mutex:: @cindex\s-+sched_mutex
3517 //* schedule:: @cindex\s-+schedule
3518 //* take_off_run_queue:: @cindex\s-+take_off_run_queue
3519 //* task_ids:: @cindex\s-+task_ids
3520 //* term_mutex:: @cindex\s-+term_mutex
3521 //* thread_ready_cond:: @cindex\s-+thread_ready_cond