1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.102 2001/10/23 11:28:51 simonmar Exp $
4 * (c) The GHC Team, 1998-2000
8 * Different GHC ways use this scheduler quite differently (see comments below)
9 * Here is the global picture:
11 * WAY Name CPP flag What's it for
12 * --------------------------------------
13 * mp GUM PAR Parallel execution on a distributed memory machine
14 * s SMP SMP Parallel execution on a shared memory machine
15 * mg GranSim GRAN Simulation of parallel execution
16 * md GUM/GdH DIST Distributed execution (based on GUM)
17 * --------------------------------------------------------------------------*/
19 //@node Main scheduling code, , ,
20 //@section Main scheduling code
23 * Version with scheduler monitor support for SMPs (WAY=s):
25 This design provides a high-level API to create and schedule threads etc.
26 as documented in the SMP design document.
28 It uses a monitor design with a single mutex that controls access
29 to shared data structures, and builds on the Posix threads
32 The majority of state is shared. In order to keep essential per-task state,
33 there is a Capability structure, which contains all the information
34 needed to run a thread: its STG registers, a pointer to its TSO, a
35 nursery etc. During STG execution, a pointer to the capability is
36 kept in a register (BaseReg).
38 In a non-SMP build, there is one global capability, namely MainRegTable.
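
   As a rough illustration only (the real type is declared elsewhere in
   the RTS headers and carries the full set of STG registers), the way
   this file uses a Capability amounts to something like:

       typedef struct Capability_ {
           StgTSO             *rCurrentTSO;  // the thread being run
           struct Capability_ *link;         // free-list link (SMP builds)
           // ... STG register save area, nursery pointers, etc.
       } Capability;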
42 * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44 The main scheduling loop in GUM iterates until a finish message is received.
45 In that case a global flag @receivedFinish@ is set and this instance of
46 the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
47 for the handling of incoming messages, such as PP_FINISH.
48 Note that in the parallel case we have a system manager that coordinates
49 different PEs, each of which runs one instance of the RTS.
50 See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
51 From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53 * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55 The main scheduling code in GranSim is quite different from that in std
56 (concurrent) Haskell: while concurrent Haskell just iterates over the
57 threads in the runnable queue, GranSim is event driven, i.e. it iterates
58 over the events in the global event queue. -- HWL
63 //* Variables and Data structures::
64 //* Main scheduling loop::
65 //* Suspend and Resume::
67 //* Garbage Collection Routines::
68 //* Blocking Queue Routines::
69 //* Exception Handling Routines::
70 //* Debugging Routines::
74 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
75 //@subsection Includes
77 #include "PosixSource.h"
84 #include "StgStartup.h"
87 #include "StgMiscClosures.h"
89 #include "Interpreter.h"
90 #include "Exception.h"
98 #if defined(GRAN) || defined(PAR)
99 # include "GranSimRts.h"
100 # include "GranSim.h"
101 # include "ParallelRts.h"
102 # include "Parallel.h"
103 # include "ParallelDebug.h"
104 # include "FetchMe.h"
111 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
112 //@subsection Variables and Data structures
116 * These are the threads which clients have requested that we run.
118 * In an SMP build, we might have several concurrent clients all
119 * waiting for results, and each one will wait on a condition variable
120 * until the result is available.
122 * In non-SMP, clients are strictly nested: the first client calls
123 * into the RTS, which might call out again to C with a _ccall_GC, and
124 * eventually re-enter the RTS.
126 * Information about the main threads is kept in a linked list:
128 //@cindex StgMainThread
129 typedef struct StgMainThread_ {
131 SchedulerStatus stat;
134 pthread_cond_t wakeup;
136 struct StgMainThread_ *link;
139 /* Main thread queue.
140 * Locks required: sched_mutex.
142 static StgMainThread *main_threads;
145 * Locks required: sched_mutex.
149 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
150 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
153 In GranSim we have a runnable and a blocked queue for each processor.
154 In order to minimise code changes new arrays run_queue_hds/tls
155 are created. run_queue_hd is then a short cut (macro) for
156 run_queue_hds[CurrentProc] (see GranSim.h).
159 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
160 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
161 StgTSO *ccalling_threadss[MAX_PROC];
162 /* We use the same global list of threads (all_threads) in GranSim as in
163 the std RTS (i.e. we are cheating). However, we don't use this list in
164 the GranSim specific code at the moment (so we are only potentially
169 StgTSO *run_queue_hd, *run_queue_tl;
170 StgTSO *blocked_queue_hd, *blocked_queue_tl;
171 StgTSO *sleeping_queue; /* perhaps replace with a hash table? */
175 /* Linked list of all threads.
176 * Used for detecting garbage collected threads.
180 /* Threads suspended in _ccall_GC.
182 static StgTSO *suspended_ccalling_threads;
184 static void GetRoots(evac_fn);
185 static StgTSO *threadStackOverflow(StgTSO *tso);
187 /* KH: The following two flags are shared memory locations. There is no need
188 to lock them, since they are only unset at the end of a scheduler
192 /* flag set by signal handler to precipitate a context switch */
193 //@cindex context_switch
196 /* if this flag is set as well, give up execution */
197 //@cindex interrupted
200 /* Next thread ID to allocate.
201 * Locks required: sched_mutex
203 //@cindex next_thread_id
204 StgThreadID next_thread_id = 1;
207 * Pointers to the state of the current thread.
208 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
209 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
212 /* The smallest stack size that makes any sense is:
213 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
214 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
215 * + 1 (the realworld token for an IO thread)
216 * + 1 (the closure to enter)
218 * A thread with this stack will bomb immediately with a stack
219 * overflow, which will increase its stack size.
222 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
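/* The "+ 2" above packs together the last two items in the list: the
 * realworld token and the closure to enter. */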
224 /* Free capability list.
225 * Locks required: sched_mutex.
228 //@cindex free_capabilities
229 //@cindex n_free_capabilities
230 Capability *free_capabilities; /* Available capabilities for running threads */
231 nat n_free_capabilities; /* total number of available capabilities */
233 //@cindex MainRegTable
234 Capability MainRegTable; /* for non-SMP, we have one global capability */
241 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
242 * exists - earlier gccs apparently didn't.
249 /* All our current task ids, saved in case we need to kill them later.
256 void addToBlockedQueue ( StgTSO *tso );
258 static void schedule ( void );
259 void interruptStgRts ( void );
261 static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri );
263 static StgTSO * createThread_ ( nat size, rtsBool have_lock );
266 static void detectBlackHoles ( void );
269 static void sched_belch(char *s, ...);
273 //@cindex sched_mutex
275 //@cindex thread_ready_cond
276 //@cindex gc_pending_cond
277 pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;
278 pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER;
279 pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER;
280 pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER;
287 rtsTime TimeOfLastYield;
288 rtsBool emitSchedule = rtsTrue;
292 char *whatNext_strs[] = {
300 char *threadReturnCode_strs[] = {
301 "HeapOverflow", /* might also be StackOverflow */
310 StgTSO * createSparkThread(rtsSpark spark);
311 StgTSO * activateSpark (rtsSpark spark);
315 * The thread state for the main thread.
316 // ToDo: check whether this is still needed
320 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
321 //@subsection Main scheduling loop
323 /* ---------------------------------------------------------------------------
324 Main scheduling loop.
326 We use round-robin scheduling, each thread returning to the
327 scheduler loop when one of these conditions is detected:
330 * timer expires (thread yields)
335 Locking notes: we acquire the scheduler lock once at the beginning
336 of the scheduler loop, and release it when
338 * running a thread, or
339 * waiting for work, or
340 * waiting for a GC to complete.
343 In a GranSim setup this loop iterates over the global event queue,
344 which determines what to do next.
345 Therefore, it's more complicated than either the
346 concurrent or the parallel (GUM) setup.
349 GUM iterates over incoming messages.
350 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
351 and sends out a fish whenever it has nothing to do; in-between
352 doing the actual reductions (shared code below) it processes the
353 incoming messages and deals with delayed operations
354 (see PendingFetches).
355 This is not the ugliest code you could imagine, but it's bloody close.
357 ------------------------------------------------------------------------ */
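/* A condensed sketch of the standard (non-GRAN, non-PAR) shape of this
 * loop -- illustrative only; the real code below interleaves many more
 * checks (signals, deadlock detection, GC) between these steps:
 *
 *     ACQUIRE_LOCK(&sched_mutex);
 *     while (1) {
 *         ...wake up finished main threads, handle signals...
 *         t = POP_RUN_QUEUE();               // may wait for work
 *         cap->rCurrentTSO = t;
 *         RELEASE_LOCK(&sched_mutex);
 *         ret = StgRun((StgFunPtr)stg_enterStackTop, cap);
 *         ACQUIRE_LOCK(&sched_mutex);
 *         switch (ret) {                     // HeapOverflow, StackOverflow,
 *             ...                            // ThreadYielding, ThreadBlocked,
 *         }                                  // or ThreadFinished
 *     }
 */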
364 StgThreadReturnCode ret;
372 rtsBool receivedFinish = rtsFalse;
374 nat tp_size, sp_size; // stats only
377 rtsBool was_interrupted = rtsFalse;
379 ACQUIRE_LOCK(&sched_mutex);
383 /* set up first event to get things going */
384 /* ToDo: assign costs for system setup and init MainTSO ! */
385 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
387 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
390 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
391 G_TSO(CurrentTSO, 5));
393 if (RtsFlags.GranFlags.Light) {
394 /* Save current time; GranSim Light only */
395 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
398 event = get_next_event();
400 while (event!=(rtsEvent*)NULL) {
401 /* Choose the processor with the next event */
402 CurrentProc = event->proc;
403 CurrentTSO = event->tso;
407 while (!receivedFinish) { /* set by processMessages */
408 /* when receiving PP_FINISH message */
415 IF_DEBUG(scheduler, printAllThreads());
417 /* If we're interrupted (the user pressed ^C, or some other
418 * termination condition occurred), kill all the currently running
422 IF_DEBUG(scheduler, sched_belch("interrupted"));
424 interrupted = rtsFalse;
425 was_interrupted = rtsTrue;
428 /* Go through the list of main threads and wake up any
429 * clients whose computations have finished. ToDo: this
430 * should be done more efficiently without a linear scan
431 * of the main threads list, somehow...
435 StgMainThread *m, **prev;
436 prev = &main_threads;
437 for (m = main_threads; m != NULL; m = m->link) {
438 switch (m->tso->what_next) {
441 *(m->ret) = (StgClosure *)m->tso->sp[0];
445 pthread_cond_broadcast(&m->wakeup);
448 if (m->ret) *(m->ret) = NULL;
450 if (was_interrupted) {
451 m->stat = Interrupted;
455 pthread_cond_broadcast(&m->wakeup);
465 /* in GUM do this only on the Main PE */
468 /* If our main thread has finished or been killed, return.
471 StgMainThread *m = main_threads;
472 if (m->tso->what_next == ThreadComplete
473 || m->tso->what_next == ThreadKilled) {
474 main_threads = main_threads->link;
475 if (m->tso->what_next == ThreadComplete) {
476 /* we finished successfully, fill in the return value */
477 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
481 if (m->ret) { *(m->ret) = NULL; };
482 if (was_interrupted) {
483 m->stat = Interrupted;
493 /* Top up the run queue from our spark pool. We try to make the
494 * number of threads in the run queue equal to the number of
499 nat n = n_free_capabilities;
500 StgTSO *tso = run_queue_hd;
502 /* Count the run queue */
503 while (n > 0 && tso != END_TSO_QUEUE) {
510 spark = findSpark(rtsFalse);
512 break; /* no more sparks in the pool */
514 /* I'd prefer this to be done in activateSpark -- HWL */
515 /* tricky - it needs to hold the scheduler lock and
516 * not try to re-acquire it -- SDM */
517 createSparkThread(spark);
519 sched_belch("==^^ turning spark of closure %p into a thread",
520 (StgClosure *)spark));
523 /* We need to wake up the other tasks if we just created some
526 if (n_free_capabilities - n > 1) {
527 pthread_cond_signal(&thread_ready_cond);
532 /* Check whether any waiting threads need to be woken up. If the
533 * run queue is empty, and there are no other tasks running, we
534 * can wait indefinitely for something to happen.
535 * ToDo: what if another client comes along & requests another
538 if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
540 (run_queue_hd == END_TSO_QUEUE)
542 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
546 /* we can be interrupted while waiting for I/O... */
547 if (interrupted) continue;
549 /* check for signals each time around the scheduler */
550 #ifndef mingw32_TARGET_OS
551 if (signals_pending()) {
552 start_signal_handlers();
557 * Detect deadlock: when we have no threads to run, there are no
558 * threads waiting on I/O or sleeping, and all the other tasks are
559 * waiting for work, we must have a deadlock of some description.
561 * We first try to find threads blocked on themselves (ie. black
562 * holes), and generate NonTermination exceptions where necessary.
564 * If no threads are black holed, we have a deadlock situation, so
565 * inform all the main threads.
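 *
 * Roughly, the predicate tested below (SMP variant) is:
 *   deadlocked  <=>  run, blocked and sleeping queues are all empty
 *                    AND every capability is free (no task is currently
 *                    running Haskell code)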
568 if (blocked_queue_hd == END_TSO_QUEUE
569 && run_queue_hd == END_TSO_QUEUE
570 && sleeping_queue == END_TSO_QUEUE
572 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
576 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
577 GarbageCollect(GetRoots,rtsTrue);
578 if (blocked_queue_hd == END_TSO_QUEUE
579 && run_queue_hd == END_TSO_QUEUE
580 && sleeping_queue == END_TSO_QUEUE) {
581 IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
583 if (run_queue_hd == END_TSO_QUEUE) {
584 StgMainThread *m = main_threads;
586 for (; m != NULL; m = m->link) {
587 deleteThread(m->tso);
590 pthread_cond_broadcast(&m->wakeup);
594 deleteThread(m->tso);
597 main_threads = m->link;
604 /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
608 /* If there's a GC pending, don't do anything until it has
612 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
613 pthread_cond_wait(&gc_pending_cond, &sched_mutex);
616 /* block until we've got a thread on the run queue and a free
619 while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
620 IF_DEBUG(scheduler, sched_belch("waiting for work"));
621 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
622 IF_DEBUG(scheduler, sched_belch("work now available"));
628 if (RtsFlags.GranFlags.Light)
629 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
631 /* adjust time based on time-stamp */
632 if (event->time > CurrentTime[CurrentProc] &&
633 event->evttype != ContinueThread)
634 CurrentTime[CurrentProc] = event->time;
636 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
637 if (!RtsFlags.GranFlags.Light)
640 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
642 /* main event dispatcher in GranSim */
643 switch (event->evttype) {
644 /* Should just be continuing execution */
646 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
647 /* ToDo: check assertion
648 ASSERT(run_queue_hd != (StgTSO*)NULL &&
649 run_queue_hd != END_TSO_QUEUE);
651 /* Ignore ContinueThreads for fetching threads (if synchronous communication) */
652 if (!RtsFlags.GranFlags.DoAsyncFetch &&
653 procStatus[CurrentProc]==Fetching) {
654 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
655 CurrentTSO->id, CurrentTSO, CurrentProc);
658 /* Ignore ContinueThreads for completed threads */
659 if (CurrentTSO->what_next == ThreadComplete) {
660 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
661 CurrentTSO->id, CurrentTSO, CurrentProc);
664 /* Ignore ContinueThreads for threads that are being migrated */
665 if (PROCS(CurrentTSO)==Nowhere) {
666 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
667 CurrentTSO->id, CurrentTSO, CurrentProc);
670 /* The thread should be at the beginning of the run queue */
671 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
672 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
673 CurrentTSO->id, CurrentTSO, CurrentProc);
674 break; // run the thread anyway
677 new_event(proc, proc, CurrentTime[proc],
679 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
681 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
682 break; // now actually run the thread; DaH Qu'vam yImuHbej
685 do_the_fetchnode(event);
686 goto next_thread; /* handle next event in event queue */
689 do_the_globalblock(event);
690 goto next_thread; /* handle next event in event queue */
693 do_the_fetchreply(event);
694 goto next_thread; /* handle next event in event queue */
696 case UnblockThread: /* Move from the blocked queue to the tail of */
697 do_the_unblock(event);
698 goto next_thread; /* handle next event in event queue */
700 case ResumeThread: /* Move from the blocked queue to the tail of */
701 /* the runnable queue ( i.e. Qu' SImqa'lu') */
702 event->tso->gran.blocktime +=
703 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
704 do_the_startthread(event);
705 goto next_thread; /* handle next event in event queue */
708 do_the_startthread(event);
709 goto next_thread; /* handle next event in event queue */
712 do_the_movethread(event);
713 goto next_thread; /* handle next event in event queue */
716 do_the_movespark(event);
717 goto next_thread; /* handle next event in event queue */
720 do_the_findwork(event);
721 goto next_thread; /* handle next event in event queue */
724 barf("Illegal event type %u\n", event->evttype);
727 /* This point was scheduler_loop in the old RTS */
729 IF_DEBUG(gran, belch("GRAN: after main switch"));
731 TimeOfLastEvent = CurrentTime[CurrentProc];
732 TimeOfNextEvent = get_time_of_next_event();
733 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
734 // CurrentTSO = ThreadQueueHd;
736 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
739 if (RtsFlags.GranFlags.Light)
740 GranSimLight_leave_system(event, &ActiveTSO);
742 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
745 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
747 /* in a GranSim setup the TSO stays on the run queue */
749 /* Take a thread from the run queue. */
750 t = POP_RUN_QUEUE(); // take_off_run_queue(t);
753 fprintf(stderr, "GRAN: About to run current thread, which is\n");
756 context_switch = 0; // turned on via GranYield, checking events and time slice
759 DumpGranEvent(GR_SCHEDULE, t));
761 procStatus[CurrentProc] = Busy;
764 if (PendingFetches != END_BF_QUEUE) {
768 /* ToDo: perhaps merge with spark activation above */
769 /* check whether we have local work and send requests if we have none */
770 if (EMPTY_RUN_QUEUE()) { /* no runnable threads */
771 /* :-[ no local threads => look out for local sparks */
772 /* the spark pool for the current PE */
773 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
774 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
775 pool->hd < pool->tl) {
777 * ToDo: add GC code check that we really have enough heap afterwards!!
779 * If we're here (no runnable threads) and we have pending
780 * sparks, we must have a space problem. Get enough space
781 * to turn one of those pending sparks into a
785 spark = findSpark(rtsFalse); /* get a spark */
786 if (spark != (rtsSpark) NULL) {
787 tso = activateSpark(spark); /* turn the spark into a thread */
788 IF_PAR_DEBUG(schedule,
789 belch("==== schedule: Created TSO %d (%p); %d threads active",
790 tso->id, tso, advisory_thread_count));
792 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
793 belch("==^^ failed to activate spark");
795 } /* otherwise fall through & pick-up new tso */
797 IF_PAR_DEBUG(verbose,
798 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
799 spark_queue_len(pool)));
804 /* If we still have no work we need to send a FISH to get a spark
807 if (EMPTY_RUN_QUEUE()) {
808 /* =8-[ no local sparks => look for work on other PEs */
810 * We really have absolutely no work. Send out a fish
811 * (there may be some out there already), and wait for
812 * something to arrive. We clearly can't run any threads
813 * until a SCHEDULE or RESUME arrives, and so that's what
814 * we're hoping to see. (Of course, we still have to
815 * respond to other types of messages.)
817 TIME now = msTime() /*CURRENT_TIME*/;
818 IF_PAR_DEBUG(verbose,
819 belch("-- now=%ld", now));
820 IF_PAR_DEBUG(verbose,
821 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
822 (last_fish_arrived_at!=0 &&
823 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
824 belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
825 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
826 last_fish_arrived_at,
827 RtsFlags.ParFlags.fishDelay, now);
830 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
831 (last_fish_arrived_at==0 ||
832 (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
833 /* outstandingFishes is set in sendFish, processFish;
834 avoid flooding system with fishes via delay */
836 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
839 // Global statistics: count no. of fishes
840 if (RtsFlags.ParFlags.ParStats.Global &&
841 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
842 globalParStats.tot_fish_mess++;
846 receivedFinish = processMessages();
849 } else if (PacketsWaiting()) { /* Look for incoming messages */
850 receivedFinish = processMessages();
853 /* Now we are sure that we have some work available */
854 ASSERT(run_queue_hd != END_TSO_QUEUE);
856 /* Take a thread from the run queue, if we have work */
857 t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
858 IF_DEBUG(sanity,checkTSO(t));
860 /* ToDo: write something to the log-file
861 if (RTSflags.ParFlags.granSimStats && !sameThread)
862 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
866 /* the spark pool for the current PE */
867 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
870 belch("--=^ %d threads, %d sparks on [%#x]",
871 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
874 if (0 && RtsFlags.ParFlags.ParStats.Full &&
875 t && LastTSO && t->id != LastTSO->id &&
876 LastTSO->why_blocked == NotBlocked &&
877 LastTSO->what_next != ThreadComplete) {
878 // if previously scheduled TSO not blocked we have to record the context switch
879 DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
880 GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
883 if (RtsFlags.ParFlags.ParStats.Full &&
884 (emitSchedule /* forced emit */ ||
885 (t && LastTSO && t->id != LastTSO->id))) {
887 we are running a different TSO, so write a schedule event to log file
888 NB: If we use fair scheduling we also have to write a deschedule
889 event for LastTSO; with unfair scheduling we know that the
890 previous tso has blocked whenever we switch to another tso, so
891 we don't need it in GUM for now
893 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
894 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
895 emitSchedule = rtsFalse;
899 #else /* !GRAN && !PAR */
901 /* grab a thread from the run queue
903 ASSERT(run_queue_hd != END_TSO_QUEUE);
905 IF_DEBUG(sanity,checkTSO(t));
912 cap = free_capabilities;
913 free_capabilities = cap->link;
914 n_free_capabilities--;
919 cap->rCurrentTSO = t;
921 /* context switches are now initiated by the timer signal, unless
922 * the user specified "context switch as often as possible", with
925 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
926 && (run_queue_hd != END_TSO_QUEUE
927 || blocked_queue_hd != END_TSO_QUEUE
928 || sleeping_queue != END_TSO_QUEUE))
933 RELEASE_LOCK(&sched_mutex);
935 IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...",
936 t->id, t, whatNext_strs[t->what_next]));
938 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
939 /* Run the current thread
941 switch (cap->rCurrentTSO->what_next) {
944 /* Thread already finished, return to scheduler. */
945 ret = ThreadFinished;
948 ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
951 ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
953 case ThreadEnterInterp:
954 ret = interpretBCO(cap);
957 barf("schedule: invalid what_next field");
959 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
961 /* Costs for the scheduler are assigned to CCS_SYSTEM */
966 ACQUIRE_LOCK(&sched_mutex);
969 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
970 #elif !defined(GRAN) && !defined(PAR)
971 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
973 t = cap->rCurrentTSO;
976 /* HACK 675: if the last thread didn't yield, make sure to print a
977 SCHEDULE event to the log file when StgRunning the next thread, even
978 if it is the same one as before */
980 TimeOfLastYield = CURRENT_TIME;
987 DumpGranEvent(GR_DESCHEDULE, t));
988 globalGranStats.tot_heapover++;
991 //DumpGranEvent(GR_DESCHEDULE, t);
992 globalParStats.tot_heapover++;
994 /* make all the running tasks block on a condition variable,
995 * maybe set context_switch and wait till they all pile in,
996 * then have them wait on a GC condition variable.
998 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow",
999 t->id, t, whatNext_strs[t->what_next]));
1002 ASSERT(!is_on_queue(t,CurrentProc));
1004 /* Currently we emit a DESCHEDULE event before GC in GUM.
1005 ToDo: either add separate event to distinguish SYSTEM time from rest
1006 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1007 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1008 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1009 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1010 emitSchedule = rtsTrue;
1014 ready_to_gc = rtsTrue;
1015 context_switch = 1; /* stop other threads ASAP */
1016 PUSH_ON_RUN_QUEUE(t);
1017 /* actual GC is done at the end of the while loop */
1023 DumpGranEvent(GR_DESCHEDULE, t));
1024 globalGranStats.tot_stackover++;
1027 // DumpGranEvent(GR_DESCHEDULE, t);
1028 globalParStats.tot_stackover++;
1030 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow",
1031 t->id, t, whatNext_strs[t->what_next]));
1032 /* just adjust the stack for this thread, then pop it back
1038 /* enlarge the stack */
1039 StgTSO *new_t = threadStackOverflow(t);
1041 /* This TSO has moved, so update any pointers to it from the
1042 * main thread stack. It better not be on any other queues...
1043 * (it shouldn't be).
1045 for (m = main_threads; m != NULL; m = m->link) {
1050 threadPaused(new_t);
1051 PUSH_ON_RUN_QUEUE(new_t);
1055 case ThreadYielding:
1058 DumpGranEvent(GR_DESCHEDULE, t));
1059 globalGranStats.tot_yields++;
1062 // DumpGranEvent(GR_DESCHEDULE, t);
1063 globalParStats.tot_yields++;
1065 /* put the thread back on the run queue. Then, if we're ready to
1066 * GC, check whether this is the last task to stop. If so, wake
1067 * up the GC thread. getThread will block during a GC until the
1071 if (t->what_next == ThreadEnterInterp) {
1072 /* ToDo: or maybe a timer expired when we were in Hugs?
1073 * or maybe someone hit ctrl-C
1075 belch("--<< thread %ld (%p; %s) stopped to switch to Hugs",
1076 t->id, t, whatNext_strs[t->what_next]);
1078 belch("--<< thread %ld (%p; %s) stopped, yielding",
1079 t->id, t, whatNext_strs[t->what_next]);
1086 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1088 ASSERT(t->link == END_TSO_QUEUE);
1090 ASSERT(!is_on_queue(t,CurrentProc));
1093 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1094 checkThreadQsSanity(rtsTrue));
1097 if (RtsFlags.ParFlags.doFairScheduling) {
1098 /* this does round-robin scheduling; good for concurrency */
1099 APPEND_TO_RUN_QUEUE(t);
1101 /* this does unfair scheduling; good for parallelism */
1102 PUSH_ON_RUN_QUEUE(t);
1105 /* this does round-robin scheduling; good for concurrency */
1106 APPEND_TO_RUN_QUEUE(t);
1109 /* add a ContinueThread event to actually process the thread */
1110 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1112 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1114 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1123 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1124 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1125 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1127 // ??? needed; should emit block before
1129 DumpGranEvent(GR_DESCHEDULE, t));
1130 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1133 ASSERT(procStatus[CurrentProc]==Busy ||
1134 ((procStatus[CurrentProc]==Fetching) &&
1135 (t->block_info.closure!=(StgClosure*)NULL)));
1136 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1137 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1138 procStatus[CurrentProc]==Fetching))
1139 procStatus[CurrentProc] = Idle;
1143 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1144 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1147 if (t->block_info.closure!=(StgClosure*)NULL)
1148 print_bq(t->block_info.closure));
1150 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1153 /* whatever we schedule next, we must log that schedule */
1154 emitSchedule = rtsTrue;
1157 /* don't need to do anything. Either the thread is blocked on
1158 * I/O, in which case we'll have called addToBlockedQueue
1159 * previously, or it's blocked on an MVar or Blackhole, in which
1160 * case it'll be on the relevant queue already.
1163 fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1164 printThreadBlockage(t);
1165 fprintf(stderr, "\n"));
1167 /* Only for dumping event to log file
1168 ToDo: do I need this in GranSim, too?
1175 case ThreadFinished:
1176 /* Need to check whether this was a main thread, and if so, signal
1177 * the task that started it with the return value. If we have no
1178 * more main threads, we probably need to stop all the tasks until
1181 /* We also end up here if the thread kills itself with an
1182 * uncaught exception, see Exception.hc.
1184 IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1186 endThread(t, CurrentProc); // clean-up the thread
1188 /* For now all are advisory -- HWL */
1189 //if(t->priority==AdvisoryPriority) ??
1190 advisory_thread_count--;
1193 if(t->dist.priority==RevalPriority)
1197 if (RtsFlags.ParFlags.ParStats.Full &&
1198 !RtsFlags.ParFlags.ParStats.Suppressed)
1199 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1204 barf("schedule: invalid thread return code %d", (int)ret);
1208 cap->link = free_capabilities;
1209 free_capabilities = cap;
1210 n_free_capabilities++;
1214 if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes)
1219 /* everybody back, start the GC.
1220 * Could do it in this thread, or signal a condition var
1221 * to do it in another thread. Either way, we need to
1222 * broadcast on gc_pending_cond afterward.
1225 IF_DEBUG(scheduler,sched_belch("doing GC"));
1227 GarbageCollect(GetRoots,rtsFalse);
1228 ready_to_gc = rtsFalse;
1230 pthread_cond_broadcast(&gc_pending_cond);
1233 /* add a ContinueThread event to continue execution of current thread */
1234 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1236 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1238 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1245 IF_GRAN_DEBUG(unused,
1246 print_eventq(EventHd));
1248 event = get_next_event();
1252 /* ToDo: wait for next message to arrive rather than busy wait */
1257 t = take_off_run_queue(END_TSO_QUEUE);
1260 } /* end of while(1) */
1261 IF_PAR_DEBUG(verbose,
1262 belch("== Leaving schedule() after having received Finish"));
1265 /* ---------------------------------------------------------------------------
1266 * deleteAllThreads(): kill all the live threads.
1268 * This is used when we catch a user interrupt (^C), before performing
1269 * any necessary cleanups and running finalizers.
1270 * ------------------------------------------------------------------------- */
1272 void deleteAllThreads ( void )
1275 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1276 for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1279 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1282 for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1285 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1286 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1287 sleeping_queue = END_TSO_QUEUE;
1290 /* startThread and insertThread are now in GranSim.c -- HWL */
1292 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1293 //@subsection Suspend and Resume
1295 /* ---------------------------------------------------------------------------
1296 * Suspending & resuming Haskell threads.
1298 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1299 * its capability before calling the C function. This allows another
1300 * task to pick up the capability and carry on running Haskell
1301 * threads. It also means that if the C call blocks, it won't lock
1304 * The Haskell thread making the C call is put to sleep for the
1305 * duration of the call, on the suspended_ccalling_threads queue. We
1306 * give out a token to the task, which it can use to resume the thread
1307 * on return from the C function.
1308 * ------------------------------------------------------------------------- */
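
/* Sketch of the intended calling convention (illustrative; the real call
 * sites are in the compiler-generated stubs for "safe" foreign calls):
 *
 *     StgInt tok = suspendThread(cap);   // give up our capability
 *     r = some_c_function(args);         // may block; other tasks run
 *     cap = resumeThread(tok);           // reclaim a capability
 */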
1311 suspendThread( Capability *cap )
1315 ACQUIRE_LOCK(&sched_mutex);
1318 sched_belch("thread %d did a _ccall_gc", cap->rCurrentTSO->id));
1320 threadPaused(cap->rCurrentTSO);
1321 cap->rCurrentTSO->link = suspended_ccalling_threads;
1322 suspended_ccalling_threads = cap->rCurrentTSO;
1324 /* Use the thread ID as the token; it should be unique */
1325 tok = cap->rCurrentTSO->id;
1328 cap->link = free_capabilities;
1329 free_capabilities = cap;
1330 n_free_capabilities++;
1333 RELEASE_LOCK(&sched_mutex);
1338 resumeThread( StgInt tok )
1340 StgTSO *tso, **prev;
1343 ACQUIRE_LOCK(&sched_mutex);
1345 prev = &suspended_ccalling_threads;
1346 for (tso = suspended_ccalling_threads;
1347 tso != END_TSO_QUEUE;
1348 prev = &tso->link, tso = tso->link) {
1349 if (tso->id == (StgThreadID)tok) {
1354 if (tso == END_TSO_QUEUE) {
1355 barf("resumeThread: thread not found");
1357 tso->link = END_TSO_QUEUE;
1360 while (free_capabilities == NULL) {
1361 IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1362 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1363 IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1365 cap = free_capabilities;
1366 free_capabilities = cap->link;
1367 n_free_capabilities--;
1369 cap = &MainRegTable;
1372 cap->rCurrentTSO = tso;
1374 RELEASE_LOCK(&sched_mutex);
1379 /* ---------------------------------------------------------------------------
1381 * ------------------------------------------------------------------------ */
1382 static void unblockThread(StgTSO *tso);
1384 /* ---------------------------------------------------------------------------
1385 * Comparing Thread ids.
1387 * This is used from STG land in the implementation of the
1388 * instances of Eq/Ord for ThreadIds.
1389 * ------------------------------------------------------------------------ */
1391 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
1393 StgThreadID id1 = tso1->id;
1394 StgThreadID id2 = tso2->id;
1396 if (id1 < id2) return (-1);
1397 if (id1 > id2) return 1;
1401 /* ---------------------------------------------------------------------------
1402 Create a new thread.
1404 The new thread starts with the given stack size. Before the
1405 scheduler can run, however, this thread needs to have a closure
1406 (and possibly some arguments) pushed on its stack. See
1407 pushClosure() in Schedule.h.
1409 createGenThread() and createIOThread() (in SchedAPI.h) are
1410 convenient packaged versions of this function.
1412 currently pri (priority) is only used in a GRAN setup -- HWL
1413 ------------------------------------------------------------------------ */
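
/* Illustrative use (GRAN build shown, where the pri argument exists;
 * "closure" stands for whatever the new thread should evaluate):
 *
 *     StgTSO *t = createThread(RtsFlags.GcFlags.initialStkSize, 0);
 *     pushClosure(t, closure);           // see Schedule.h
 *     scheduleThread(t);                 // put it on the run queue
 */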
1414 //@cindex createThread
1416 /* currently pri (priority) is only used in a GRAN setup -- HWL */
1418 createThread(nat stack_size, StgInt pri)
1420 return createThread_(stack_size, rtsFalse, pri);
1424 createThread_(nat size, rtsBool have_lock, StgInt pri)
1428 createThread(nat stack_size)
1430 return createThread_(stack_size, rtsFalse);
1434 createThread_(nat size, rtsBool have_lock)
1441 /* First check whether we should create a thread at all */
1443 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1444 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1446 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1447 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1448 return END_TSO_QUEUE;
1454 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1457 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1459 /* catch ridiculously small stack sizes */
1460 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1461 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1464 stack_size = size - TSO_STRUCT_SIZEW;
1466 tso = (StgTSO *)allocate(size);
1467 TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1469 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1471 SET_GRAN_HDR(tso, ThisPE);
1473 tso->what_next = ThreadEnterGHC;
1475 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1476 * protect the increment operation on next_thread_id.
1477 * In future, we could use an atomic increment instead.
1479 if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1480 tso->id = next_thread_id++;
1481 if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1483 tso->why_blocked = NotBlocked;
1484 tso->blocked_exceptions = NULL;
1486 tso->stack_size = stack_size;
1487 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
1489 tso->sp = (P_)&(tso->stack) + stack_size;
1492 tso->prof.CCCS = CCS_MAIN;
1495 /* put a stop frame on the stack */
1496 tso->sp -= sizeofW(StgStopFrame);
1497 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1498 tso->su = (StgUpdateFrame*)tso->sp;
1502 tso->link = END_TSO_QUEUE;
1503 /* uses more flexible routine in GranSim */
1504 insertThread(tso, CurrentProc);
1506 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1512 if (RtsFlags.GranFlags.GranSimStats.Full)
1513 DumpGranEvent(GR_START,tso);
1515 if (RtsFlags.ParFlags.ParStats.Full)
1516 DumpGranEvent(GR_STARTQ,tso);
1517 /* HACK to avoid SCHEDULE
1521 /* Link the new thread on the global thread list.
1523 tso->global_link = all_threads;
1527 tso->dist.priority = MandatoryPriority; //by default that is...
1531 tso->gran.pri = pri;
1533 tso->gran.magic = TSO_MAGIC; // debugging only
1535 tso->gran.sparkname = 0;
1536 tso->gran.startedat = CURRENT_TIME;
1537 tso->gran.exported = 0;
1538 tso->gran.basicblocks = 0;
1539 tso->gran.allocs = 0;
1540 tso->gran.exectime = 0;
1541 tso->gran.fetchtime = 0;
1542 tso->gran.fetchcount = 0;
1543 tso->gran.blocktime = 0;
1544 tso->gran.blockcount = 0;
1545 tso->gran.blockedat = 0;
1546 tso->gran.globalsparks = 0;
1547 tso->gran.localsparks = 0;
1548 if (RtsFlags.GranFlags.Light)
1549 tso->gran.clock = Now; /* local clock */
1551 tso->gran.clock = 0;
1553 IF_DEBUG(gran,printTSO(tso));
1556 tso->par.magic = TSO_MAGIC; // debugging only
1558 tso->par.sparkname = 0;
1559 tso->par.startedat = CURRENT_TIME;
1560 tso->par.exported = 0;
1561 tso->par.basicblocks = 0;
1562 tso->par.allocs = 0;
1563 tso->par.exectime = 0;
1564 tso->par.fetchtime = 0;
1565 tso->par.fetchcount = 0;
1566 tso->par.blocktime = 0;
1567 tso->par.blockcount = 0;
1568 tso->par.blockedat = 0;
1569 tso->par.globalsparks = 0;
1570 tso->par.localsparks = 0;
1574 globalGranStats.tot_threads_created++;
1575 globalGranStats.threads_created_on_PE[CurrentProc]++;
1576 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1577 globalGranStats.tot_sq_probes++;
1579 // collect parallel global statistics (currently done together with GC stats)
1580 if (RtsFlags.ParFlags.ParStats.Global &&
1581 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1582 //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime());
1583 globalParStats.tot_threads_created++;
1589 belch("==__ schedule: Created TSO %d (%p) on PE %d;",
1590 tso->id, tso, CurrentProc));
1592 IF_PAR_DEBUG(verbose,
1593 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1594 tso->id, tso, advisory_thread_count));
1596 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1597 tso->id, tso->stack_size));
1604 all parallel thread creation calls should fall through the following routine.
1607 createSparkThread(rtsSpark spark)
1609 ASSERT(spark != (rtsSpark)NULL);
1610 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
1612 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1613 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1614 return END_TSO_QUEUE;
1618 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1619 if (tso==END_TSO_QUEUE)
1620 barf("createSparkThread: Cannot create TSO");
1622 tso->priority = AdvisoryPriority;
1624 pushClosure(tso,spark);
1625 PUSH_ON_RUN_QUEUE(tso);
1626 advisory_thread_count++;
1633 Turn a spark into a thread.
1634 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1637 //@cindex activateSpark
1639 activateSpark (rtsSpark spark)
1643 tso = createSparkThread(spark);
1644 if (RtsFlags.ParFlags.ParStats.Full) {
1645 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1646 IF_PAR_DEBUG(verbose,
1647 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1648 (StgClosure *)spark, info_type((StgClosure *)spark)));
1650 // ToDo: fwd info on local/global spark to thread -- HWL
1651 // tso->gran.exported = spark->exported;
1652 // tso->gran.locked = !spark->global;
1653 // tso->gran.sparkname = spark->name;
1659 /* ---------------------------------------------------------------------------
1662 * scheduleThread puts a thread on the head of the runnable queue.
1663 * This will usually be done immediately after a thread is created.
1664 * The caller of scheduleThread must create the thread using e.g.
1665 * createThread and push an appropriate closure
1666 * on this thread's stack before the scheduler is invoked.
1667 * ------------------------------------------------------------------------ */
1670 scheduleThread(StgTSO *tso)
1672 if (tso==END_TSO_QUEUE){
1677 ACQUIRE_LOCK(&sched_mutex);
1679 /* Put the new thread on the head of the runnable queue. The caller
1680 * better push an appropriate closure on this thread's stack
1681 * beforehand. In the SMP case, the thread may start running as
1682 * soon as we release the scheduler lock below.
1684 PUSH_ON_RUN_QUEUE(tso);
1688 IF_DEBUG(scheduler,printTSO(tso));
1690 RELEASE_LOCK(&sched_mutex);
1693 /* ---------------------------------------------------------------------------
1696 * Start up Posix threads to run each of the scheduler tasks.
1697 * I believe the task ids are not needed in the system as defined.
1699 * ------------------------------------------------------------------------ */
1701 #if defined(PAR) || defined(SMP)
1703 taskStart(void) /* ( void *arg STG_UNUSED) */
1705 scheduleThread(END_TSO_QUEUE);
1709 /* ---------------------------------------------------------------------------
1712 * Initialise the scheduler. This resets all the queues - if the
1713 * queues contained any threads, they'll be garbage collected at the
1716 * This now calls startTasks(), so should only be called once! KH @ 25/10/99
1717 * ------------------------------------------------------------------------ */
1721 term_handler(int sig STG_UNUSED)
1724 ACQUIRE_LOCK(&term_mutex);
1726 RELEASE_LOCK(&term_mutex);
1731 //@cindex initScheduler
1738 for (i=0; i<MAX_PROC; i++) {
1739 run_queue_hds[i] = END_TSO_QUEUE;
1740 run_queue_tls[i] = END_TSO_QUEUE;
1741 blocked_queue_hds[i] = END_TSO_QUEUE;
1742 blocked_queue_tls[i] = END_TSO_QUEUE;
1743 ccalling_threadss[i] = END_TSO_QUEUE;
1744 sleeping_queue = END_TSO_QUEUE;
1747 run_queue_hd = END_TSO_QUEUE;
1748 run_queue_tl = END_TSO_QUEUE;
1749 blocked_queue_hd = END_TSO_QUEUE;
1750 blocked_queue_tl = END_TSO_QUEUE;
1751 sleeping_queue = END_TSO_QUEUE;
1754 suspended_ccalling_threads = END_TSO_QUEUE;
1756 main_threads = NULL;
1757 all_threads = END_TSO_QUEUE;
1762 RtsFlags.ConcFlags.ctxtSwitchTicks =
1763 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1765 /* Install the SIGTERM handler */
1768 struct sigaction action,oact;
1770 action.sa_handler = term_handler;
1771 sigemptyset(&action.sa_mask);
1772 action.sa_flags = 0;
1773 if (sigaction(SIGTERM, &action, &oact) != 0) {
1774 barf("can't install TERM handler");
1780 /* Allocate N Capabilities */
1783 Capability *cap, *prev;
1786 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1787 cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1791 free_capabilities = cap;
1792 n_free_capabilities = RtsFlags.ParFlags.nNodes;
1794 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1795 n_free_capabilities););
1798 #if defined(SMP) || defined(PAR)
1811 /* make some space for saving all the thread ids */
1812 task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1813 "initScheduler:task_ids");
1815 /* and create all the threads */
1816 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1817 r = pthread_create(&tid,NULL,taskStart,NULL);
1819 barf("startTasks: Can't create new Posix thread");
1821 task_ids[i].id = tid;
1822 task_ids[i].mut_time = 0.0;
1823 task_ids[i].mut_etime = 0.0;
1824 task_ids[i].gc_time = 0.0;
1825 task_ids[i].gc_etime = 0.0;
1826 task_ids[i].elapsedtimestart = elapsedtime();
1827 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1833 exitScheduler( void )
1838 /* Don't want to use pthread_cancel, since we'd have to install
1839 * these silly exception handlers (pthread_cleanup_{push,pop}) around
1843 /* Cancel all our tasks */
1844 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1845 pthread_cancel(task_ids[i].id);
1848 /* Wait for all the tasks to terminate */
1849 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1850 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n",
1852 pthread_join(task_ids[i].id, NULL);
1856 /* Send 'em all a SIGTERM. That should shut 'em up.
1858 await_death = RtsFlags.ParFlags.nNodes;
1859 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1860 pthread_kill(task_ids[i].id,SIGTERM);
1862 while (await_death > 0) {
1868 /* -----------------------------------------------------------------------------
1869 Managing the per-task allocation areas.
1871 Each capability comes with an allocation area. These are
1872 fixed-length block lists into which allocation can be done.
1874 ToDo: no support for two-space collection at the moment???
1875 -------------------------------------------------------------------------- */
1877 /* -----------------------------------------------------------------------------
1878 * waitThread is the external interface for running a new computation
1879 * and waiting for the result.
1881 * In the non-SMP case, we create a new main thread, push it on the
1882 * main-thread stack, and invoke the scheduler to run it. The
1883 * scheduler will return when the top main thread on the stack has
1884 * completed or died, and fill in the necessary fields of the
1885 * main_thread structure.
1887 * In the SMP case, we create a main thread as before, but we then
1888 * create a new condition variable and sleep on it. When our new
1889 * main thread has completed, we'll be woken up and the status/result
1890 * will be in the main_thread struct.
1891 * -------------------------------------------------------------------------- */
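
/* Typical driver-side use (illustrative; createIOThread is one of the
 * packaged creators in SchedAPI.h, and main_closure is a hypothetical
 * name for the closure to run):
 *
 *     StgClosure *result;
 *     StgTSO *t = createIOThread(RtsFlags.GcFlags.initialStkSize,
 *                                main_closure);
 *     SchedulerStatus s = waitThread(t, &result);
 *     // on Success, result holds the computation's value
 */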
1894 howManyThreadsAvail ( void )
1898 for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1900 for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1902 for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
1908 finishAllThreads ( void )
1911 while (run_queue_hd != END_TSO_QUEUE) {
1912 waitThread ( run_queue_hd, NULL );
1914 while (blocked_queue_hd != END_TSO_QUEUE) {
1915 waitThread ( blocked_queue_hd, NULL );
1917 while (sleeping_queue != END_TSO_QUEUE) {
1918 waitThread ( sleeping_queue, NULL );
1921 (blocked_queue_hd != END_TSO_QUEUE ||
1922 run_queue_hd != END_TSO_QUEUE ||
1923 sleeping_queue != END_TSO_QUEUE);
1927 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1930 SchedulerStatus stat;
1932 ACQUIRE_LOCK(&sched_mutex);
1934 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1940 pthread_cond_init(&m->wakeup, NULL);
1943 m->link = main_threads;
1946 IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n",
1951 pthread_cond_wait(&m->wakeup, &sched_mutex);
1952 } while (m->stat == NoStatus);
1954 /* GranSim specific init */
1955 CurrentTSO = m->tso; // the TSO to run
1956 procStatus[MainProc] = Busy; // status of main PE
1957 CurrentProc = MainProc; // PE to run it on
1962 ASSERT(m->stat != NoStatus);
1968 pthread_cond_destroy(&m->wakeup);
1971 IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n",
1975 RELEASE_LOCK(&sched_mutex);
1980 //@node Run queue code, Garbage Collection Routines, Suspend and Resume, Main scheduling code
1981 //@subsection Run queue code
1985 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1986 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1987 implicit global variable that has to be correct when calling these
1991 /* Put the new thread on the head of the runnable queue.
1992 * The caller of createThread better push an appropriate closure
1993 * on this thread's stack before the scheduler is invoked.
1995 static /* inline */ void
1996 add_to_run_queue(tso)
1999 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2000 tso->link = run_queue_hd;
2002 if (run_queue_tl == END_TSO_QUEUE) {
2007 /* Put the new thread at the end of the runnable queue. */
2008 static /* inline */ void
2009 push_on_run_queue(tso)
2012 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2013 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2014 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2015 if (run_queue_hd == END_TSO_QUEUE) {
2018 run_queue_tl->link = tso;
2024 Should be inlined because it's used very often in schedule. The tso
2025 argument is actually only needed in GranSim, where we want to have the
2026 possibility to schedule *any* TSO on the run queue, irrespective of the
2027 actual ordering. Therefore, if tso is not the nil TSO then we traverse
2028 the run queue and dequeue the tso, adjusting the links in the queue.
2030 //@cindex take_off_run_queue
2031 static /* inline */ StgTSO*
2032 take_off_run_queue(StgTSO *tso) {
2036 qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2038 if tso is specified, unlink that tso from the run_queue (doesn't have
2039 to be at the beginning of the queue); GranSim only
2041 if (tso!=END_TSO_QUEUE) {
2042 /* find tso in queue */
2043 for (t=run_queue_hd, prev=END_TSO_QUEUE;
2044 t!=END_TSO_QUEUE && t!=tso;
2048 /* now actually dequeue the tso */
2049 if (prev!=END_TSO_QUEUE) {
2050 ASSERT(run_queue_hd!=t);
2051 prev->link = t->link;
2053 /* t is at beginning of thread queue */
2054 ASSERT(run_queue_hd==t);
2055 run_queue_hd = t->link;
2057 /* t is at end of thread queue */
2058 if (t->link==END_TSO_QUEUE) {
2059 ASSERT(t==run_queue_tl);
2060 run_queue_tl = prev;
2062 ASSERT(run_queue_tl!=t);
2064 t->link = END_TSO_QUEUE;
2066 /* take tso from the beginning of the queue; std concurrent code */
2068 if (t != END_TSO_QUEUE) {
2069 run_queue_hd = t->link;
2070 t->link = END_TSO_QUEUE;
2071 if (run_queue_hd == END_TSO_QUEUE) {
2072 run_queue_tl = END_TSO_QUEUE;
2081 //@node Garbage Collection Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2082 //@subsection Garbage Collection Routines
2084 /* ---------------------------------------------------------------------------
2085 Where are the roots that we know about?
2087 - all the threads on the runnable queue
2088 - all the threads on the blocked queue
2089 - all the threads on the sleeping queue
2090 - all the thread currently executing a _ccall_GC
2091 - all the "main threads"
2093 ------------------------------------------------------------------------ */
2095 /* This has to be protected either by the scheduler monitor, or by the
2096 garbage collection monitor (probably the latter).
2101 GetRoots(evac_fn evac)
2108 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2109 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2110 evac((StgClosure **)&run_queue_hds[i]);
2111 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2112 evac((StgClosure **)&run_queue_tls[i]);
2114 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2115 evac((StgClosure **)&blocked_queue_hds[i]);
2116 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2117 evac((StgClosure **)&blocked_queue_tls[i]);
2118 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2119 evac((StgClosure **)&ccalling_threadss[i]);
2126 if (run_queue_hd != END_TSO_QUEUE) {
2127 ASSERT(run_queue_tl != END_TSO_QUEUE);
2128 evac((StgClosure **)&run_queue_hd);
2129 evac((StgClosure **)&run_queue_tl);
2132 if (blocked_queue_hd != END_TSO_QUEUE) {
2133 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2134 evac((StgClosure **)&blocked_queue_hd);
2135 evac((StgClosure **)&blocked_queue_tl);
2138 if (sleeping_queue != END_TSO_QUEUE) {
2139 evac((StgClosure **)&sleeping_queue);
2143 for (m = main_threads; m != NULL; m = m->link) {
2144 evac((StgClosure **)&m->tso);
2146 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2147 evac((StgClosure **)&suspended_ccalling_threads);
2150 #if defined(SMP) || defined(PAR) || defined(GRAN)
2151 markSparkQueue(evac);
2155 /* -----------------------------------------------------------------------------
2158 This is the interface to the garbage collector from Haskell land.
2159 We provide this so that external C code can allocate and garbage
2160 collect when called from Haskell via _ccall_GC.
2162 It might be useful to provide an interface whereby the programmer
2163 can specify more roots (ToDo).
2165 This needs to be protected by the GC condition variable above. KH.
2166 -------------------------------------------------------------------------- */
2168 void (*extra_roots)(evac_fn);
2173 GarbageCollect(GetRoots,rtsFalse);
2177 performMajorGC(void)
2179 GarbageCollect(GetRoots,rtsTrue);
2183 AllRoots(evac_fn evac)
2185 GetRoots(evac); // the scheduler's roots
2186 extra_roots(evac); // the user's roots
2190 performGCWithRoots(void (*get_roots)(evac_fn))
2192 extra_roots = get_roots;
2193 GarbageCollect(AllRoots,rtsFalse);
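
/* Illustrative use from foreign code (my_roots and my_global_closure are
 * hypothetical names for the caller's extra roots):
 *
 *     static void my_roots(evac_fn evac) {
 *         evac((StgClosure **)&my_global_closure);
 *     }
 *     ...
 *     performGCWithRoots(my_roots);
 */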
2196 /* -----------------------------------------------------------------------------
2199 If the thread has reached its maximum stack size, then raise the
2200 StackOverflow exception in the offending thread. Otherwise
2201 relocate the TSO into a larger chunk of memory and adjust its stack
2203 -------------------------------------------------------------------------- */
2206 threadStackOverflow(StgTSO *tso)
2208 nat new_stack_size, new_tso_size, diff, stack_words;
2212 IF_DEBUG(sanity,checkTSO(tso));
2213 if (tso->stack_size >= tso->max_stack_size) {
2216 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2217 tso->id, tso, tso->stack_size, tso->max_stack_size);
2218 /* If we're debugging, just print out the top of the stack */
2219 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2222 /* Send this thread the StackOverflow exception */
2223 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2227 /* Try to double the current stack size. If that takes us over the
2228 * maximum stack size for this thread, then use the maximum instead.
2229 * Finally round up so the TSO ends up as a whole number of blocks.
2231 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2232 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2233 TSO_STRUCT_SIZE)/sizeof(W_);
2234 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2235 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2237 IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2239 dest = (StgTSO *)allocate(new_tso_size);
2240 TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2242 /* copy the TSO block and the old stack into the new area */
2243 memcpy(dest,tso,TSO_STRUCT_SIZE);
2244 stack_words = tso->stack + tso->stack_size - tso->sp;
2245 new_sp = (P_)dest + new_tso_size - stack_words;
2246 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2248 /* relocate the stack pointers... */
2249 diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2250 dest->su = (StgUpdateFrame *) ((P_)dest->su + diff);
2252 dest->stack_size = new_stack_size;
2254 /* and relocate the update frame list */
2255 relocate_stack(dest, diff);
2257 /* Mark the old TSO as relocated. We have to check for relocated
2258 * TSOs in the garbage collector and any primops that deal with TSOs.
2260 * It's important to set the sp and su values to just beyond the end
2261 * of the stack, so we don't attempt to scavenge any part of the
2264 tso->what_next = ThreadRelocated;
2266 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2267 tso->su = (StgUpdateFrame *)tso->sp;
2268 tso->why_blocked = NotBlocked;
2269 dest->mut_link = NULL;
2271 IF_PAR_DEBUG(verbose,
2272 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2273 tso->id, tso, tso->stack_size);
2274 /* If we're debugging, just print out the top of the stack */
2275 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2278 IF_DEBUG(sanity,checkTSO(tso));
2280 IF_DEBUG(scheduler,printTSO(dest));
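/* Worked example of the sizing arithmetic above, with illustrative numbers:
 * a thread with stack_size = 1000 words and max_stack_size = 10000 words
 * first doubles to 2000 words; adding TSO_STRUCT_SIZE and rounding up with
 * BLOCK_ROUND_UP yields a whole number of blocks (then whole megablocks via
 * round_to_mblocks), and the slack gained by rounding is handed back to the
 * stack:
 *
 *   new_stack_size = stg_min(1000 * 2, 10000);                 // 2000 words
 *   new_tso_size   = BLOCK_ROUND_UP(2000*sizeof(W_)
 *                                   + TSO_STRUCT_SIZE) / sizeof(W_);
 *   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;          // regain slack
 */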
//@node Blocking Queue Routines, Exception Handling Routines, Garbage Collection Routines, Main scheduling code
//@subsection Blocking Queue Routines

/* ---------------------------------------------------------------------------
   Wake up a queue that was blocked on some resource.
   ------------------------------------------------------------------------ */
#if defined(GRAN)
static void
unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
{ /* nothing to do in GranSim */ }

#elif defined(PAR)
static void
unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
{
  /* write RESUME events to log file and
     update blocked and fetch time (depending on type of the orig closure) */
  if (RtsFlags.ParFlags.ParStats.Full) {
    DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                     GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
                     0, 0 /* spark_queue_len(ADVISORY_POOL) */);
    if (EMPTY_RUN_QUEUE())
      emitSchedule = rtsTrue;

    switch (get_itbl(node)->type) {
    case FETCH_ME_BQ:
      ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
      break;
    case RBH:
    case FETCH_ME:
    case BLACKHOLE_BQ:
      ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
      break;
    default:
      barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
    }
  }
}
#endif
#if defined(GRAN)
static StgBlockingQueueElement *
unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
{
    StgTSO *tso;
    PEs node_loc, tso_loc;
    StgBlockingQueueElement *next = bqe->link;  /* grab this before we clobber tso->link */

    node_loc = where_is(node); // should be lifted out of loop
    tso = (StgTSO *)bqe;       // wastes an assignment to get the type right
    tso_loc = where_is((StgClosure *)tso);
    if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
      /* !fake_fetch => TSO is on CurrentProc, i.e. the same condition as IS_LOCAL_TO */
      ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
      CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
      // insertThread(tso, node_loc);
      new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
                ResumeThread,
                tso, node, (rtsSpark*)NULL);
      tso->link = END_TSO_QUEUE; // overwrite link just to be sure
    } else { // TSO is remote (actually should be FMBQ)
      CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
                                  RtsFlags.GranFlags.Costs.gunblocktime +
                                  RtsFlags.GranFlags.Costs.latency;
      new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
                UnblockThread,
                tso, node, (rtsSpark*)NULL);
      tso->link = END_TSO_QUEUE; // overwrite link just to be sure
    }
    /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
    IF_GRAN_DEBUG(bq,
                  fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
                          (node_loc==tso_loc ? "Local" : "Global"),
                          tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
    tso->block_info.closure = NULL;
    IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
                             tso->id, tso));
    return next;
}
#elif defined(PAR)
static StgBlockingQueueElement *
unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
{
    StgBlockingQueueElement *next;

    switch (get_itbl(bqe)->type) {
    case TSO:
      ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
      /* if it's a TSO just push it onto the run_queue */
      next = bqe->link;
      // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
      PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
      unblockCount(bqe, node);
      /* reset blocking status after dumping event */
      ((StgTSO *)bqe)->why_blocked = NotBlocked;
      break;

    case BLOCKED_FETCH:
      /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
      next = bqe->link;
      bqe->link = (StgBlockingQueueElement *)PendingFetches;
      PendingFetches = (StgBlockedFetch *)bqe;
      break;

    case CONSTR:
      /* can ignore this case in a non-debugging setup;
         see comments on RBHSave closures above */
      /* check that the closure is an RBHSave closure */
      ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
             get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
             get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
      break;

    default:
      barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
           get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
           (StgClosure *)bqe);
    }
    IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
    return next;
}
#else   /* !GRAN && !PAR */
static StgTSO *
unblockOneLocked(StgTSO *tso)
{
  StgTSO *next;

  ASSERT(get_itbl(tso)->type == TSO);
  ASSERT(tso->why_blocked != NotBlocked);
  tso->why_blocked = NotBlocked;
  next = tso->link;
  PUSH_ON_RUN_QUEUE(tso);
  tso->link = END_TSO_QUEUE;
  IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
  return next;
}
#endif

#if defined(GRAN) || defined(PAR)
inline StgBlockingQueueElement *
unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
{
  ACQUIRE_LOCK(&sched_mutex);
  bqe = unblockOneLocked(bqe, node);
  RELEASE_LOCK(&sched_mutex);
  return bqe;
}
#else
inline StgTSO *
unblockOne(StgTSO *tso)
{
  ACQUIRE_LOCK(&sched_mutex);
  tso = unblockOneLocked(tso);
  RELEASE_LOCK(&sched_mutex);
  return tso;
}
#endif
#if defined(GRAN)
void
awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  PEs node_loc;
  nat len = 0;

  IF_GRAN_DEBUG(bq,
                belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ",
                      node, CurrentProc, CurrentTime[CurrentProc],
                      CurrentTSO->id, CurrentTSO));

  node_loc = where_is(node);

  ASSERT(q == END_BQ_QUEUE ||
         get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
         get_itbl(q)->type == CONSTR); // closure (type constructor)
  ASSERT(is_unique(node));

  /* FAKE FETCH: magically copy the node to the tso's proc;
     no Fetch necessary because in reality the node should not have been
     moved to the other PE in the first place
  */
  if (CurrentProc!=node_loc) {
    IF_GRAN_DEBUG(bq,
                  belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
                        node, node_loc, CurrentProc, CurrentTSO->id,
                        // CurrentTSO, where_is(CurrentTSO),
                        node->header.gran.procs));
    node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
    IF_GRAN_DEBUG(bq,
                  belch("## new bitmask of node %p is %#x",
                        node, node->header.gran.procs));
    if (RtsFlags.GranFlags.GranSimStats.Global) {
      globalGranStats.tot_fake_fetches++;
    }
  }

  bqe = q;
  // ToDo: check: ASSERT(CurrentProc==node_loc);
  while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
    /*
       bqe points to the current element in the queue
       next points to the next element in the queue
    */
    //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
    //tso_loc = where_is(tso);
    len++;
    bqe = unblockOneLocked(bqe, node);
  }

  /* if this is the BQ of an RBH, we have to put back the info ripped out of
     the closure to make room for the anchor of the BQ */
  if (bqe!=END_BQ_QUEUE) {
    ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
    /*
    ASSERT((info_ptr==&RBH_Save_0_info) ||
           (info_ptr==&RBH_Save_1_info) ||
           (info_ptr==&RBH_Save_2_info));
    */
    /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
    ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
    ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];

    IF_GRAN_DEBUG(bq,
                  belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
                        node, info_type(node)));
  }

  /* statistics gathering */
  if (RtsFlags.GranFlags.GranSimStats.Global) {
    // globalGranStats.tot_bq_processing_time += bq_processing_time;
    globalGranStats.tot_bq_len += len;   // total length of all bqs awakened
    // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
    globalGranStats.tot_awbq++;          // total no. of bqs awakened
  }
  IF_GRAN_DEBUG(bq,
                fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
                        node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
}
#elif defined(PAR)
void
awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
{
  StgBlockingQueueElement *bqe;

  ACQUIRE_LOCK(&sched_mutex);

  IF_PAR_DEBUG(verbose,
               belch("##-_ AwBQ for node %p on [%x]: ",
                     node, mytid));

#ifdef DIST
  //RFP
  if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
    IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so let's just return. RFP (BUG?)"));
    RELEASE_LOCK(&sched_mutex);  /* don't leave the scheduler locked */
    return;
  }
#endif

  ASSERT(q == END_BQ_QUEUE ||
         get_itbl(q)->type == TSO ||
         get_itbl(q)->type == BLOCKED_FETCH ||
         get_itbl(q)->type == CONSTR);

  bqe = q;
  while (get_itbl(bqe)->type==TSO ||
         get_itbl(bqe)->type==BLOCKED_FETCH) {
    bqe = unblockOneLocked(bqe, node);
  }
  RELEASE_LOCK(&sched_mutex);
}
#else   /* !GRAN && !PAR */
void
awakenBlockedQueue(StgTSO *tso)
{
  ACQUIRE_LOCK(&sched_mutex);
  while (tso != END_TSO_QUEUE) {
    tso = unblockOneLocked(tso);
  }
  RELEASE_LOCK(&sched_mutex);
}
#endif
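/* Usage sketch (standalone build; wake_all_mvar_readers is a hypothetical
 * helper, not part of the RTS): a whole chain of blocked TSOs can be handed
 * to awakenBlockedQueue in one go, in the spirit of what happens when an
 * update overwrites a BLACKHOLE_BQ and wakes everyone waiting on the result
 * (cf. the raiseAsync comments below).
 */
static void
wake_all_mvar_readers(StgMVar *mvar)
{
    awakenBlockedQueue(mvar->head);   /* pushes each TSO back on the run queue */
    mvar->head = END_TSO_QUEUE;       /* the queue has been drained */
    mvar->tail = END_TSO_QUEUE;
}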
//@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
//@subsection Exception Handling Routines

/* ---------------------------------------------------------------------------
   Interrupt execution
   - usually called inside a signal handler so it mustn't do anything fancy.
   ------------------------------------------------------------------------ */

void
interruptStgRts(void)
{
    interrupted    = 1;
    context_switch = 1;
}
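/* Sketch of the intended use (the RTS's real handlers live in the signal
 * handling modules): because interruptStgRts only sets flags, it is safe to
 * call from an asynchronous signal handler.  handle_sigint and
 * install_sigint are hypothetical names, for illustration only.
 */
#include <signal.h>

static void
handle_sigint(int sig)
{
    (void)sig;
    interruptStgRts();    /* flags only; the scheduler notices them later */
}

static void
install_sigint(void)
{
    signal(SIGINT, handle_sigint);
}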
/* -----------------------------------------------------------------------------
   Unblock a thread

   This is for use when we raise an exception in another thread, which
   may be blocked.
   This has nothing to do with the UnblockThread event in GranSim. -- HWL
   -------------------------------------------------------------------------- */

#if defined(GRAN) || defined(PAR)
/*
  NB: only the type of the blocking queue is different in GranSim and GUM
      the operations on the queue-elements are the same
      long live polymorphism!
*/
static void
unblockThread(StgTSO *tso)
{
  StgBlockingQueueElement *t, **last;

  ACQUIRE_LOCK(&sched_mutex);
  switch (tso->why_blocked) {

  case NotBlocked:
    RELEASE_LOCK(&sched_mutex);
    return;  /* not blocked */

  case BlockedOnMVar:
    ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
    {
      StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
      StgMVar *mvar = (StgMVar *)(tso->block_info.closure);

      last = (StgBlockingQueueElement **)&mvar->head;
      for (t = (StgBlockingQueueElement *)mvar->head;
           t != END_BQ_QUEUE;
           last = &t->link, last_tso = t, t = t->link) {
        if (t == (StgBlockingQueueElement *)tso) {
          *last = (StgBlockingQueueElement *)tso->link;
          if (mvar->tail == tso) {
            mvar->tail = (StgTSO *)last_tso;
          }
          goto done;
        }
      }
      barf("unblockThread (MVAR): TSO not found");
    }

  case BlockedOnBlackHole:
    ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
    {
      StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);

      last = &bq->blocking_queue;
      for (t = bq->blocking_queue;
           t != END_BQ_QUEUE;
           last = &t->link, t = t->link) {
        if (t == (StgBlockingQueueElement *)tso) {
          *last = (StgBlockingQueueElement *)tso->link;
          goto done;
        }
      }
      barf("unblockThread (BLACKHOLE): TSO not found");
    }

  case BlockedOnException:
    {
      StgTSO *target = tso->block_info.tso;

      ASSERT(get_itbl(target)->type == TSO);

      while (target->what_next == ThreadRelocated) {
        target = target->link;
        ASSERT(get_itbl(target)->type == TSO);
      }

      ASSERT(target->blocked_exceptions != NULL);

      last = (StgBlockingQueueElement **)&target->blocked_exceptions;
      for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
           t != END_BQ_QUEUE;
           last = &t->link, t = t->link) {
        ASSERT(get_itbl(t)->type == TSO);
        if (t == (StgBlockingQueueElement *)tso) {
          *last = (StgBlockingQueueElement *)tso->link;
          goto done;
        }
      }
      barf("unblockThread (Exception): TSO not found");
    }

  case BlockedOnRead:
  case BlockedOnWrite:
    {
      /* take TSO off blocked_queue */
      StgBlockingQueueElement *prev = NULL;
      for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
           prev = t, t = t->link) {
        if (t == (StgBlockingQueueElement *)tso) {
          if (prev == NULL) {
            blocked_queue_hd = (StgTSO *)t->link;
            if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
              blocked_queue_tl = END_TSO_QUEUE;
            }
          } else {
            prev->link = t->link;
            if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
              blocked_queue_tl = (StgTSO *)prev;
            }
          }
          goto done;
        }
      }
      barf("unblockThread (I/O): TSO not found");
    }

  case BlockedOnDelay:
    {
      /* take TSO off sleeping_queue */
      StgBlockingQueueElement *prev = NULL;
      for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
           prev = t, t = t->link) {
        if (t == (StgBlockingQueueElement *)tso) {
          if (prev == NULL) {
            sleeping_queue = (StgTSO *)t->link;
          } else {
            prev->link = t->link;
          }
          goto done;
        }
      }
      barf("unblockThread (I/O): TSO not found");
    }

  default:
    barf("unblockThread");
  }

 done:
  tso->link = END_TSO_QUEUE;
  tso->why_blocked = NotBlocked;
  tso->block_info.closure = NULL;
  PUSH_ON_RUN_QUEUE(tso);
  RELEASE_LOCK(&sched_mutex);
}
#else /* !GRAN && !PAR */
static void
unblockThread(StgTSO *tso)
{
  StgTSO *t, **last;

  ACQUIRE_LOCK(&sched_mutex);
  switch (tso->why_blocked) {

  case NotBlocked:
    RELEASE_LOCK(&sched_mutex);
    return;  /* not blocked */

  case BlockedOnMVar:
    ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
    {
      StgTSO *last_tso = END_TSO_QUEUE;
      StgMVar *mvar = (StgMVar *)(tso->block_info.closure);

      last = &mvar->head;
      for (t = mvar->head; t != END_TSO_QUEUE;
           last = &t->link, last_tso = t, t = t->link) {
        if (t == tso) {
          *last = tso->link;
          if (mvar->tail == tso) {
            mvar->tail = last_tso;
          }
          goto done;
        }
      }
      barf("unblockThread (MVAR): TSO not found");
    }

  case BlockedOnBlackHole:
    ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
    {
      StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);

      last = &bq->blocking_queue;
      for (t = bq->blocking_queue; t != END_TSO_QUEUE;
           last = &t->link, t = t->link) {
        if (t == tso) {
          *last = tso->link;
          goto done;
        }
      }
      barf("unblockThread (BLACKHOLE): TSO not found");
    }

  case BlockedOnException:
    {
      StgTSO *target = tso->block_info.tso;

      ASSERT(get_itbl(target)->type == TSO);

      while (target->what_next == ThreadRelocated) {
        target = target->link;
        ASSERT(get_itbl(target)->type == TSO);
      }

      ASSERT(target->blocked_exceptions != NULL);

      last = &target->blocked_exceptions;
      for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
           last = &t->link, t = t->link) {
        ASSERT(get_itbl(t)->type == TSO);
        if (t == tso) {
          *last = tso->link;
          goto done;
        }
      }
      barf("unblockThread (Exception): TSO not found");
    }

  case BlockedOnRead:
  case BlockedOnWrite:
    {
      /* take TSO off blocked_queue */
      StgTSO *prev = NULL;
      for (t = blocked_queue_hd; t != END_TSO_QUEUE;
           prev = t, t = t->link) {
        if (t == tso) {
          if (prev == NULL) {
            blocked_queue_hd = t->link;
            if (blocked_queue_tl == t) {
              blocked_queue_tl = END_TSO_QUEUE;
            }
          } else {
            prev->link = t->link;
            if (blocked_queue_tl == t) {
              blocked_queue_tl = prev;
            }
          }
          goto done;
        }
      }
      barf("unblockThread (I/O): TSO not found");
    }

  case BlockedOnDelay:
    {
      /* take TSO off sleeping_queue */
      StgTSO *prev = NULL;
      for (t = sleeping_queue; t != END_TSO_QUEUE;
           prev = t, t = t->link) {
        if (t == tso) {
          if (prev == NULL) {
            sleeping_queue = t->link;
          } else {
            prev->link = t->link;
          }
          goto done;
        }
      }
      barf("unblockThread (I/O): TSO not found");
    }

  default:
    barf("unblockThread");
  }

 done:
  tso->link = END_TSO_QUEUE;
  tso->why_blocked = NotBlocked;
  tso->block_info.closure = NULL;
  PUSH_ON_RUN_QUEUE(tso);
  RELEASE_LOCK(&sched_mutex);
}
#endif
/* -----------------------------------------------------------------------------
 * raiseAsync()
 *
 * The following function implements the magic for raising an
 * asynchronous exception in an existing thread.
 *
 * We first remove the thread from any queue on which it might be
 * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
 *
 * We strip the stack down to the innermost CATCH_FRAME, building
 * thunks in the heap for all the active computations, so they can
 * be restarted if necessary.  When we reach a CATCH_FRAME, we build
 * an application of the handler to the exception, and push it on
 * the top of the stack.
 *
 * How exactly do we save all the active computations?  We create an
 * AP_UPD for every UpdateFrame on the stack.  Entering one of these
 * AP_UPDs pushes everything from the corresponding update frame
 * upwards onto the stack.  (Actually, it pushes everything up to the
 * next update frame plus a pointer to the next AP_UPD object.
 * Entering the next AP_UPD object pushes more onto the stack until we
 * reach the last AP_UPD object - at which point the stack should look
 * exactly as it did when we killed the TSO and we can continue
 * execution by entering the closure on top of the stack.)
 *
 * We can also kill a thread entirely - this happens if either (a) the
 * exception passed to raiseAsync is NULL, or (b) there's no
 * CATCH_FRAME on the stack.  In either case, we strip the entire
 * stack and replace the thread with a zombie.
 *
 * -------------------------------------------------------------------------- */
void
deleteThread(StgTSO *tso)
{
  raiseAsync(tso,NULL);
}

void
raiseAsync(StgTSO *tso, StgClosure *exception)
{
  StgUpdateFrame* su = tso->su;
  StgPtr          sp = tso->sp;

  /* Thread already dead? */
  if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
    return;
  }

  IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));

  /* Remove it from any blocking queues */
  unblockThread(tso);

  /* The stack freezing code assumes there's a closure pointer on
   * the top of the stack.  This isn't always the case with compiled
   * code, so we have to push a dummy closure on the top which just
   * returns to the next return address on the stack.
   */
  if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
    *(--sp) = (W_)&stg_dummy_ret_closure;
  }

  while (1) {
    nat words = ((P_)su - (P_)sp) - 1;
    nat i;
    StgAP_UPD * ap;

    /* If we find a CATCH_FRAME, and we've got an exception to raise,
     * then build PAP(handler,exception,realworld#), and leave it on
     * top of the stack ready to enter.
     */
    if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
      StgCatchFrame *cf = (StgCatchFrame *)su;
      /* we've got an exception to raise, so let's pass it to the
       * handler in this frame.
       */
      ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
      TICK_ALLOC_UPD_PAP(3,0);
      SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);

      ap->n_args = 2;
      ap->fun = cf->handler;       /* :: Exception -> IO a */
      ap->payload[0] = exception;
      ap->payload[1] = ARG_TAG(0); /* realworld token */

      /* throw away the stack from Sp up to and including the
       * CATCH_FRAME.
       */
      sp = (P_)su + sizeofW(StgCatchFrame) - 1;
      tso->su = cf->link;

      /* Restore the blocked/unblocked state for asynchronous exceptions
       * at the CATCH_FRAME.
       *
       * If exceptions were unblocked at the catch, arrange that they
       * are unblocked again after executing the handler by pushing an
       * unblockAsyncExceptions_ret stack frame.
       */
      if (!cf->exceptions_blocked) {
        *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
      }

      /* Ensure that async exceptions are blocked when running the handler. */
      if (tso->blocked_exceptions == NULL) {
        tso->blocked_exceptions = END_TSO_QUEUE;
      }

      /* Put the newly-built PAP on top of the stack, ready to execute
       * when the thread restarts.
       */
      sp[0] = (W_)ap;
      tso->sp = sp;
      tso->what_next = ThreadEnterGHC;
      IF_DEBUG(sanity, checkTSO(tso));
      return;
    }
    /* First build an AP_UPD consisting of the stack chunk above the
     * current update frame, with the top word on the stack as the
     * fun field.
     */
    ap = (StgAP_UPD *)allocate(AP_sizeW(words));

    ap->n_args = words;
    ap->fun    = (StgClosure *)sp[0];
    sp++;
    for(i=0; i < (nat)words; ++i) {
      ap->payload[i] = (StgClosure *)*sp++;
    }

    switch (get_itbl(su)->type) {

    case UPDATE_FRAME:
      {
        SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */);
        TICK_ALLOC_UP_THK(words+1,0);

        IF_DEBUG(scheduler,
                 fprintf(stderr, "scheduler: Updating ");
                 printPtr((P_)su->updatee);
                 fprintf(stderr, " with ");
                 printObj((StgClosure *)ap);
                 );

        /* Replace the updatee with an indirection - happily
         * this will also wake up any threads currently
         * waiting on the result.
         *
         * Warning: if we're in a loop, more than one update frame on
         * the stack may point to the same object.  Be careful not to
         * overwrite an IND_OLDGEN in this case, because we'll screw
         * up the mutable lists.  To be on the safe side, don't
         * overwrite any kind of indirection at all.  See also
         * threadSqueezeStack in GC.c, where we have to make a similar
         * decision.
         */
        if (!closure_IND(su->updatee)) {
          UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
        }
        su = su->link;
        sp += sizeofW(StgUpdateFrame) -1;
        sp[0] = (W_)ap; /* push onto stack */
        break;
      }

    case CATCH_FRAME:
      {
        StgCatchFrame *cf = (StgCatchFrame *)su;
        StgClosure* o;

        /* We want a PAP, not an AP_UPD.  Fortunately, the
         * layout's the same.
         */
        SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
        TICK_ALLOC_UPD_PAP(words+1,0);

        /* now build o = FUN(catch,ap,handler) */
        o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
        TICK_ALLOC_FUN(2,0);
        SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
        o->payload[0] = (StgClosure *)ap;
        o->payload[1] = cf->handler;

        IF_DEBUG(scheduler,
                 fprintf(stderr, "scheduler: Built ");
                 printObj((StgClosure *)o);
                 );

        /* pop the old handler and put o on the stack */
        su = cf->link;
        sp += sizeofW(StgCatchFrame) - 1;
        sp[0] = (W_)o;
        break;
      }

    case SEQ_FRAME:
      {
        StgSeqFrame *sf = (StgSeqFrame *)su;
        StgClosure* o;

        SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
        TICK_ALLOC_UPD_PAP(words+1,0);

        /* now build o = FUN(seq,ap) */
        o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
        TICK_ALLOC_SE_THK(1,0);
        SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
        o->payload[0] = (StgClosure *)ap;

        IF_DEBUG(scheduler,
                 fprintf(stderr, "scheduler: Built ");
                 printObj((StgClosure *)o);
                 );

        /* pop the old handler and put o on the stack */
        su = sf->link;
        sp += sizeofW(StgSeqFrame) - 1;
        sp[0] = (W_)o;
        break;
      }

    case STOP_FRAME:
      /* We've stripped the entire stack, the thread is now dead. */
      sp += sizeofW(StgStopFrame) - 1;
      sp[0] = (W_)exception;      /* save the exception */
      tso->what_next = ThreadKilled;
      tso->su = (StgUpdateFrame *)(sp+1);
      tso->sp = sp;
      return;

    default:
      barf("raiseAsync");
    }
  }
  barf("raiseAsync");   /* control never reaches here */
}
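/* Usage sketch: raiseAsync is the engine behind both thread deletion and
 * asynchronous exceptions (cf. deleteThread above).  cancel_thread below is
 * a hypothetical helper in the style of the killThread# primop, purely for
 * illustration.
 */
static void
cancel_thread(StgTSO *tso, StgClosure *exception)
{
    /* raiseAsync does its own unblocking (via unblockThread, which takes
     * sched_mutex itself) and is a no-op on threads that already finished. */
    raiseAsync(tso, exception);
}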
/* -----------------------------------------------------------------------------
   resurrectThreads is called after garbage collection on the list of
   threads found to be garbage.  Each of these threads will be woken
   up and sent a signal: BlockedOnDeadMVar if the thread was blocked
   on an MVar, or NonTermination if the thread was blocked on a Black
   Hole.
   -------------------------------------------------------------------------- */

void
resurrectThreads( StgTSO *threads )
{
  StgTSO *tso, *next;

  for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
    next = tso->global_link;
    tso->global_link = all_threads;
    all_threads = tso;
    IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));

    switch (tso->why_blocked) {
    case BlockedOnMVar:
    case BlockedOnException:
      raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
      break;
    case BlockedOnBlackHole:
      raiseAsync(tso,(StgClosure *)NonTermination_closure);
      break;
    case NotBlocked:
      /* This might happen if the thread was blocked on a black hole
       * belonging to a thread that we've just woken up (raiseAsync
       * can wake up threads, remember...).
       */
      continue;
    default:
      barf("resurrectThreads: thread blocked in a strange way");
    }
  }
}
/* -----------------------------------------------------------------------------
 * Blackhole detection: if we reach a deadlock, test whether any
 * threads are blocked on themselves.  Any threads which are found to
 * be self-blocked get sent a NonTermination exception.
 *
 * This is only done in a deadlock situation in order to avoid
 * performance overhead in the normal case.
 * -------------------------------------------------------------------------- */

static void
detectBlackHoles( void )
{
    StgTSO *t = all_threads;
    StgUpdateFrame *frame;
    StgClosure *blocked_on;

    for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {

        while (t->what_next == ThreadRelocated) {
            t = t->link;
            ASSERT(get_itbl(t)->type == TSO);
        }

        if (t->why_blocked != BlockedOnBlackHole) {
            continue;
        }

        blocked_on = t->block_info.closure;

        for (frame = t->su; ; frame = frame->link) {
            switch (get_itbl(frame)->type) {

            case UPDATE_FRAME:
                if (frame->updatee == blocked_on) {
                    /* We are blocking on one of our own computations, so
                     * send this thread the NonTermination exception.
                     */
                    IF_DEBUG(scheduler,
                             sched_belch("thread %d is blocked on itself", t->id));
                    raiseAsync(t, (StgClosure *)NonTermination_closure);
                    goto done;
                }
                continue;

            case CATCH_FRAME:
            case SEQ_FRAME:
                continue;

            case STOP_FRAME:
                goto done;

            default:
                barf("detectBlackHoles: weird activation record found on stack");
            }
        }

    done: ;
    }
}
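/* Call-site sketch: the deadlock branch of schedule() (earlier in this
 * file) first runs a major collection -- which may resurrect threads via
 * resurrectThreads -- and only if the run queue is still empty does it pay
 * for the stack walk above.  Roughly:
 *
 *     if (EMPTY_RUN_QUEUE()) {
 *         GarbageCollect(GetRoots, rtsTrue);
 *         if (EMPTY_RUN_QUEUE()) {
 *             detectBlackHoles();   // self-blocked threads get NonTermination
 *         }
 *     }
 */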
//@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
//@subsection Debugging Routines

/* -----------------------------------------------------------------------------
   Debugging: why is a thread blocked
   -------------------------------------------------------------------------- */

static void
printThreadBlockage(StgTSO *tso)
{
  switch (tso->why_blocked) {
  case BlockedOnRead:
    fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
    break;
  case BlockedOnWrite:
    fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
    break;
  case BlockedOnDelay:
    fprintf(stderr,"is blocked until %d", tso->block_info.target);
    break;
  case BlockedOnMVar:
    fprintf(stderr,"is blocked on an MVar");
    break;
  case BlockedOnException:
    fprintf(stderr,"is blocked on delivering an exception to thread %d",
            tso->block_info.tso->id);
    break;
  case BlockedOnBlackHole:
    fprintf(stderr,"is blocked on a black hole");
    break;
  case NotBlocked:
    fprintf(stderr,"is not blocked");
    break;
#if defined(PAR)
  case BlockedOnGA:
    fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
            tso->block_info.closure, info_type(tso->block_info.closure));
    break;
  case BlockedOnGA_NoSend:
    fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
            tso->block_info.closure, info_type(tso->block_info.closure));
    break;
#endif
  default:
    barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
         tso->why_blocked, tso->id, tso);
  }
}

static void
printThreadStatus(StgTSO *tso)
{
  switch (tso->what_next) {
  case ThreadKilled:
    fprintf(stderr,"has been killed");
    break;
  case ThreadComplete:
    fprintf(stderr,"has completed");
    break;
  default:
    printThreadBlockage(tso);
  }
}
void
printAllThreads(void)
{
  StgTSO *t;

# if defined(GRAN)
  char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
  ullong_format_string(TIME_ON_PROC(CurrentProc),
                       time_string, rtsFalse/*no commas!*/);

  sched_belch("all threads at [%s]:", time_string);
# elif defined(PAR)
  char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
  ullong_format_string(CURRENT_TIME,
                       time_string, rtsFalse/*no commas!*/);

  sched_belch("all threads at [%s]:", time_string);
# else
  sched_belch("all threads:");
# endif

  for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
    fprintf(stderr, "\tthread %d ", t->id);
    printThreadStatus(t);
    fprintf(stderr,"\n");
  }
}
/*
   Print a whole blocking queue attached to node (debugging only).
*/
//@cindex print_bq
# if defined(PAR)
void
print_bq (StgClosure *node)
{
  StgBlockingQueueElement *bqe;

  fprintf(stderr,"## BQ of closure %p (%s): ",
          node, info_type(node));

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
         get_itbl(node)->type == FETCH_ME_BQ ||
         get_itbl(node)->type == RBH ||
         get_itbl(node)->type == MVAR);

  ASSERT(node!=(StgClosure*)NULL);  // sanity check

  print_bqe(((StgBlockingQueue*)node)->blocking_queue);
}

/*
   Print a whole blocking queue starting with the element bqe.
*/
void
print_bqe (StgBlockingQueueElement *bqe)
{
  rtsBool end;

  /*
     NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
  */
  for (end = (bqe==END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
       bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);                    // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||
           get_itbl(bqe)->type == BLOCKED_FETCH ||
           get_itbl(bqe)->type == CONSTR);
    /* only BQs of an RBH end with an RBH_Save closure */
    //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

    switch (get_itbl(bqe)->type) {
    case TSO:
      fprintf(stderr," TSO %u (%p),",
              ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
      break;
    case BLOCKED_FETCH:
      fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
              ((StgBlockedFetch *)bqe)->node,
              ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
              ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
              ((StgBlockedFetch *)bqe)->ga.weight);
      break;
    case CONSTR:
      fprintf(stderr," %s (IP %p),",
              (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
               get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
               get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
               "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
           info_type((StgClosure *)bqe)); // , node, info_type(node));
    }
  } /* for */
  fputc('\n', stderr);
}
# elif defined(GRAN)
void
print_bq (StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  PEs node_loc, tso_loc;
  rtsBool end;

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
         get_itbl(node)->type == FETCH_ME_BQ ||
         get_itbl(node)->type == RBH);

  ASSERT(node!=(StgClosure*)NULL);  // sanity check
  node_loc = where_is(node);

  fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
          node, info_type(node), node_loc);

  /*
     NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
  */
  for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);                    // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||
           get_itbl(bqe)->type == CONSTR);
    /* only BQs of an RBH end with an RBH_Save closure */
    ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

    tso_loc = where_is((StgClosure *)bqe);
    switch (get_itbl(bqe)->type) {
    case TSO:
      fprintf(stderr," TSO %d (%p) on [PE %d],",
              ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
      break;
    case CONSTR:
      fprintf(stderr," %s (IP %p),",
              (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
               get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
               get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
               "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue of %p (%s)",
           info_type((StgClosure *)bqe), node, info_type(node));
    }
  } /* for */
  fputc('\n', stderr);
}
# else /* !GRAN && !PAR */
/*
   Nice and easy: only TSOs on the blocking queue
*/
void
print_bq (StgClosure *node)
{
  StgTSO *tso;

  ASSERT(node!=(StgClosure*)NULL);  // sanity check
  for (tso = ((StgBlockingQueue*)node)->blocking_queue;
       tso != END_TSO_QUEUE;
       tso=tso->link) {
    ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);  // sanity check
    ASSERT(get_itbl(tso)->type == TSO);       // guess what, sanity check
    fprintf(stderr," TSO %d (%p),", tso->id, tso);
  }
  fputc('\n', stderr);
}
# endif

/*
   Calculate the no. of TSOs on the run queue.
*/
static nat
run_queue_len(void)
{
  nat i;
  StgTSO *tso;

  for (i=0, tso=run_queue_hd;
       tso != END_TSO_QUEUE;
       i++, tso=tso->link) {
    /* nothing */
  }

  return i;
}
void
sched_belch(char *s, ...)
{
  va_list ap;
  va_start(ap,s);
#ifdef SMP
  fprintf(stderr, "scheduler (task %ld): ", pthread_self());
#elif defined(PAR)
  fprintf(stderr, "== ");
#else
  fprintf(stderr, "scheduler: ");
#endif
  vfprintf(stderr, s, ap);
  fprintf(stderr, "\n");
  va_end(ap);
}
//@node Index,  , Debugging Routines, Main scheduling code
//@subsection Index

//@index
//* MainRegTable::  @cindex\s-+MainRegTable
//* StgMainThread::  @cindex\s-+StgMainThread
//* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
//* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
//* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
//* context_switch::  @cindex\s-+context_switch
//* createThread::  @cindex\s-+createThread
//* free_capabilities::  @cindex\s-+free_capabilities
//* gc_pending_cond::  @cindex\s-+gc_pending_cond
//* initScheduler::  @cindex\s-+initScheduler
//* interrupted::  @cindex\s-+interrupted
//* n_free_capabilities::  @cindex\s-+n_free_capabilities
//* next_thread_id::  @cindex\s-+next_thread_id
//* print_bq::  @cindex\s-+print_bq
//* run_queue_hd::  @cindex\s-+run_queue_hd
//* run_queue_tl::  @cindex\s-+run_queue_tl
//* sched_mutex::  @cindex\s-+sched_mutex
//* schedule::  @cindex\s-+schedule
//* take_off_run_queue::  @cindex\s-+take_off_run_queue
//* task_ids::  @cindex\s-+task_ids
//* term_mutex::  @cindex\s-+term_mutex
//* thread_ready_cond::  @cindex\s-+thread_ready_cond
//@end index