1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.113 2002/01/24 07:50:02 sof Exp $
4 * (c) The GHC Team, 1998-2000
8 * Different GHC ways use this scheduler quite differently (see comments below)
9 * Here is the global picture:
11 * WAY Name CPP flag What's it for
12 * --------------------------------------
13 * mp GUM PAR Parallel execution on a distributed memory machine
14 * s SMP SMP Parallel execution on a shared memory machine
15 * mg GranSim GRAN Simulation of parallel execution
16 * md GUM/GdH DIST Distributed execution (based on GUM)
17 * --------------------------------------------------------------------------*/
19 //@node Main scheduling code, , ,
20 //@section Main scheduling code
23 * Version with scheduler monitor support for SMPs (WAY=s):
25 This design provides a high-level API to create and schedule threads etc.
26 as documented in the SMP design document.
28 It uses a monitor design controlled by a single mutex to exercise control
29 over accesses to shared data structures, and builds on the Posix threads
32 The majority of state is shared. In order to keep essential per-task state,
33 there is a Capability structure, which contains all the information
34 needed to run a thread: its STG registers, a pointer to its TSO, a
35 nursery etc. During STG execution, a pointer to the capability is
36 kept in a register (BaseReg).
38 In a non-SMP build, there is one global capability, namely MainRegTable.
42 * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44 The main scheduling loop in GUM iterates until a finish message is received.
45 In that case a global flag @receivedFinish@ is set and this instance of
46 the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
47 for the handling of incoming messages, such as PP_FINISH.
48 Note that in the parallel case we have a system manager that coordinates
49 different PEs, each of which is running one instance of the RTS.
50 See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
51 From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53 * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55 The main scheduling code in GranSim is quite different from that in std
56 (concurrent) Haskell: while concurrent Haskell just iterates over the
57 threads in the runnable queue, GranSim is event driven, i.e. it iterates
58 over the events in the global event queue. -- HWL
63 //* Variables and Data structures::
64 //* Main scheduling loop::
65 //* Suspend and Resume::
67 //* Garbage Collextion Routines::
68 //* Blocking Queue Routines::
69 //* Exception Handling Routines::
70 //* Debugging Routines::
74 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
75 //@subsection Includes
77 #include "PosixSource.h"
84 #include "StgStartup.h"
87 #include "StgMiscClosures.h"
89 #include "Interpreter.h"
90 #include "Exception.h"
99 #include "Proftimer.h"
100 #include "ProfHeap.h"
102 #if defined(GRAN) || defined(PAR)
103 # include "GranSimRts.h"
104 # include "GranSim.h"
105 # include "ParallelRts.h"
106 # include "Parallel.h"
107 # include "ParallelDebug.h"
108 # include "FetchMe.h"
115 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
116 //@subsection Variables and Data structures
120 * These are the threads which clients have requested that we run.
122 * In an SMP build, we might have several concurrent clients all
123 * waiting for results, and each one will wait on a condition variable
124 * until the result is available.
126 * In non-SMP, clients are strictly nested: the first client calls
127 * into the RTS, which might call out again to C with a _ccall_GC, and
128 * eventually re-enter the RTS.
130 * Main threads information is kept in a linked list:
132 //@cindex StgMainThread
133 typedef struct StgMainThread_ {
135 SchedulerStatus stat;
138 pthread_cond_t wakeup;
140 struct StgMainThread_ *link;
143 /* Main thread queue.
144 * Locks required: sched_mutex.
146 static StgMainThread *main_threads;
149 * Locks required: sched_mutex.
153 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
154 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
157 In GranSim we have a runnable and a blocked queue for each processor.
158 In order to minimise code changes new arrays run_queue_hds/tls
159 are created. run_queue_hd is then a short cut (macro) for
160 run_queue_hds[CurrentProc] (see GranSim.h).
163 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
164 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
165 StgTSO *ccalling_threadss[MAX_PROC];
166 /* We use the same global list of threads (all_threads) in GranSim as in
167 the std RTS (i.e. we are cheating). However, we don't use this list in
168 the GranSim specific code at the moment (so we are only potentially
173 StgTSO *run_queue_hd, *run_queue_tl;
174 StgTSO *blocked_queue_hd, *blocked_queue_tl;
175 StgTSO *sleeping_queue; /* perhaps replace with a hash table? */
179 /* Linked list of all threads.
180 * Used for detecting garbage collected threads.
184 /* Threads suspended in _ccall_GC.
186 static StgTSO *suspended_ccalling_threads;
188 static StgTSO *threadStackOverflow(StgTSO *tso);
190 /* KH: The following two flags are shared memory locations. There is no need
191 to lock them, since they are only unset at the end of a scheduler
195 /* flag set by signal handler to precipitate a context switch */
196 //@cindex context_switch
199 /* if this flag is set as well, give up execution */
200 //@cindex interrupted
203 /* Next thread ID to allocate.
204 * Locks required: sched_mutex
206 //@cindex next_thread_id
207 StgThreadID next_thread_id = 1;
210 * Pointers to the state of the current thread.
211 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
212 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
215 /* The smallest stack size that makes any sense is:
216 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
217 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
218 * + 1 (the realworld token for an IO thread)
219 * + 1 (the closure to enter)
221 * A thread with this stack will bomb immediately with a stack
222 * overflow, which will increase its stack size.
225 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
227 /* Free capability list.
228 * Locks required: sched_mutex.
231 Capability *free_capabilities; /* Available capabilities for running threads */
232 nat n_free_capabilities; /* total number of available capabilities */
234 Capability MainCapability; /* for non-SMP, we have one global capability */
241 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
242 * exists - earlier gccs apparently didn't.
249 /* All our current task ids, saved in case we need to kill them later.
256 void addToBlockedQueue ( StgTSO *tso );
258 static void schedule ( void );
259 void interruptStgRts ( void );
261 static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri );
263 static StgTSO * createThread_ ( nat size, rtsBool have_lock );
266 static void detectBlackHoles ( void );
269 static void sched_belch(char *s, ...);
273 //@cindex sched_mutex
275 //@cindex thread_ready_cond
276 //@cindex gc_pending_cond
277 pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER; /* scheduler lock: taken with ACQUIRE_LOCK/RELEASE_LOCK in the scheduler loop; required for the main-thread queue and free capability list */
278 pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER; /* NOTE(review): no use visible in this excerpt -- presumably guards task termination; confirm */
279 pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER; /* waited on (with sched_mutex) while the run queue is empty or no capability is free */
280 pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER; /* waited on (with sched_mutex) by the scheduler loop while a GC is pending */
287 rtsTime TimeOfLastYield;
288 rtsBool emitSchedule = rtsTrue;
292 char *whatNext_strs[] = {
300 char *threadReturnCode_strs[] = {
301 "HeapOverflow", /* might also be StackOverflow */
310 StgTSO * createSparkThread(rtsSpark spark);
311 StgTSO * activateSpark (rtsSpark spark);
315 * The thread state for the main thread.
316 // ToDo: check whether not needed any more
320 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
321 //@subsection Main scheduling loop
323 /* ---------------------------------------------------------------------------
324 Main scheduling loop.
326 We use round-robin scheduling, each thread returning to the
327 scheduler loop when one of these conditions is detected:
330 * timer expires (thread yields)
335 Locking notes: we acquire the scheduler lock once at the beginning
336 of the scheduler loop, and release it when
338 * running a thread, or
339 * waiting for work, or
340 * waiting for a GC to complete.
343 In a GranSim setup this loop iterates over the global event queue.
344 This revolves around the global event queue, which determines what
345 to do next. Therefore, it's more complicated than either the
346 concurrent or the parallel (GUM) setup.
349 GUM iterates over incoming messages.
350 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
351 and sends out a fish whenever it has nothing to do; in-between
352 doing the actual reductions (shared code below) it processes the
353 incoming messages and deals with delayed operations
354 (see PendingFetches).
355 This is not the ugliest code you could imagine, but it's bloody close.
357 ------------------------------------------------------------------------ */
364 StgThreadReturnCode ret;
372 rtsBool receivedFinish = rtsFalse;
374 nat tp_size, sp_size; // stats only
377 rtsBool was_interrupted = rtsFalse;
379 ACQUIRE_LOCK(&sched_mutex);
383 /* set up first event to get things going */
384 /* ToDo: assign costs for system setup and init MainTSO ! */
385 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
387 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
390 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
391 G_TSO(CurrentTSO, 5));
393 if (RtsFlags.GranFlags.Light) {
394 /* Save current time; GranSim Light only */
395 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
398 event = get_next_event();
400 while (event!=(rtsEvent*)NULL) {
401 /* Choose the processor with the next event */
402 CurrentProc = event->proc;
403 CurrentTSO = event->tso;
407 while (!receivedFinish) { /* set by processMessages */
408 /* when receiving PP_FINISH message */
415 IF_DEBUG(scheduler, printAllThreads());
417 /* If we're interrupted (the user pressed ^C, or some other
418 * termination condition occurred), kill all the currently running
422 IF_DEBUG(scheduler, sched_belch("interrupted"));
424 interrupted = rtsFalse;
425 was_interrupted = rtsTrue;
428 /* Go through the list of main threads and wake up any
429 * clients whose computations have finished. ToDo: this
430 * should be done more efficiently without a linear scan
431 * of the main threads list, somehow...
435 StgMainThread *m, **prev;
436 prev = &main_threads;
437 for (m = main_threads; m != NULL; m = m->link) {
438 switch (m->tso->what_next) {
441 *(m->ret) = (StgClosure *)m->tso->sp[0];
445 pthread_cond_broadcast(&m->wakeup);
448 if (m->ret) *(m->ret) = NULL;
450 if (was_interrupted) {
451 m->stat = Interrupted;
455 pthread_cond_broadcast(&m->wakeup);
466 /* in GUM do this only on the Main PE */
469 /* If our main thread has finished or been killed, return.
472 StgMainThread *m = main_threads;
473 if (m->tso->what_next == ThreadComplete
474 || m->tso->what_next == ThreadKilled) {
475 main_threads = main_threads->link;
476 if (m->tso->what_next == ThreadComplete) {
477 /* we finished successfully, fill in the return value */
478 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
482 if (m->ret) { *(m->ret) = NULL; };
483 if (was_interrupted) {
484 m->stat = Interrupted;
494 /* Top up the run queue from our spark pool. We try to make the
495 * number of threads in the run queue equal to the number of
498 * Disable spark support in SMP for now, non-essential & requires
499 * a little bit of work to make it compile cleanly. -- sof 1/02.
501 #if 0 /* defined(SMP) */
503 nat n = n_free_capabilities;
504 StgTSO *tso = run_queue_hd;
506 /* Count the run queue */
507 while (n > 0 && tso != END_TSO_QUEUE) {
514 spark = findSpark(rtsFalse);
516 break; /* no more sparks in the pool */
518 /* I'd prefer this to be done in activateSpark -- HWL */
519 /* tricky - it needs to hold the scheduler lock and
520 * not try to re-acquire it -- SDM */
521 createSparkThread(spark);
523 sched_belch("==^^ turning spark of closure %p into a thread",
524 (StgClosure *)spark));
527 /* We need to wake up the other tasks if we just created some
530 if (n_free_capabilities - n > 1) {
531 pthread_cond_signal(&thread_ready_cond);
536 /* check for signals each time around the scheduler */
537 #ifndef mingw32_TARGET_OS
538 if (signals_pending()) {
539 startSignalHandlers();
543 /* Check whether any waiting threads need to be woken up. If the
544 * run queue is empty, and there are no other tasks running, we
545 * can wait indefinitely for something to happen.
546 * ToDo: what if another client comes along & requests another
549 if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
551 (run_queue_hd == END_TSO_QUEUE)
553 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
557 /* we can be interrupted while waiting for I/O... */
558 if (interrupted) continue;
561 * Detect deadlock: when we have no threads to run, there are no
562 * threads waiting on I/O or sleeping, and all the other tasks are
563 * waiting for work, we must have a deadlock of some description.
565 * We first try to find threads blocked on themselves (ie. black
566 * holes), and generate NonTermination exceptions where necessary.
568 * If no threads are black holed, we have a deadlock situation, so
569 * inform all the main threads.
572 if (blocked_queue_hd == END_TSO_QUEUE
573 && run_queue_hd == END_TSO_QUEUE
574 && sleeping_queue == END_TSO_QUEUE
576 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
580 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
581 GarbageCollect(GetRoots,rtsTrue);
582 if (blocked_queue_hd == END_TSO_QUEUE
583 && run_queue_hd == END_TSO_QUEUE
584 && sleeping_queue == END_TSO_QUEUE) {
586 IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
589 // No black holes, so probably a real deadlock. Send the
590 // current main thread the Deadlock exception (or in the SMP
591 // build, send *all* main threads the deadlock exception,
592 // since none of them can make progress).
593 if (run_queue_hd == END_TSO_QUEUE) {
596 for (m = main_threads; m != NULL; m = m->link) {
597 switch (m->tso->why_blocked) {
598 case BlockedOnBlackHole:
599 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
601 case BlockedOnException:
603 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
606 barf("deadlock: main thread blocked in a strange way");
611 switch (m->tso->why_blocked) {
612 case BlockedOnBlackHole:
613 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
615 case BlockedOnException:
617 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
620 barf("deadlock: main thread blocked in a strange way");
624 ASSERT( run_queue_hd != END_TSO_QUEUE );
628 /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
632 /* If there's a GC pending, don't do anything until it has
636 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
637 pthread_cond_wait(&gc_pending_cond, &sched_mutex);
640 /* block until we've got a thread on the run queue and a free
643 while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
644 IF_DEBUG(scheduler, sched_belch("waiting for work"));
645 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
646 IF_DEBUG(scheduler, sched_belch("work now available"));
652 if (RtsFlags.GranFlags.Light)
653 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
655 /* adjust time based on time-stamp */
656 if (event->time > CurrentTime[CurrentProc] &&
657 event->evttype != ContinueThread)
658 CurrentTime[CurrentProc] = event->time;
660 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
661 if (!RtsFlags.GranFlags.Light)
664 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
666 /* main event dispatcher in GranSim */
667 switch (event->evttype) {
668 /* Should just be continuing execution */
670 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
671 /* ToDo: check assertion
672 ASSERT(run_queue_hd != (StgTSO*)NULL &&
673 run_queue_hd != END_TSO_QUEUE);
675 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
676 if (!RtsFlags.GranFlags.DoAsyncFetch &&
677 procStatus[CurrentProc]==Fetching) {
678 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
679 CurrentTSO->id, CurrentTSO, CurrentProc);
682 /* Ignore ContinueThreads for completed threads */
683 if (CurrentTSO->what_next == ThreadComplete) {
684 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
685 CurrentTSO->id, CurrentTSO, CurrentProc);
688 /* Ignore ContinueThreads for threads that are being migrated */
689 if (PROCS(CurrentTSO)==Nowhere) {
690 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
691 CurrentTSO->id, CurrentTSO, CurrentProc);
694 /* The thread should be at the beginning of the run queue */
695 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
696 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
697 CurrentTSO->id, CurrentTSO, CurrentProc);
698 break; // run the thread anyway
701 new_event(proc, proc, CurrentTime[proc],
703 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
705 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
706 break; // now actually run the thread; DaH Qu'vam yImuHbej
709 do_the_fetchnode(event);
710 goto next_thread; /* handle next event in event queue */
713 do_the_globalblock(event);
714 goto next_thread; /* handle next event in event queue */
717 do_the_fetchreply(event);
718 goto next_thread; /* handle next event in event queue */
720 case UnblockThread: /* Move from the blocked queue to the tail of */
721 do_the_unblock(event);
722 goto next_thread; /* handle next event in event queue */
724 case ResumeThread: /* Move from the blocked queue to the tail of */
725 /* the runnable queue ( i.e. Qu' SImqa'lu') */
726 event->tso->gran.blocktime +=
727 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
728 do_the_startthread(event);
729 goto next_thread; /* handle next event in event queue */
732 do_the_startthread(event);
733 goto next_thread; /* handle next event in event queue */
736 do_the_movethread(event);
737 goto next_thread; /* handle next event in event queue */
740 do_the_movespark(event);
741 goto next_thread; /* handle next event in event queue */
744 do_the_findwork(event);
745 goto next_thread; /* handle next event in event queue */
748 barf("Illegal event type %u\n", event->evttype);
751 /* This point was scheduler_loop in the old RTS */
753 IF_DEBUG(gran, belch("GRAN: after main switch"));
755 TimeOfLastEvent = CurrentTime[CurrentProc];
756 TimeOfNextEvent = get_time_of_next_event();
757 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
758 // CurrentTSO = ThreadQueueHd;
760 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
763 if (RtsFlags.GranFlags.Light)
764 GranSimLight_leave_system(event, &ActiveTSO);
766 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
769 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
771 /* in a GranSim setup the TSO stays on the run queue */
773 /* Take a thread from the run queue. */
774 t = POP_RUN_QUEUE(); // take_off_run_queue(t);
777 fprintf(stderr, "GRAN: About to run current thread, which is\n");
780 context_switch = 0; // turned on via GranYield, checking events and time slice
783 DumpGranEvent(GR_SCHEDULE, t));
785 procStatus[CurrentProc] = Busy;
788 if (PendingFetches != END_BF_QUEUE) {
792 /* ToDo: perhaps merge with spark activation above */
793 /* check whether we have local work and send requests if we have none */
794 if (EMPTY_RUN_QUEUE()) { /* no runnable threads */
795 /* :-[ no local threads => look out for local sparks */
796 /* the spark pool for the current PE */
797 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
798 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
799 pool->hd < pool->tl) {
801 * ToDo: add GC code check that we really have enough heap afterwards!!
803 * If we're here (no runnable threads) and we have pending
804 * sparks, we must have a space problem. Get enough space
805 * to turn one of those pending sparks into a
809 spark = findSpark(rtsFalse); /* get a spark */
810 if (spark != (rtsSpark) NULL) {
811 tso = activateSpark(spark); /* turn the spark into a thread */
812 IF_PAR_DEBUG(schedule,
813 belch("==== schedule: Created TSO %d (%p); %d threads active",
814 tso->id, tso, advisory_thread_count));
816 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
817 belch("==^^ failed to activate spark");
819 } /* otherwise fall through & pick-up new tso */
821 IF_PAR_DEBUG(verbose,
822 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
823 spark_queue_len(pool)));
828 /* If we still have no work we need to send a FISH to get a spark
831 if (EMPTY_RUN_QUEUE()) {
832 /* =8-[ no local sparks => look for work on other PEs */
834 * We really have absolutely no work. Send out a fish
835 * (there may be some out there already), and wait for
836 * something to arrive. We clearly can't run any threads
837 * until a SCHEDULE or RESUME arrives, and so that's what
838 * we're hoping to see. (Of course, we still have to
839 * respond to other types of messages.)
841 TIME now = msTime() /*CURRENT_TIME*/;
842 IF_PAR_DEBUG(verbose,
843 belch("-- now=%ld", now));
844 IF_PAR_DEBUG(verbose,
845 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
846 (last_fish_arrived_at!=0 &&
847 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
848 belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
849 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
850 last_fish_arrived_at,
851 RtsFlags.ParFlags.fishDelay, now);
854 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
855 (last_fish_arrived_at==0 ||
856 (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
857 /* outstandingFishes is set in sendFish, processFish;
858 avoid flooding system with fishes via delay */
860 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
863 // Global statistics: count no. of fishes
864 if (RtsFlags.ParFlags.ParStats.Global &&
865 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
866 globalParStats.tot_fish_mess++;
870 receivedFinish = processMessages();
873 } else if (PacketsWaiting()) { /* Look for incoming messages */
874 receivedFinish = processMessages();
877 /* Now we are sure that we have some work available */
878 ASSERT(run_queue_hd != END_TSO_QUEUE);
880 /* Take a thread from the run queue, if we have work */
881 t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
882 IF_DEBUG(sanity,checkTSO(t));
884 /* ToDo: write something to the log-file
885 if (RTSflags.ParFlags.granSimStats && !sameThread)
886 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
890 /* the spark pool for the current PE */
891 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
894 belch("--=^ %d threads, %d sparks on [%#x]",
895 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
898 if (0 && RtsFlags.ParFlags.ParStats.Full &&
899 t && LastTSO && t->id != LastTSO->id &&
900 LastTSO->why_blocked == NotBlocked &&
901 LastTSO->what_next != ThreadComplete) {
902 // if previously scheduled TSO not blocked we have to record the context switch
903 DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
904 GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
907 if (RtsFlags.ParFlags.ParStats.Full &&
908 (emitSchedule /* forced emit */ ||
909 (t && LastTSO && t->id != LastTSO->id))) {
911 we are running a different TSO, so write a schedule event to log file
912 NB: If we use fair scheduling we also have to write a deschedule
913 event for LastTSO; with unfair scheduling we know that the
914 previous tso has blocked whenever we switch to another tso, so
915 we don't need it in GUM for now
917 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
918 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
919 emitSchedule = rtsFalse;
923 #else /* !GRAN && !PAR */
925 /* grab a thread from the run queue
927 ASSERT(run_queue_hd != END_TSO_QUEUE);
930 // Sanity check the thread we're about to run. This can be
931 // expensive if there is lots of thread switching going on...
932 IF_DEBUG(sanity,checkTSO(t));
939 cap = free_capabilities;
940 free_capabilities = cap->link;
941 n_free_capabilities--;
943 cap = &MainCapability;
946 cap->r.rCurrentTSO = t;
948 /* context switches are now initiated by the timer signal, unless
949 * the user specified "context switch as often as possible", with
954 RtsFlags.ProfFlags.profileInterval == 0 ||
956 (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
957 && (run_queue_hd != END_TSO_QUEUE
958 || blocked_queue_hd != END_TSO_QUEUE
959 || sleeping_queue != END_TSO_QUEUE)))
964 RELEASE_LOCK(&sched_mutex);
966 IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...",
967 t->id, t, whatNext_strs[t->what_next]));
970 startHeapProfTimer();
973 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
974 /* Run the current thread
976 switch (cap->r.rCurrentTSO->what_next) {
979 /* Thread already finished, return to scheduler. */
980 ret = ThreadFinished;
983 ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
986 ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
988 case ThreadEnterInterp:
989 ret = interpretBCO(cap);
992 barf("schedule: invalid what_next field");
994 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
996 /* Costs for the scheduler are assigned to CCS_SYSTEM */
1002 ACQUIRE_LOCK(&sched_mutex);
1005 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
1006 #elif !defined(GRAN) && !defined(PAR)
1007 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1009 t = cap->r.rCurrentTSO;
1012 /* HACK 675: if the last thread didn't yield, make sure to print a
1013 SCHEDULE event to the log file when StgRunning the next thread, even
1014 if it is the same one as before */
1016 TimeOfLastYield = CURRENT_TIME;
1022 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1023 globalGranStats.tot_heapover++;
1025 globalParStats.tot_heapover++;
1028 // did the task ask for a large block?
1029 if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1030 // if so, get one and push it on the front of the nursery.
1034 blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1036 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)",
1038 whatNext_strs[t->what_next], blocks));
1040 // don't do this if it would push us over the
1041 // alloc_blocks_lim limit; we'll GC first.
1042 if (alloc_blocks + blocks < alloc_blocks_lim) {
1044 alloc_blocks += blocks;
1045 bd = allocGroup( blocks );
1047 // link the new group into the list
1048 bd->link = cap->r.rCurrentNursery;
1049 bd->u.back = cap->r.rCurrentNursery->u.back;
1050 if (cap->r.rCurrentNursery->u.back != NULL) {
1051 cap->r.rCurrentNursery->u.back->link = bd;
1053 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1054 g0s0->blocks == cap->r.rNursery);
1055 cap->r.rNursery = g0s0->blocks = bd;
1057 cap->r.rCurrentNursery->u.back = bd;
1059 // initialise it as a nursery block
1063 bd->free = bd->start;
1065 // don't forget to update the block count in g0s0.
1066 g0s0->n_blocks += blocks;
1067 ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1069 // now update the nursery to point to the new block
1070 cap->r.rCurrentNursery = bd;
1072 // we might be unlucky and have another thread get on the
1073 // run queue before us and steal the large block, but in that
1074 // case the thread will just end up requesting another large
1076 PUSH_ON_RUN_QUEUE(t);
1081 /* make all the running tasks block on a condition variable,
1082 * maybe set context_switch and wait till they all pile in,
1083 * then have them wait on a GC condition variable.
1085 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow",
1086 t->id, t, whatNext_strs[t->what_next]));
1089 ASSERT(!is_on_queue(t,CurrentProc));
1091 /* Currently we emit a DESCHEDULE event before GC in GUM.
1092 ToDo: either add separate event to distinguish SYSTEM time from rest
1093 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1094 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1095 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1096 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1097 emitSchedule = rtsTrue;
1101 ready_to_gc = rtsTrue;
1102 context_switch = 1; /* stop other threads ASAP */
1103 PUSH_ON_RUN_QUEUE(t);
1104 /* actual GC is done at the end of the while loop */
1110 DumpGranEvent(GR_DESCHEDULE, t));
1111 globalGranStats.tot_stackover++;
1114 // DumpGranEvent(GR_DESCHEDULE, t);
1115 globalParStats.tot_stackover++;
1117 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow",
1118 t->id, t, whatNext_strs[t->what_next]));
1119 /* just adjust the stack for this thread, then pop it back
1125 /* enlarge the stack */
1126 StgTSO *new_t = threadStackOverflow(t);
1128 /* This TSO has moved, so update any pointers to it from the
1129 * main thread stack. It better not be on any other queues...
1130 * (it shouldn't be).
1132 for (m = main_threads; m != NULL; m = m->link) {
1137 threadPaused(new_t);
1138 PUSH_ON_RUN_QUEUE(new_t);
1142 case ThreadYielding:
1145 DumpGranEvent(GR_DESCHEDULE, t));
1146 globalGranStats.tot_yields++;
1149 // DumpGranEvent(GR_DESCHEDULE, t);
1150 globalParStats.tot_yields++;
1152 /* put the thread back on the run queue. Then, if we're ready to
1153 * GC, check whether this is the last task to stop. If so, wake
1154 * up the GC thread. getThread will block during a GC until the
1158 if (t->what_next == ThreadEnterInterp) {
1159 /* ToDo: or maybe a timer expired when we were in Hugs?
1160 * or maybe someone hit ctrl-C
1162 belch("--<< thread %ld (%p; %s) stopped to switch to Hugs",
1163 t->id, t, whatNext_strs[t->what_next]);
1165 belch("--<< thread %ld (%p; %s) stopped, yielding",
1166 t->id, t, whatNext_strs[t->what_next]);
1173 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1175 ASSERT(t->link == END_TSO_QUEUE);
1177 ASSERT(!is_on_queue(t,CurrentProc));
1180 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1181 checkThreadQsSanity(rtsTrue));
1184 if (RtsFlags.ParFlags.doFairScheduling) {
1185 /* this does round-robin scheduling; good for concurrency */
1186 APPEND_TO_RUN_QUEUE(t);
1188 /* this does unfair scheduling; good for parallelism */
1189 PUSH_ON_RUN_QUEUE(t);
1192 /* this does round-robin scheduling; good for concurrency */
1193 APPEND_TO_RUN_QUEUE(t);
1196 /* add a ContinueThread event to actually process the thread */
1197 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1199 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1201 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1210 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1211 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1212 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1214 // ??? needed; should emit block before
1216 DumpGranEvent(GR_DESCHEDULE, t));
1217 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1220 ASSERT(procStatus[CurrentProc]==Busy ||
1221 ((procStatus[CurrentProc]==Fetching) &&
1222 (t->block_info.closure!=(StgClosure*)NULL)));
1223 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1224 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1225 procStatus[CurrentProc]==Fetching))
1226 procStatus[CurrentProc] = Idle;
1230 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1231 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1234 if (t->block_info.closure!=(StgClosure*)NULL)
1235 print_bq(t->block_info.closure));
1237 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1240 /* whatever we schedule next, we must log that schedule */
1241 emitSchedule = rtsTrue;
1244 /* don't need to do anything. Either the thread is blocked on
1245 * I/O, in which case we'll have called addToBlockedQueue
1246 * previously, or it's blocked on an MVar or Blackhole, in which
1247 * case it'll be on the relevant queue already.
1250 fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1251 printThreadBlockage(t);
1252 fprintf(stderr, "\n"));
1254 /* Only for dumping event to log file
1255 ToDo: do I need this in GranSim, too?
1262 case ThreadFinished:
1263 /* Need to check whether this was a main thread, and if so, signal
1264 * the task that started it with the return value. If we have no
1265 * more main threads, we probably need to stop all the tasks until
1268 /* We also end up here if the thread kills itself with an
1269 * uncaught exception, see Exception.hc.
1271 IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1273 endThread(t, CurrentProc); // clean-up the thread
1275 /* For now all are advisory -- HWL */
1276 //if(t->priority==AdvisoryPriority) ??
1277 advisory_thread_count--;
1280 if(t->dist.priority==RevalPriority)
1284 if (RtsFlags.ParFlags.ParStats.Full &&
1285 !RtsFlags.ParFlags.ParStats.Suppressed)
1286 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1291 barf("schedule: invalid thread return code %d", (int)ret);
1295 cap->link = free_capabilities;
1296 free_capabilities = cap;
1297 n_free_capabilities++;
1301 if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1302 GarbageCollect(GetRoots, rtsTrue);
1304 performHeapProfile = rtsFalse;
1305 ready_to_gc = rtsFalse; // we already GC'd
1310 if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes)
1315 /* everybody back, start the GC.
1316 * Could do it in this thread, or signal a condition var
1317 * to do it in another thread. Either way, we need to
1318 * broadcast on gc_pending_cond afterward.
1321 IF_DEBUG(scheduler,sched_belch("doing GC"));
1323 GarbageCollect(GetRoots,rtsFalse);
1324 ready_to_gc = rtsFalse;
1326 pthread_cond_broadcast(&gc_pending_cond);
1329 /* add a ContinueThread event to continue execution of current thread */
1330 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1332 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1334 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1342 IF_GRAN_DEBUG(unused,
1343 print_eventq(EventHd));
1345 event = get_next_event();
1348 /* ToDo: wait for next message to arrive rather than busy wait */
1351 } /* end of while(1) */
1353 IF_PAR_DEBUG(verbose,
1354 belch("== Leaving schedule() after having received Finish"));
1357 /* ---------------------------------------------------------------------------
1358 * deleteAllThreads(): kill all the live threads.
1360 * This is used when we catch a user interrupt (^C), before performing
1361 * any necessary cleanups and running finalizers.
1362 * ------------------------------------------------------------------------- */
1364 void deleteAllThreads ( void )
1367 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1368 for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1371 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1374 for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1377 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1378 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1379 sleeping_queue = END_TSO_QUEUE;
1382 /* startThread and insertThread are now in GranSim.c -- HWL */
1384 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1385 //@subsection Suspend and Resume
1387 /* ---------------------------------------------------------------------------
1388 * Suspending & resuming Haskell threads.
1390 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1391 * its capability before calling the C function. This allows another
1392 * task to pick up the capability and carry on running Haskell
1393 * threads. It also means that if the C call blocks, it won't lock
1396 * The Haskell thread making the C call is put to sleep for the
1397 * duration of the call, on the suspended_ccalling_threads queue. We
1398 * give out a token to the task, which it can use to resume the thread
1399 * on return from the C function.
1400 * ------------------------------------------------------------------------- */
1403 suspendThread( StgRegTable *reg )
1408 // assume that *reg is a pointer to the StgRegTable part of a Capability
1409 cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1411 ACQUIRE_LOCK(&sched_mutex);
1414 sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1416 threadPaused(cap->r.rCurrentTSO);
1417 cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1418 suspended_ccalling_threads = cap->r.rCurrentTSO;
1420 /* Use the thread ID as the token; it should be unique */
1421 tok = cap->r.rCurrentTSO->id;
1424 cap->link = free_capabilities;
1425 free_capabilities = cap;
1426 n_free_capabilities++;
1429 RELEASE_LOCK(&sched_mutex);
1434 resumeThread( StgInt tok )
1436 StgTSO *tso, **prev;
1439 ACQUIRE_LOCK(&sched_mutex);
1441 prev = &suspended_ccalling_threads;
1442 for (tso = suspended_ccalling_threads;
1443 tso != END_TSO_QUEUE;
1444 prev = &tso->link, tso = tso->link) {
1445 if (tso->id == (StgThreadID)tok) {
1450 if (tso == END_TSO_QUEUE) {
1451 barf("resumeThread: thread not found");
1453 tso->link = END_TSO_QUEUE;
1456 while (free_capabilities == NULL) {
1457 IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1458 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1459 IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1461 cap = free_capabilities;
1462 free_capabilities = cap->link;
1463 n_free_capabilities--;
1465 cap = &MainCapability;
1468 cap->r.rCurrentTSO = tso;
1470 RELEASE_LOCK(&sched_mutex);
1475 /* ---------------------------------------------------------------------------
1477 * ------------------------------------------------------------------------ */
1478 static void unblockThread(StgTSO *tso);
1480 /* ---------------------------------------------------------------------------
1481 * Comparing Thread ids.
1483 * This is used from STG land in the implementation of the
1484 * instances of Eq/Ord for ThreadIds.
1485 * ------------------------------------------------------------------------ */
1487 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
1489 StgThreadID id1 = tso1->id;
1490 StgThreadID id2 = tso2->id;
1492 if (id1 < id2) return (-1);
1493 if (id1 > id2) return 1;
1497 /* ---------------------------------------------------------------------------
1498 * Fetching the ThreadID from an StgTSO.
1500 * This is used in the implementation of Show for ThreadIds.
1501 * ------------------------------------------------------------------------ */
1502 int rts_getThreadId(const StgTSO *tso)
1507 /* ---------------------------------------------------------------------------
1508 Create a new thread.
1510 The new thread starts with the given stack size. Before the
1511 scheduler can run, however, this thread needs to have a closure
1512 (and possibly some arguments) pushed on its stack. See
1513 pushClosure() in Schedule.h.
1515 createGenThread() and createIOThread() (in SchedAPI.h) are
1516 convenient packaged versions of this function.
1518 currently pri (priority) is only used in a GRAN setup -- HWL
1519 ------------------------------------------------------------------------ */
1520 //@cindex createThread
1522 /* currently pri (priority) is only used in a GRAN setup -- HWL */
1524 createThread(nat stack_size, StgInt pri)
1526 return createThread_(stack_size, rtsFalse, pri);
1530 createThread_(nat size, rtsBool have_lock, StgInt pri)
1534 createThread(nat stack_size)
1536 return createThread_(stack_size, rtsFalse);
1540 createThread_(nat size, rtsBool have_lock)
1547 /* First check whether we should create a thread at all */
1549 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1550 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1552 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1553 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1554 return END_TSO_QUEUE;
1560 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1563 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1565 /* catch ridiculously small stack sizes */
1566 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1567 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1570 stack_size = size - TSO_STRUCT_SIZEW;
1572 tso = (StgTSO *)allocate(size);
1573 TICK_ALLOC_TSO(stack_size, 0);
1575 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1577 SET_GRAN_HDR(tso, ThisPE);
1579 tso->what_next = ThreadEnterGHC;
1581 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1582 * protect the increment operation on next_thread_id.
1583 * In future, we could use an atomic increment instead.
1585 if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1586 tso->id = next_thread_id++;
1587 if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1589 tso->why_blocked = NotBlocked;
1590 tso->blocked_exceptions = NULL;
1592 tso->stack_size = stack_size;
1593 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
1595 tso->sp = (P_)&(tso->stack) + stack_size;
1598 tso->prof.CCCS = CCS_MAIN;
1601 /* put a stop frame on the stack */
1602 tso->sp -= sizeofW(StgStopFrame);
1603 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1604 tso->su = (StgUpdateFrame*)tso->sp;
1608 tso->link = END_TSO_QUEUE;
1609 /* uses more flexible routine in GranSim */
1610 insertThread(tso, CurrentProc);
1612 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1618 if (RtsFlags.GranFlags.GranSimStats.Full)
1619 DumpGranEvent(GR_START,tso);
1621 if (RtsFlags.ParFlags.ParStats.Full)
1622 DumpGranEvent(GR_STARTQ,tso);
1623 /* HACk to avoid SCHEDULE
1627 /* Link the new thread on the global thread list.
1629 tso->global_link = all_threads;
1633 tso->dist.priority = MandatoryPriority; //by default that is...
1637 tso->gran.pri = pri;
1639 tso->gran.magic = TSO_MAGIC; // debugging only
1641 tso->gran.sparkname = 0;
1642 tso->gran.startedat = CURRENT_TIME;
1643 tso->gran.exported = 0;
1644 tso->gran.basicblocks = 0;
1645 tso->gran.allocs = 0;
1646 tso->gran.exectime = 0;
1647 tso->gran.fetchtime = 0;
1648 tso->gran.fetchcount = 0;
1649 tso->gran.blocktime = 0;
1650 tso->gran.blockcount = 0;
1651 tso->gran.blockedat = 0;
1652 tso->gran.globalsparks = 0;
1653 tso->gran.localsparks = 0;
1654 if (RtsFlags.GranFlags.Light)
1655 tso->gran.clock = Now; /* local clock */
1657 tso->gran.clock = 0;
1659 IF_DEBUG(gran,printTSO(tso));
1662 tso->par.magic = TSO_MAGIC; // debugging only
1664 tso->par.sparkname = 0;
1665 tso->par.startedat = CURRENT_TIME;
1666 tso->par.exported = 0;
1667 tso->par.basicblocks = 0;
1668 tso->par.allocs = 0;
1669 tso->par.exectime = 0;
1670 tso->par.fetchtime = 0;
1671 tso->par.fetchcount = 0;
1672 tso->par.blocktime = 0;
1673 tso->par.blockcount = 0;
1674 tso->par.blockedat = 0;
1675 tso->par.globalsparks = 0;
1676 tso->par.localsparks = 0;
1680 globalGranStats.tot_threads_created++;
1681 globalGranStats.threads_created_on_PE[CurrentProc]++;
1682 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1683 globalGranStats.tot_sq_probes++;
1685 // collect parallel global statistics (currently done together with GC stats)
1686 if (RtsFlags.ParFlags.ParStats.Global &&
1687 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1688 //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime());
1689 globalParStats.tot_threads_created++;
1695 belch("==__ schedule: Created TSO %d (%p);",
1696 CurrentProc, tso, tso->id));
1698 IF_PAR_DEBUG(verbose,
1699 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1700 tso->id, tso, advisory_thread_count));
1702 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1703 tso->id, tso->stack_size));
1710 all parallel thread creation calls should fall through the following routine.
1713 createSparkThread(rtsSpark spark)
1715 ASSERT(spark != (rtsSpark)NULL);
1716 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
1718 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1719 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1720 return END_TSO_QUEUE;
1724 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1725 if (tso==END_TSO_QUEUE)
1726 barf("createSparkThread: Cannot create TSO");
1728 tso->priority = AdvisoryPriority;
1730 pushClosure(tso,spark);
1731 PUSH_ON_RUN_QUEUE(tso);
1732 advisory_thread_count++;
1739 Turn a spark into a thread.
1740 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1743 //@cindex activateSpark
1745 activateSpark (rtsSpark spark)
1749 tso = createSparkThread(spark);
1750 if (RtsFlags.ParFlags.ParStats.Full) {
1751 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1752 IF_PAR_DEBUG(verbose,
1753 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1754 (StgClosure *)spark, info_type((StgClosure *)spark)));
1756 // ToDo: fwd info on local/global spark to thread -- HWL
1757 // tso->gran.exported = spark->exported;
1758 // tso->gran.locked = !spark->global;
1759 // tso->gran.sparkname = spark->name;
1765 /* ---------------------------------------------------------------------------
1768 * scheduleThread puts a thread on the head of the runnable queue.
1769 * This will usually be done immediately after a thread is created.
1770 * The caller of scheduleThread must create the thread using e.g.
1771 * createThread and push an appropriate closure
1772 * on this thread's stack before the scheduler is invoked.
1773 * ------------------------------------------------------------------------ */
1776 scheduleThread(StgTSO *tso)
1778 ACQUIRE_LOCK(&sched_mutex);
1780 /* Put the new thread on the head of the runnable queue. The caller
1781 * better push an appropriate closure on this thread's stack
1782 * beforehand. In the SMP case, the thread may start running as
1783 * soon as we release the scheduler lock below.
1785 PUSH_ON_RUN_QUEUE(tso);
1789 IF_DEBUG(scheduler,printTSO(tso));
1791 RELEASE_LOCK(&sched_mutex);
1794 /* ---------------------------------------------------------------------------
1797 * Start up Posix threads to run each of the scheduler tasks.
1798 * I believe the task ids are not needed in the system as defined.
1800 * ------------------------------------------------------------------------ */
1802 #if defined(PAR) || defined(SMP)
1804 taskStart(void) /* ( void *arg STG_UNUSED) */
1810 /* ---------------------------------------------------------------------------
1813 * Initialise the scheduler. This resets all the queues - if the
1814 * queues contained any threads, they'll be garbage collected at the
1817 * This now calls startTasks(), so should only be called once! KH @ 25/10/99
1818 * ------------------------------------------------------------------------ */
1822 term_handler(int sig STG_UNUSED)
1825 ACQUIRE_LOCK(&term_mutex);
1827 RELEASE_LOCK(&term_mutex);
1833 initCapability( Capability *cap )
1835 cap->f.stgChk0 = (F_)__stg_chk_0;
1836 cap->f.stgChk1 = (F_)__stg_chk_1;
1837 cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
1838 cap->f.stgUpdatePAP = (F_)__stg_update_PAP;
1847 for (i=0; i<=MAX_PROC; i++) {
1848 run_queue_hds[i] = END_TSO_QUEUE;
1849 run_queue_tls[i] = END_TSO_QUEUE;
1850 blocked_queue_hds[i] = END_TSO_QUEUE;
1851 blocked_queue_tls[i] = END_TSO_QUEUE;
1852 ccalling_threadss[i] = END_TSO_QUEUE;
1853 sleeping_queue = END_TSO_QUEUE;
1856 run_queue_hd = END_TSO_QUEUE;
1857 run_queue_tl = END_TSO_QUEUE;
1858 blocked_queue_hd = END_TSO_QUEUE;
1859 blocked_queue_tl = END_TSO_QUEUE;
1860 sleeping_queue = END_TSO_QUEUE;
1863 suspended_ccalling_threads = END_TSO_QUEUE;
1865 main_threads = NULL;
1866 all_threads = END_TSO_QUEUE;
1871 RtsFlags.ConcFlags.ctxtSwitchTicks =
1872 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1874 /* Install the SIGTERM handler */
1877 struct sigaction action,oact;
1879 action.sa_handler = term_handler;
1880 sigemptyset(&action.sa_mask);
1881 action.sa_flags = 0;
1882 if (sigaction(SIGTERM, &action, &oact) != 0) {
1883 barf("can't install TERM handler");
1889 /* Allocate N Capabilities */
1892 Capability *cap, *prev;
1895 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1896 cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1897 initCapability(cap);
1901 free_capabilities = cap;
1902 n_free_capabilities = RtsFlags.ParFlags.nNodes;
1904 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1905 n_free_capabilities););
1907 initCapability(&MainCapability);
1910 #if /* defined(SMP) ||*/ defined(PAR)
1923 /* make some space for saving all the thread ids */
1924 task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1925 "initScheduler:task_ids");
1927 /* and create all the threads */
1928 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1929 r = pthread_create(&tid,NULL,taskStart,NULL);
1931 barf("startTasks: Can't create new Posix thread");
1933 task_ids[i].id = tid;
1934 task_ids[i].mut_time = 0.0;
1935 task_ids[i].mut_etime = 0.0;
1936 task_ids[i].gc_time = 0.0;
1937 task_ids[i].gc_etime = 0.0;
1938 task_ids[i].elapsedtimestart = stat_getElapsedTime();
1939 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1945 exitScheduler( void )
1950 /* Don't want to use pthread_cancel, since we'd have to install
1951 * these silly exception handlers (pthread_cleanup_{push,pop}) around
1955 /* Cancel all our tasks */
1956 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1957 pthread_cancel(task_ids[i].id);
1960 /* Wait for all the tasks to terminate */
1961 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1962 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n",
1964 pthread_join(task_ids[i].id, NULL);
1968 /* Send 'em all a SIGTERM. That should shut 'em up.
1970 await_death = RtsFlags.ParFlags.nNodes;
1971 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1972 pthread_kill(task_ids[i].id,SIGTERM);
1974 while (await_death > 0) {
1980 /* -----------------------------------------------------------------------------
1981 Managing the per-task allocation areas.
1983 Each capability comes with an allocation area. These are
1984 fixed-length block lists into which allocation can be done.
1986 ToDo: no support for two-space collection at the moment???
1987 -------------------------------------------------------------------------- */
1989 /* -----------------------------------------------------------------------------
1990 * waitThread is the external interface for running a new computation
1991 * and waiting for the result.
1993 * In the non-SMP case, we create a new main thread, push it on the
1994 * main-thread stack, and invoke the scheduler to run it. The
1995 * scheduler will return when the top main thread on the stack has
1996 * completed or died, and fill in the necessary fields of the
1997 * main_thread structure.
1999 * In the SMP case, we create a main thread as before, but we then
2000 * create a new condition variable and sleep on it. When our new
2001 * main thread has completed, we'll be woken up and the status/result
2002 * will be in the main_thread struct.
2003 * -------------------------------------------------------------------------- */
2006 howManyThreadsAvail ( void )
2010 for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2012 for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2014 for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2020 finishAllThreads ( void )
2023 while (run_queue_hd != END_TSO_QUEUE) {
2024 waitThread ( run_queue_hd, NULL );
2026 while (blocked_queue_hd != END_TSO_QUEUE) {
2027 waitThread ( blocked_queue_hd, NULL );
2029 while (sleeping_queue != END_TSO_QUEUE) {
2030 waitThread ( blocked_queue_hd, NULL );
2033 (blocked_queue_hd != END_TSO_QUEUE ||
2034 run_queue_hd != END_TSO_QUEUE ||
2035 sleeping_queue != END_TSO_QUEUE);
2039 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2042 SchedulerStatus stat;
2044 ACQUIRE_LOCK(&sched_mutex);
2046 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2052 pthread_cond_init(&m->wakeup, NULL);
2055 m->link = main_threads;
2058 IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n",
2063 pthread_cond_wait(&m->wakeup, &sched_mutex);
2064 } while (m->stat == NoStatus);
2066 /* GranSim specific init */
2067 CurrentTSO = m->tso; // the TSO to run
2068 procStatus[MainProc] = Busy; // status of main PE
2069 CurrentProc = MainProc; // PE to run it on
2074 ASSERT(m->stat != NoStatus);
2080 pthread_cond_destroy(&m->wakeup);
2083 IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n",
2087 RELEASE_LOCK(&sched_mutex);
2092 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2093 //@subsection Run queue code
2097 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2098 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2099 implicit global variable that has to be correct when calling these
2103 /* Put the new thread on the head of the runnable queue.
2104 * The caller of createThread better push an appropriate closure
2105 * on this thread's stack before the scheduler is invoked.
2107 static /* inline */ void
2108 add_to_run_queue(tso)
2111 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2112 tso->link = run_queue_hd;
2114 if (run_queue_tl == END_TSO_QUEUE) {
2119 /* Put the new thread at the end of the runnable queue. */
2120 static /* inline */ void
2121 push_on_run_queue(tso)
2124 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2125 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2126 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2127 if (run_queue_hd == END_TSO_QUEUE) {
2130 run_queue_tl->link = tso;
2136 Should be inlined because it's used very often in schedule. The tso
2137 argument is actually only needed in GranSim, where we want to have the
2138 possibility to schedule *any* TSO on the run queue, irrespective of the
2139 actual ordering. Therefore, if tso is not the nil TSO then we traverse
2140 the run queue and dequeue the tso, adjusting the links in the queue.
2142 //@cindex take_off_run_queue
2143 static /* inline */ StgTSO*
2144 take_off_run_queue(StgTSO *tso) {
2148 qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2150 if tso is specified, unlink that tso from the run_queue (doesn't have
2151 to be at the beginning of the queue); GranSim only
2153 if (tso!=END_TSO_QUEUE) {
2154 /* find tso in queue */
2155 for (t=run_queue_hd, prev=END_TSO_QUEUE;
2156 t!=END_TSO_QUEUE && t!=tso;
2160 /* now actually dequeue the tso */
2161 if (prev!=END_TSO_QUEUE) {
2162 ASSERT(run_queue_hd!=t);
2163 prev->link = t->link;
2165 /* t is at beginning of thread queue */
2166 ASSERT(run_queue_hd==t);
2167 run_queue_hd = t->link;
2169 /* t is at end of thread queue */
2170 if (t->link==END_TSO_QUEUE) {
2171 ASSERT(t==run_queue_tl);
2172 run_queue_tl = prev;
2174 ASSERT(run_queue_tl!=t);
2176 t->link = END_TSO_QUEUE;
2178 /* take tso from the beginning of the queue; std concurrent code */
2180 if (t != END_TSO_QUEUE) {
2181 run_queue_hd = t->link;
2182 t->link = END_TSO_QUEUE;
2183 if (run_queue_hd == END_TSO_QUEUE) {
2184 run_queue_tl = END_TSO_QUEUE;
2193 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2194 //@subsection Garbage Collextion Routines
2196 /* ---------------------------------------------------------------------------
2197 Where are the roots that we know about?
2199 - all the threads on the runnable queue
2200 - all the threads on the blocked queue
2201 - all the threads on the sleeping queue
2202 - all the threads currently executing a _ccall_GC
2203 - all the "main threads"
2205 ------------------------------------------------------------------------ */
2207 /* This has to be protected either by the scheduler monitor, or by the
2208 garbage collection monitor (probably the latter).
2213 GetRoots(evac_fn evac)
2220 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2221 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2222 evac((StgClosure **)&run_queue_hds[i]);
2223 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2224 evac((StgClosure **)&run_queue_tls[i]);
2226 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2227 evac((StgClosure **)&blocked_queue_hds[i]);
2228 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2229 evac((StgClosure **)&blocked_queue_tls[i]);
2230 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2231 evac((StgClosure **)&ccalling_threads[i]);
2238 if (run_queue_hd != END_TSO_QUEUE) {
2239 ASSERT(run_queue_tl != END_TSO_QUEUE);
2240 evac((StgClosure **)&run_queue_hd);
2241 evac((StgClosure **)&run_queue_tl);
2244 if (blocked_queue_hd != END_TSO_QUEUE) {
2245 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2246 evac((StgClosure **)&blocked_queue_hd);
2247 evac((StgClosure **)&blocked_queue_tl);
2250 if (sleeping_queue != END_TSO_QUEUE) {
2251 evac((StgClosure **)&sleeping_queue);
2255 for (m = main_threads; m != NULL; m = m->link) {
2256 evac((StgClosure **)&m->tso);
2258 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2259 evac((StgClosure **)&suspended_ccalling_threads);
2262 #if defined(PAR) || defined(GRAN)
2263 markSparkQueue(evac);
2267 /* -----------------------------------------------------------------------------
2270 This is the interface to the garbage collector from Haskell land.
2271 We provide this so that external C code can allocate and garbage
2272 collect when called from Haskell via _ccall_GC.
2274 It might be useful to provide an interface whereby the programmer
2275 can specify more roots (ToDo).
2277 This needs to be protected by the GC condition variable above. KH.
2278 -------------------------------------------------------------------------- */
2280 void (*extra_roots)(evac_fn);
2285 GarbageCollect(GetRoots,rtsFalse);
2289 performMajorGC(void)
2291 GarbageCollect(GetRoots,rtsTrue);
2295 AllRoots(evac_fn evac)
2297 GetRoots(evac); // the scheduler's roots
2298 extra_roots(evac); // the user's roots
2302 performGCWithRoots(void (*get_roots)(evac_fn))
2304 extra_roots = get_roots;
2305 GarbageCollect(AllRoots,rtsFalse);
2308 /* -----------------------------------------------------------------------------
2311 If the thread has reached its maximum stack size, then raise the
2312 StackOverflow exception in the offending thread. Otherwise
2313 relocate the TSO into a larger chunk of memory and adjust its stack
2315 -------------------------------------------------------------------------- */
2318 threadStackOverflow(StgTSO *tso)
2320 nat new_stack_size, new_tso_size, diff, stack_words;
2324 IF_DEBUG(sanity,checkTSO(tso));
2325 if (tso->stack_size >= tso->max_stack_size) {
2328 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2329 tso->id, tso, tso->stack_size, tso->max_stack_size);
2330 /* If we're debugging, just print out the top of the stack */
2331 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2334 /* Send this thread the StackOverflow exception */
2335 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2339 /* Try to double the current stack size. If that takes us over the
2340 * maximum stack size for this thread, then use the maximum instead.
2341 * Finally round up so the TSO ends up as a whole number of blocks.
2343 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2344 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2345 TSO_STRUCT_SIZE)/sizeof(W_);
2346 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2347 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2349 IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2351 dest = (StgTSO *)allocate(new_tso_size);
2352 TICK_ALLOC_TSO(new_stack_size,0);
2354 /* copy the TSO block and the old stack into the new area */
2355 memcpy(dest,tso,TSO_STRUCT_SIZE);
2356 stack_words = tso->stack + tso->stack_size - tso->sp;
2357 new_sp = (P_)dest + new_tso_size - stack_words;
2358 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2360 /* relocate the stack pointers... */
2361 diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2362 dest->su = (StgUpdateFrame *) ((P_)dest->su + diff);
2364 dest->stack_size = new_stack_size;
2366 /* and relocate the update frame list */
2367 relocate_stack(dest, diff);
2369 /* Mark the old TSO as relocated. We have to check for relocated
2370 * TSOs in the garbage collector and any primops that deal with TSOs.
2372 * It's important to set the sp and su values to just beyond the end
2373 * of the stack, so we don't attempt to scavenge any part of the
2376 tso->what_next = ThreadRelocated;
2378 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2379 tso->su = (StgUpdateFrame *)tso->sp;
2380 tso->why_blocked = NotBlocked;
2381 dest->mut_link = NULL;
2383 IF_PAR_DEBUG(verbose,
2384 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2385 tso->id, tso, tso->stack_size);
2386 /* If we're debugging, just print out the top of the stack */
2387 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2390 IF_DEBUG(sanity,checkTSO(tso));
2392 IF_DEBUG(scheduler,printTSO(dest));
2398 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2399 //@subsection Blocking Queue Routines
2401 /* ---------------------------------------------------------------------------
2402 Wake up a queue that was blocked on some resource.
2403 ------------------------------------------------------------------------ */
2407 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2412 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2414 /* write RESUME events to log file and
2415 update blocked and fetch time (depending on type of the orig closure) */
2416 if (RtsFlags.ParFlags.ParStats.Full) {
2417 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2418 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2419 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2420 if (EMPTY_RUN_QUEUE())
2421 emitSchedule = rtsTrue;
2423 switch (get_itbl(node)->type) {
2425 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2430 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2437 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
// unblockOneLocked, variant built on the GranSim cost model
// (RtsFlags.GranFlags, CurrentProc, CurrentTime).  It treats bqe as a TSO,
// adds the unblocking cost to the clock of the current PE, calls new_event
// for the TSO, and clears the TSO link and block_info.closure.
// No lock is taken in the visible lines; the name suggests the caller is
// expected to hold sched_mutex already.
2444 static StgBlockingQueueElement *
2445 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2448 PEs node_loc, tso_loc;
2450 node_loc = where_is(node); // should be lifted out of loop
2451 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2452 tso_loc = where_is((StgClosure *)tso);
// Local case: only the local unblock cost is charged and the event is
// created from tso_loc to tso_loc.
2453 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2454 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2455 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2456 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2457 // insertThread(tso, node_loc);
2458 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2460 tso, node, (rtsSpark*)NULL);
2461 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
// Remote case: packing cost, global unblock cost and latency are all
// charged, and the event is created from tso_loc to CurrentProc.
2464 } else { // TSO is remote (actually should be FMBQ)
2465 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2466 RtsFlags.GranFlags.Costs.gunblocktime +
2467 RtsFlags.GranFlags.Costs.latency;
2468 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2470 tso, node, (rtsSpark*)NULL);
2471 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2474 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2476 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2477 (node_loc==tso_loc ? "Local" : "Global"),
2478 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
// The thread no longer refers to the closure it was blocked on.
2479 tso->block_info.closure = NULL;
2480 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
// unblockOneLocked, variant that dispatches on the closure type of bqe.
// Visible actions: a TSO is pushed on the run queue, accounted for through
// unblockCount and marked NotBlocked; a BLOCKED_FETCH is prepended to the
// PendingFetches list; the remaining checked case must be one of the
// RBH_Save closures; any other closure type is fatal (barf).
// No lock is taken in the visible lines; the name suggests the caller is
// expected to hold sched_mutex already.
2484 static StgBlockingQueueElement *
2485 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2487 StgBlockingQueueElement *next;
2489 switch (get_itbl(bqe)->type) {
2491 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2492 /* if it's a TSO just push it onto the run_queue */
2494 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2495 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
2497 unblockCount(bqe, node);
2498 /* reset blocking status after dumping event */
2499 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2503 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2505 bqe->link = (StgBlockingQueueElement *)PendingFetches;
2506 PendingFetches = (StgBlockedFetch *)bqe;
2510 /* can ignore this case in a non-debugging setup;
2511 see comments on RBHSave closures above */
2513 /* check that the closure is an RBHSave closure */
2514 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2515 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2516 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2520 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2521 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2525 IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2529 #else /* !GRAN && !PAR */
// unblockOneLocked, variant for plain TSO queues: asserts that tso is a TSO
// that is currently blocked, marks it NotBlocked and pushes it on the run
// queue.  No lock is taken in the visible lines; unblockOne wraps this call
// in sched_mutex.
2531 unblockOneLocked(StgTSO *tso)
2535 ASSERT(get_itbl(tso)->type == TSO);
2536 ASSERT(tso->why_blocked != NotBlocked);
2537 tso->why_blocked = NotBlocked;
2539 PUSH_ON_RUN_QUEUE(tso);
2541 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2546 #if defined(GRAN) || defined(PAR)
// unblockOne: locking wrappers around unblockOneLocked.  Each variant
// acquires sched_mutex, delegates to unblockOneLocked, stores the result
// back into its own parameter and releases the mutex again.
// Variant taking a blocking-queue element plus the closure node.
2547 inline StgBlockingQueueElement *
2548 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2550 ACQUIRE_LOCK(&sched_mutex);
2551 bqe = unblockOneLocked(bqe, node);
2552 RELEASE_LOCK(&sched_mutex);
// Variant taking a bare TSO.
2557 unblockOne(StgTSO *tso)
2559 ACQUIRE_LOCK(&sched_mutex);
2560 tso = unblockOneLocked(tso);
2561 RELEASE_LOCK(&sched_mutex);
// awakenBlockedQueue, GranSim flavour (uses CurrentProc, CurrentTime and
// RtsFlags.GranFlags): wake the TSOs on the blocking queue q of closure node.
// Visible steps: trace output, a fake-fetch adjustment of the PE bitmask of
// node when node is not on CurrentProc, one unblockOneLocked call per
// leading TSO, restoring two fields of an RBH from its RBHSave closure when
// the queue ends in one, and global statistics.
// No locking is visible in this variant.
2568 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2570 StgBlockingQueueElement *bqe;
2575 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2576 node, CurrentProc, CurrentTime[CurrentProc],
2577 CurrentTSO->id, CurrentTSO));
2579 node_loc = where_is(node);
2581 ASSERT(q == END_BQ_QUEUE ||
2582 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2583 get_itbl(q)->type == CONSTR); // closure (type constructor)
2584 ASSERT(is_unique(node));
2586 /* FAKE FETCH: magically copy the node to the tso's proc;
2587 no Fetch necessary because in reality the node should not have been
2588 moved to the other PE in the first place
2590 if (CurrentProc!=node_loc) {
2592 belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2593 node, node_loc, CurrentProc, CurrentTSO->id,
2594 // CurrentTSO, where_is(CurrentTSO),
2595 node->header.gran.procs));
// The bit for CurrentProc is added to the set of PEs recorded in the node.
2596 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2598 belch("## new bitmask of node %p is %#x",
2599 node, node->header.gran.procs));
2600 if (RtsFlags.GranFlags.GranSimStats.Global) {
2601 globalGranStats.tot_fake_fetches++;
2606 // ToDo: check: ASSERT(CurrentProc==node_loc);
// Every leading TSO is handed to unblockOneLocked, whose result becomes the
// element examined by the next iteration.
2607 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2610 bqe points to the current element in the queue
2611 next points to the next element in the queue
2613 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2614 //tso_loc = where_is(tso);
2616 bqe = unblockOneLocked(bqe, node);
2619 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2620 the closure to make room for the anchor of the BQ */
2621 if (bqe!=END_BQ_QUEUE) {
2622 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2624 ASSERT((info_ptr==&RBH_Save_0_info) ||
2625 (info_ptr==&RBH_Save_1_info) ||
2626 (info_ptr==&RBH_Save_2_info));
2628 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
// The two payload words of the RBHSave closure go back into the RBH.
2629 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2630 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2633 belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2634 node, info_type(node)));
2637 /* statistics gathering */
2638 if (RtsFlags.GranFlags.GranSimStats.Global) {
2639 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2640 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2641 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2642 globalGranStats.tot_awbq++; // total no. of bqs awakened
2645 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2646 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
// awakenBlockedQueue, variant using IF_PAR_DEBUG tracing: under sched_mutex,
// hand every leading TSO or BLOCKED_FETCH on queue q to unblockOneLocked,
// whose result becomes the element examined next.
2650 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2652 StgBlockingQueueElement *bqe;
2654 ACQUIRE_LOCK(&sched_mutex);
2656 IF_PAR_DEBUG(verbose,
2657 belch("##-_ AwBQ for node %p on [%x]: ",
// NOTE(review): the debug text in this branch announces an early return.
// If the branch really returns at this point, sched_mutex acquired above is
// still held on that path - confirm against the full source.
2661 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2662 IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2667 ASSERT(q == END_BQ_QUEUE ||
2668 get_itbl(q)->type == TSO ||
2669 get_itbl(q)->type == BLOCKED_FETCH ||
2670 get_itbl(q)->type == CONSTR);
2673 while (get_itbl(bqe)->type==TSO ||
2674 get_itbl(bqe)->type==BLOCKED_FETCH) {
2675 bqe = unblockOneLocked(bqe, node);
2677 RELEASE_LOCK(&sched_mutex);
2680 #else /* !GRAN && !PAR */
// awakenBlockedQueue, plain TSO variant: with sched_mutex held, call
// unblockOneLocked on the current TSO and continue with the TSO it returns
// until END_TSO_QUEUE is reached.
2682 awakenBlockedQueue(StgTSO *tso)
2684 ACQUIRE_LOCK(&sched_mutex);
2685 while (tso != END_TSO_QUEUE) {
2686 tso = unblockOneLocked(tso);
2688 RELEASE_LOCK(&sched_mutex);
2692 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2693 //@subsection Exception Handling Routines
2695 /* ---------------------------------------------------------------------------
2697 - usually called inside a signal handler so it mustn't do anything fancy.
2698 ------------------------------------------------------------------------ */
2701 interruptStgRts(void)
2707 /* -----------------------------------------------------------------------------
2710 This is for use when we raise an exception in another thread, which may be blocked.
2712 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2713 -------------------------------------------------------------------------- */
2715 #if defined(GRAN) || defined(PAR)
2717 NB: only the type of the blocking queue is different in GranSim and GUM
2718 the operations on the queue-elements are the same
2719 long live polymorphism!
// unblockThread (blocking-queue-element flavour): take tso off whatever
// queue it is blocked on, selected by tso->why_blocked (MVar queue, black
// hole queue, blocked_exceptions of the target thread, the I/O
// blocked_queue or the sleeping_queue), then reset link, why_blocked and
// block_info and push tso on the run queue.
// sched_mutex is acquired on entry and released on every exit path.  A TSO
// that cannot be found on the expected queue is fatal (barf).
2722 unblockThread(StgTSO *tso)
2724 StgBlockingQueueElement *t, **last;
2726 ACQUIRE_LOCK(&sched_mutex);
2727 switch (tso->why_blocked) {
// Nothing to undo for a thread that is not blocked, but the mutex taken
// above must be dropped before leaving; returning with it held would block
// the next ACQUIRE_LOCK on sched_mutex.
2730 RELEASE_LOCK(&sched_mutex); return; /* not blocked */
2733 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2735 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2736 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
// last always points at the link field that leads to t, so unlinking is a
// single store; last_tso trails one element behind to repair mvar->tail.
2738 last = (StgBlockingQueueElement **)&mvar->head;
2739 for (t = (StgBlockingQueueElement *)mvar->head;
2741 last = &t->link, last_tso = t, t = t->link) {
2742 if (t == (StgBlockingQueueElement *)tso) {
2743 *last = (StgBlockingQueueElement *)tso->link;
2744 if (mvar->tail == tso) {
2745 mvar->tail = (StgTSO *)last_tso;
2750 barf("unblockThread (MVAR): TSO not found");
2753 case BlockedOnBlackHole:
2754 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2756 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2758 last = &bq->blocking_queue;
2759 for (t = bq->blocking_queue;
2761 last = &t->link, t = t->link) {
2762 if (t == (StgBlockingQueueElement *)tso) {
2763 *last = (StgBlockingQueueElement *)tso->link;
2767 barf("unblockThread (BLACKHOLE): TSO not found");
2770 case BlockedOnException:
2772 StgTSO *target = tso->block_info.tso;
2774 ASSERT(get_itbl(target)->type == TSO);
// Follow every relocation hop, as the plain-TSO variant of this function
// does; a single test would stop at a stale TSO if the target has been
// relocated more than once.
2776 while (target->what_next == ThreadRelocated) {
2777 target = target->link;
2778 ASSERT(get_itbl(target)->type == TSO);
2781 ASSERT(target->blocked_exceptions != NULL);
2783 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2784 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2786 last = &t->link, t = t->link) {
2787 ASSERT(get_itbl(t)->type == TSO);
2788 if (t == (StgBlockingQueueElement *)tso) {
2789 *last = (StgBlockingQueueElement *)tso->link;
2793 barf("unblockThread (Exception): TSO not found");
2797 case BlockedOnWrite:
2799 /* take TSO off blocked_queue */
2800 StgBlockingQueueElement *prev = NULL;
2801 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2802 prev = t, t = t->link) {
2803 if (t == (StgBlockingQueueElement *)tso) {
// Two unlink cases follow: tso at the head of blocked_queue, and tso behind
// prev.  Both keep blocked_queue_tl consistent.
2805 blocked_queue_hd = (StgTSO *)t->link;
2806 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2807 blocked_queue_tl = END_TSO_QUEUE;
2810 prev->link = t->link;
2811 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2812 blocked_queue_tl = (StgTSO *)prev;
2818 barf("unblockThread (I/O): TSO not found");
2821 case BlockedOnDelay:
2823 /* take TSO off sleeping_queue */
2824 StgBlockingQueueElement *prev = NULL;
2825 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
2826 prev = t, t = t->link) {
2827 if (t == (StgBlockingQueueElement *)tso) {
2829 sleeping_queue = (StgTSO *)t->link;
2831 prev->link = t->link;
2836 barf("unblockThread (I/O): TSO not found");
2840 barf("unblockThread");
// Common exit for a thread that was unlinked: make it runnable again.
2844 tso->link = END_TSO_QUEUE;
2845 tso->why_blocked = NotBlocked;
2846 tso->block_info.closure = NULL;
2847 PUSH_ON_RUN_QUEUE(tso);
2848 RELEASE_LOCK(&sched_mutex);
// unblockThread (plain TSO flavour): take tso off whatever queue it is
// blocked on, selected by tso->why_blocked (MVar queue, black hole queue,
// blocked_exceptions of the target thread, the I/O blocked_queue or the
// sleeping_queue), then reset link, why_blocked and block_info and push tso
// on the run queue.
// sched_mutex is acquired on entry and released on every exit path.  A TSO
// that cannot be found on the expected queue is fatal (barf).
2852 unblockThread(StgTSO *tso)
2856 ACQUIRE_LOCK(&sched_mutex);
2857 switch (tso->why_blocked) {
// Nothing to undo for a thread that is not blocked, but the mutex taken
// above must be dropped before leaving; returning with it held would block
// the next ACQUIRE_LOCK on sched_mutex.
2860 RELEASE_LOCK(&sched_mutex); return; /* not blocked */
2863 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2865 StgTSO *last_tso = END_TSO_QUEUE;
2866 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
// last_tso trails one element behind t so that mvar->tail can be repaired
// when the removed TSO was the tail.
2869 for (t = mvar->head; t != END_TSO_QUEUE;
2870 last = &t->link, last_tso = t, t = t->link) {
2873 if (mvar->tail == tso) {
2874 mvar->tail = last_tso;
2879 barf("unblockThread (MVAR): TSO not found");
2882 case BlockedOnBlackHole:
2883 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2885 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2887 last = &bq->blocking_queue;
2888 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2889 last = &t->link, t = t->link) {
2895 barf("unblockThread (BLACKHOLE): TSO not found");
2898 case BlockedOnException:
2900 StgTSO *target = tso->block_info.tso;
2902 ASSERT(get_itbl(target)->type == TSO);
// Follow the link of every relocated TSO until the live target is reached.
2904 while (target->what_next == ThreadRelocated) {
2905 target = target->link;
2906 ASSERT(get_itbl(target)->type == TSO);
2909 ASSERT(target->blocked_exceptions != NULL);
2911 last = &target->blocked_exceptions;
2912 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2913 last = &t->link, t = t->link) {
2914 ASSERT(get_itbl(t)->type == TSO);
2920 barf("unblockThread (Exception): TSO not found");
2924 case BlockedOnWrite:
2926 StgTSO *prev = NULL;
2927 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2928 prev = t, t = t->link) {
// Two unlink cases follow: tso at the head of blocked_queue, and tso behind
// prev.  Both keep blocked_queue_tl consistent.
2931 blocked_queue_hd = t->link;
2932 if (blocked_queue_tl == t) {
2933 blocked_queue_tl = END_TSO_QUEUE;
2936 prev->link = t->link;
2937 if (blocked_queue_tl == t) {
2938 blocked_queue_tl = prev;
2944 barf("unblockThread (I/O): TSO not found");
2947 case BlockedOnDelay:
2949 StgTSO *prev = NULL;
2950 for (t = sleeping_queue; t != END_TSO_QUEUE;
2951 prev = t, t = t->link) {
2954 sleeping_queue = t->link;
2956 prev->link = t->link;
2961 barf("unblockThread (I/O): TSO not found");
2965 barf("unblockThread");
// Common exit for a thread that was unlinked: make it runnable again.
2969 tso->link = END_TSO_QUEUE;
2970 tso->why_blocked = NotBlocked;
2971 tso->block_info.closure = NULL;
2972 PUSH_ON_RUN_QUEUE(tso);
2973 RELEASE_LOCK(&sched_mutex);
2977 /* -----------------------------------------------------------------------------
2980 * The following function implements the magic for raising an
2981 * asynchronous exception in an existing thread.
2983 * We first remove the thread from any queue on which it might be
2984 * blocked. The possible blockages are those handled by unblockThread: MVARs, BLACKHOLE_BQs, blocked exceptions, I/O and delays.
2986 * We strip the stack down to the innermost CATCH_FRAME, building
2987 * thunks in the heap for all the active computations, so they can
2988 * be restarted if necessary. When we reach a CATCH_FRAME, we build
2989 * an application of the handler to the exception, and push it on
2990 * the top of the stack.
2992 * How exactly do we save all the active computations? We create an
2993 * AP_UPD for every UpdateFrame on the stack. Entering one of these
2994 * AP_UPDs pushes everything from the corresponding update frame
2995 * upwards onto the stack. (Actually, it pushes everything up to the
2996 * next update frame plus a pointer to the next AP_UPD object.
2997 * Entering the next AP_UPD object pushes more onto the stack until we
2998 * reach the last AP_UPD object - at which point the stack should look
2999 * exactly as it did when we killed the TSO and we can continue
3000 * execution by entering the closure on top of the stack.
3002 * We can also kill a thread entirely - this happens if either (a) the
3003 * exception passed to raiseAsync is NULL, or (b) there's no
3004 * CATCH_FRAME on the stack. In either case, we strip the entire
3005 * stack and replace the thread with a zombie.
3007 * -------------------------------------------------------------------------- */
// deleteThread: calls raiseAsync on tso with a NULL exception, which the
// comment preceding this function describes as killing the thread entirely.
3010 deleteThread(StgTSO *tso)
3012 raiseAsync(tso,NULL);
// raiseAsync: deliver exception to tso, or kill the thread when exception
// is NULL (deleteThread calls it that way).  The first check covers a thread
// whose what_next is already ThreadComplete or ThreadKilled.
// The function works on local copies of tso->su and tso->sp.  It dispatches
// on the type of the frame at su and allocates heap closures (allocate)
// that capture each stripped stack chunk, following the scheme laid out in
// the long comment that precedes deleteThread.
3016 raiseAsync(StgTSO *tso, StgClosure *exception)
3018 StgUpdateFrame* su = tso->su;
3019 StgPtr sp = tso->sp;
3021 /* Thread already dead? */
3022 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3026 IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3028 /* Remove it from any blocking queues */
3031 /* The stack freezing code assumes there's a closure pointer on
3032 * the top of the stack. This isn't always the case with compiled
3033 * code, so we have to push a dummy closure on the top which just
3034 * returns to the next return address on the stack.
3036 if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3037 *(--sp) = (W_)&stg_dummy_ret_closure;
// words is the distance in stack words from sp to the frame at su, less one.
3041 nat words = ((P_)su - (P_)sp) - 1;
3045 /* If we find a CATCH_FRAME, and we've got an exception to raise,
3046 * then build PAP(handler,exception,realworld#), and leave it on
3047 * top of the stack ready to enter.
3049 if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3050 StgCatchFrame *cf = (StgCatchFrame *)su;
3051 /* we've got an exception to raise, so let's pass it to the
3052 * handler in this frame.
3054 ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3055 TICK_ALLOC_UPD_PAP(3,0);
3056 SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3059 ap->fun = cf->handler; /* :: Exception -> IO a */
3060 ap->payload[0] = exception;
3061 ap->payload[1] = ARG_TAG(0); /* realworld token */
3063 /* throw away the stack from Sp up to and including the
3066 sp = (P_)su + sizeofW(StgCatchFrame) - 1;
3069 /* Restore the blocked/unblocked state for asynchronous exceptions
3070 * at the CATCH_FRAME.
3072 * If exceptions were unblocked at the catch, arrange that they
3073 * are unblocked again after executing the handler by pushing an
3074 * unblockAsyncExceptions_ret stack frame.
3076 if (!cf->exceptions_blocked) {
3077 *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3080 /* Ensure that async exceptions are blocked when running the handler.
3082 if (tso->blocked_exceptions == NULL) {
3083 tso->blocked_exceptions = END_TSO_QUEUE;
3086 /* Put the newly-built PAP on top of the stack, ready to execute
3087 * when the thread restarts.
3091 tso->what_next = ThreadEnterGHC;
3092 IF_DEBUG(sanity, checkTSO(tso));
3096 /* First build an AP_UPD consisting of the stack chunk above the
3097 * current update frame, with the top word on the stack as the
3100 ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3105 ap->fun = (StgClosure *)sp[0];
// The stack words are copied into the payload while sp advances past them.
3107 for(i=0; i < (nat)words; ++i) {
3108 ap->payload[i] = (StgClosure *)*sp++;
3111 switch (get_itbl(su)->type) {
3115 SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */);
3116 TICK_ALLOC_UP_THK(words+1,0);
3119 fprintf(stderr, "scheduler: Updating ");
3120 printPtr((P_)su->updatee);
3121 fprintf(stderr, " with ");
3122 printObj((StgClosure *)ap);
3125 /* Replace the updatee with an indirection - happily
3126 * this will also wake up any threads currently
3127 * waiting on the result.
3129 * Warning: if we're in a loop, more than one update frame on
3130 * the stack may point to the same object. Be careful not to
3131 * overwrite an IND_OLDGEN in this case, because we'll screw
3132 * up the mutable lists. To be on the safe side, don't
3133 * overwrite any kind of indirection at all. See also
3134 * threadSqueezeStack in GC.c, where we have to make a similar
3137 if (!closure_IND(su->updatee)) {
3138 UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */
3141 sp += sizeofW(StgUpdateFrame) -1;
3142 sp[0] = (W_)ap; /* push onto stack */
3148 StgCatchFrame *cf = (StgCatchFrame *)su;
3151 /* We want a PAP, not an AP_UPD. Fortunately, the
3152 * layout's the same.
3154 SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3155 TICK_ALLOC_UPD_PAP(words+1,0);
3157 /* now build o = FUN(catch,ap,handler) */
3158 o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3159 TICK_ALLOC_FUN(2,0);
3160 SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3161 o->payload[0] = (StgClosure *)ap;
3162 o->payload[1] = cf->handler;
3165 fprintf(stderr, "scheduler: Built ");
3166 printObj((StgClosure *)o);
3169 /* pop the old handler and put o on the stack */
3171 sp += sizeofW(StgCatchFrame) - 1;
3178 StgSeqFrame *sf = (StgSeqFrame *)su;
3181 SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3182 TICK_ALLOC_UPD_PAP(words+1,0);
3184 /* now build o = FUN(seq,ap) */
3185 o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3186 TICK_ALLOC_SE_THK(1,0);
3187 SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3188 o->payload[0] = (StgClosure *)ap;
3191 fprintf(stderr, "scheduler: Built ");
3192 printObj((StgClosure *)o);
3195 /* pop the old handler and put o on the stack */
3197 sp += sizeofW(StgSeqFrame) - 1;
// Final case: the exception is stored in the stack slot, the thread is
// marked ThreadKilled and tso->su is set just past that slot.
3203 /* We've stripped the entire stack, the thread is now dead. */
3204 sp += sizeofW(StgStopFrame) - 1;
3205 sp[0] = (W_)exception; /* save the exception */
3206 tso->what_next = ThreadKilled;
3207 tso->su = (StgUpdateFrame *)(sp+1);
3218 /* -----------------------------------------------------------------------------
3219 resurrectThreads is called after garbage collection on the list of
3220 threads found to be garbage. Each of these threads will be woken
3221 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3222 on an MVar, or NonTermination if the thread was blocked on a Black Hole.
3224 -------------------------------------------------------------------------- */
// resurrectThreads: walk the list threads through global_link.  Each TSO
// gets its global_link pointed at all_threads and then receives an
// exception through raiseAsync chosen by why_blocked: BlockedOnDeadMVar in
// the branch that includes BlockedOnException, NonTermination for
// BlockedOnBlackHole.  The last visible branch is fatal (barf).
3227 resurrectThreads( StgTSO *threads )
// next is saved first because global_link is overwritten just below.
3231 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3232 next = tso->global_link;
3233 tso->global_link = all_threads;
3235 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3237 switch (tso->why_blocked) {
3239 case BlockedOnException:
3240 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3242 case BlockedOnBlackHole:
3243 raiseAsync(tso,(StgClosure *)NonTermination_closure);
3246 /* This might happen if the thread was blocked on a black hole
3247 * belonging to a thread that we've just woken up (raiseAsync
3248 * can wake up threads, remember...).
3252 barf("resurrectThreads: thread blocked in a strange way");
3257 /* -----------------------------------------------------------------------------
3258 * Blackhole detection: if we reach a deadlock, test whether any
3259 * threads are blocked on themselves. Any threads which are found to
3260 * be self-blocked get sent a NonTermination exception.
3262 * This is only done in a deadlock situation in order to avoid
3263 * performance overhead in the normal case.
3264 * -------------------------------------------------------------------------- */
// detectBlackHoles: scan all_threads for a thread that is blocked on a
// black hole which one of its own stack frames is due to update.  Such a
// thread is sent NonTermination through raiseAsync.
3267 detectBlackHoles( void )
3269 StgTSO *t = all_threads;
3270 StgUpdateFrame *frame;
3271 StgClosure *blocked_on;
3273 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
// Relocated TSOs are followed to their current incarnation first.
3275 while (t->what_next == ThreadRelocated) {
3277 ASSERT(get_itbl(t)->type == TSO);
3280 if (t->why_blocked != BlockedOnBlackHole) {
3284 blocked_on = t->block_info.closure;
// Walk the frame chain starting at t->su; the loop header has no exit
// condition of its own, so the exit must come from inside the switch.
3286 for (frame = t->su; ; frame = frame->link) {
3287 switch (get_itbl(frame)->type) {
3290 if (frame->updatee == blocked_on) {
3291 /* We are blocking on one of our own computations, so
3292 * send this thread the NonTermination exception.
3295 sched_belch("thread %d is blocked on itself", t->id));
3296 raiseAsync(t, (StgClosure *)NonTermination_closure);
3317 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3318 //@subsection Debugging Routines
3320 /* -----------------------------------------------------------------------------
3321 Debugging: why is a thread blocked
3322 -------------------------------------------------------------------------- */
// printThreadBlockage: write to stderr a phrase describing why tso is
// blocked, chosen by tso->why_blocked.  No newline is written.  The last
// visible branch is fatal (barf) and reports the unexpected why_blocked
// value together with the id and the address of the TSO.
3327 printThreadBlockage(StgTSO *tso)
3329 switch (tso->why_blocked) {
3331 fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3333 case BlockedOnWrite:
3334 fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3336 case BlockedOnDelay:
3337 fprintf(stderr,"is blocked until %d", tso->block_info.target);
3340 fprintf(stderr,"is blocked on an MVar");
3342 case BlockedOnException:
3343 fprintf(stderr,"is blocked on delivering an exception to thread %d",
3344 tso->block_info.tso->id);
3346 case BlockedOnBlackHole:
3347 fprintf(stderr,"is blocked on a black hole");
3350 fprintf(stderr,"is not blocked");
3354 fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3355 tso->block_info.closure, info_type(tso->block_info.closure));
3357 case BlockedOnGA_NoSend:
3358 fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3359 tso->block_info.closure, info_type(tso->block_info.closure));
// tso is a pointer, so it is printed with the pointer conversion; an int
// conversion does not match the argument type.
3363 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
3364 tso->why_blocked, tso->id, (void *)tso);
// printThreadStatus: write to stderr a phrase chosen by tso->what_next:
// one for a killed thread, one for ThreadComplete, and otherwise the
// description produced by printThreadBlockage.
3369 printThreadStatus(StgTSO *tso)
3371 switch (tso->what_next) {
3373 fprintf(stderr,"has been killed");
3375 case ThreadComplete:
3376 fprintf(stderr,"has completed");
3379 printThreadBlockage(tso);
// printAllThreads: print a heading through sched_belch and then one line
// per TSO on all_threads (thread id followed by printThreadStatus).
// Three alternative headings are visible, two of them with a formatted
// time stamp; presumably conditional compilation selects one (the
// directives are not visible here).
3384 printAllThreads(void)
3389 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3390 ullong_format_string(TIME_ON_PROC(CurrentProc),
3391 time_string, rtsFalse/*no commas!*/);
3393 sched_belch("all threads at [%s]:", time_string);
3395 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3396 ullong_format_string(CURRENT_TIME,
3397 time_string, rtsFalse/*no commas!*/);
3399 sched_belch("all threads at [%s]:", time_string);
3401 sched_belch("all threads:");
3404 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3405 fprintf(stderr, "\tthread %d ", t->id);
3406 printThreadStatus(t);
3407 fprintf(stderr,"\n");
3412 Print a whole blocking queue attached to node (debugging only).
// print_bq: print a heading naming the closure node to stderr, then pass
// the blocking_queue field of node to print_bqe.
// NOTE(review): the NULL check on node comes after node has already been
// passed to info_type and get_itbl, so it cannot catch a NULL node first.
3417 print_bq (StgClosure *node)
3419 StgBlockingQueueElement *bqe;
3423 fprintf(stderr,"## BQ of closure %p (%s): ",
3424 node, info_type(node));
3426 /* should cover all closures that may have a blocking queue */
3427 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3428 get_itbl(node)->type == FETCH_ME_BQ ||
3429 get_itbl(node)->type == RBH ||
3430 get_itbl(node)->type == MVAR);
3432 ASSERT(node!=(StgClosure*)NULL); // sanity check
3434 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3438 Print a whole blocking queue starting with the element bqe.
// print_bqe: print every element of the blocking queue that starts at bqe
// to stderr, followed by a newline.  Elements may be TSOs, BLOCKED_FETCH
// closures or a CONSTR (an RBH_Save closure); any other type is fatal.
// The walk stops after a CONSTR element or after the element whose link is
// END_BQ_QUEUE.
3441 print_bqe (StgBlockingQueueElement *bqe)
3446 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3448 for (end = (bqe==END_BQ_QUEUE);
3449 !end; // iterate until bqe points to a CONSTR
3450 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
3451 bqe = end ? END_BQ_QUEUE : bqe->link) {
3452 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3453 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3454 /* types of closures that may appear in a blocking queue */
3455 ASSERT(get_itbl(bqe)->type == TSO ||
3456 get_itbl(bqe)->type == BLOCKED_FETCH ||
3457 get_itbl(bqe)->type == CONSTR);
3458 /* only BQs of an RBH end with an RBH_Save closure */
3459 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3461 switch (get_itbl(bqe)->type) {
// The TSO address is printed with the pointer conversion, matching the
// other print_bq variants; a hex int conversion does not match a pointer
// argument.
3463 fprintf(stderr," TSO %u (%p),",
3464 ((StgTSO *)bqe)->id, (void *)bqe);
3467 fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3468 ((StgBlockedFetch *)bqe)->node,
3469 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3470 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3471 ((StgBlockedFetch *)bqe)->ga.weight);
3474 fprintf(stderr," %s (IP %p),",
3475 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3476 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3477 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3478 "RBH_Save_?"), get_itbl(bqe));
3481 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3482 info_type((StgClosure *)bqe)); // , node, info_type(node));
3486 fputc('\n', stderr);
3488 # elif defined(GRAN)
// print_bq, GRAN variant: print the blocking queue of closure node to
// stderr together with PE locations obtained from where_is.  Elements may
// be TSOs or a CONSTR (an RBH_Save closure); any other type is fatal.
// The walk stops after a CONSTR element or after the element whose link is
// END_BQ_QUEUE.
// NOTE(review): the NULL check on node comes after get_itbl(node) has
// already been evaluated in the preceding assertion.
3490 print_bq (StgClosure *node)
3492 StgBlockingQueueElement *bqe;
3493 PEs node_loc, tso_loc;
3496 /* should cover all closures that may have a blocking queue */
3497 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3498 get_itbl(node)->type == FETCH_ME_BQ ||
3499 get_itbl(node)->type == RBH);
3501 ASSERT(node!=(StgClosure*)NULL); // sanity check
3502 node_loc = where_is(node);
3504 fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3505 node, info_type(node), node_loc);
3508 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3510 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3511 !end; // iterate until bqe points to a CONSTR
3512 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3513 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3514 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3515 /* types of closures that may appear in a blocking queue */
3516 ASSERT(get_itbl(bqe)->type == TSO ||
3517 get_itbl(bqe)->type == CONSTR);
3518 /* only BQs of an RBH end with an RBH_Save closure */
3519 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3521 tso_loc = where_is((StgClosure *)bqe);
3522 switch (get_itbl(bqe)->type) {
3524 fprintf(stderr," TSO %d (%p) on [PE %d],",
3525 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3528 fprintf(stderr," %s (IP %p),",
3529 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3530 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3531 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3532 "RBH_Save_?"), get_itbl(bqe));
3535 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3536 info_type((StgClosure *)bqe), node, info_type(node));
3540 fputc('\n', stderr);
3544 Nice and easy: only TSOs on the blocking queue
// print_bq, plain variant: treat node as an StgBlockingQueue and print the
// id and address of every TSO on its blocking_queue to stderr, then a
// newline.
3547 print_bq (StgClosure *node)
3551 ASSERT(node!=(StgClosure*)NULL); // sanity check
3552 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3553 tso != END_TSO_QUEUE;
3555 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3556 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3557 fprintf(stderr," TSO %d (%p),", tso->id, tso);
3559 fputc('\n', stderr);
3570 for (i=0, tso=run_queue_hd;
3571 tso != END_TSO_QUEUE;
// sched_belch: printf-style diagnostic output to stderr.  A prefix is
// printed first, then the message formatted from s and the variable
// arguments, then a newline.  Three alternative prefixes are visible;
// presumably conditional compilation selects one (the directives are not
// visible here).
// NOTE(review): pthread_self() is printed with %ld; pthread_t is not
// guaranteed to be a long - confirm for the supported platforms.
3580 sched_belch(char *s, ...)
3585 fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3587 fprintf(stderr, "== ");
3589 fprintf(stderr, "scheduler: ");
3591 vfprintf(stderr, s, ap);
3592 fprintf(stderr, "\n");
3598 //@node Index, , Debugging Routines, Main scheduling code
3602 //* MainRegTable:: @cindex\s-+MainRegTable
3603 //* StgMainThread:: @cindex\s-+StgMainThread
3604 //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue
3605 //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd
3606 //* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
3607 //* context_switch:: @cindex\s-+context_switch
3608 //* createThread:: @cindex\s-+createThread
3609 //* free_capabilities:: @cindex\s-+free_capabilities
3610 //* gc_pending_cond:: @cindex\s-+gc_pending_cond
3611 //* initScheduler:: @cindex\s-+initScheduler
3612 //* interrupted:: @cindex\s-+interrupted
3613 //* n_free_capabilities:: @cindex\s-+n_free_capabilities
3614 //* next_thread_id:: @cindex\s-+next_thread_id
3615 //* print_bq:: @cindex\s-+print_bq
3616 //* run_queue_hd:: @cindex\s-+run_queue_hd
3617 //* run_queue_tl:: @cindex\s-+run_queue_tl
3618 //* sched_mutex:: @cindex\s-+sched_mutex
3619 //* schedule:: @cindex\s-+schedule
3620 //* take_off_run_queue:: @cindex\s-+take_off_run_queue
3621 //* task_ids:: @cindex\s-+task_ids
3622 //* term_mutex:: @cindex\s-+term_mutex
3623 //* thread_ready_cond:: @cindex\s-+thread_ready_cond