1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.108 2001/11/26 16:54:22 simonmar Exp $
4 * (c) The GHC Team, 1998-2000
8 * Different GHC ways use this scheduler quite differently (see comments below)
9 * Here is the global picture:
11 * WAY Name CPP flag What's it for
12 * --------------------------------------
13 * mp GUM PAR Parallel execution on a distributed memory machine
14 * s SMP SMP Parallel execution on a shared memory machine
15 * mg GranSim GRAN Simulation of parallel execution
16 * md GUM/GdH DIST Distributed execution (based on GUM)
17 * --------------------------------------------------------------------------*/
19 //@node Main scheduling code, , ,
20 //@section Main scheduling code
23 * Version with scheduler monitor support for SMPs (WAY=s):
25 This design provides a high-level API to create and schedule threads etc.
26 as documented in the SMP design document.
28 It uses a monitor design controlled by a single mutex to exercise control
29 over accesses to shared data structures, and builds on the Posix threads
32 The majority of state is shared. In order to keep essential per-task state,
33 there is a Capability structure, which contains all the information
34 needed to run a thread: its STG registers, a pointer to its TSO, a
35 nursery etc. During STG execution, a pointer to the capability is
36 kept in a register (BaseReg).
38 In a non-SMP build, there is one global capability, namely MainRegTable.
42 * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44 The main scheduling loop in GUM iterates until a finish message is received.
45 In that case a global flag @receivedFinish@ is set and this instance of
46 the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
47 for the handling of incoming messages, such as PP_FINISH.
48 Note that in the parallel case we have a system manager that coordinates
49 different PEs, each of which runs one instance of the RTS.
50 See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
51 From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53 * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55 The main scheduling code in GranSim is quite different from that in std
56 (concurrent) Haskell: while concurrent Haskell just iterates over the
57 threads in the runnable queue, GranSim is event driven, i.e. it iterates
58 over the events in the global event queue. -- HWL
63 //* Variables and Data structures::
64 //* Main scheduling loop::
65 //* Suspend and Resume::
67 //* Garbage Collextion Routines::
68 //* Blocking Queue Routines::
69 //* Exception Handling Routines::
70 //* Debugging Routines::
74 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
75 //@subsection Includes
77 #include "PosixSource.h"
84 #include "StgStartup.h"
87 #include "StgMiscClosures.h"
89 #include "Interpreter.h"
90 #include "Exception.h"
99 #include "Proftimer.h"
100 #include "ProfHeap.h"
101 #include "RetainerProfile.h"
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
116 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
117 //@subsection Variables and Data structures
121 * These are the threads which clients have requested that we run.
123 * In an SMP build, we might have several concurrent clients all
124 * waiting for results, and each one will wait on a condition variable
125 * until the result is available.
127 * In non-SMP, clients are strictly nested: the first client calls
128 * into the RTS, which might call out again to C with a _ccall_GC, and
129 * eventually re-enter the RTS.
131 * Main threads information is kept in a linked list:
133 //@cindex StgMainThread
134 typedef struct StgMainThread_ {
136 SchedulerStatus stat;
139 pthread_cond_t wakeup;
141 struct StgMainThread_ *link;
144 /* Main thread queue.
145 * Locks required: sched_mutex.
147 static StgMainThread *main_threads;
150 * Locks required: sched_mutex.
154 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
155 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
158 In GranSim we have a runnable and a blocked queue for each processor.
159 To minimise code changes, the new arrays run_queue_hds/tls
160 are created; run_queue_hd is then a shortcut (macro) for
161 run_queue_hds[CurrentProc] (see GranSim.h).
164 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
165 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
166 StgTSO *ccalling_threadss[MAX_PROC];
167 /* We use the same global list of threads (all_threads) in GranSim as in
168 the std RTS (i.e. we are cheating). However, we don't use this list in
169 the GranSim specific code at the moment (so we are only potentially
174 StgTSO *run_queue_hd, *run_queue_tl;
175 StgTSO *blocked_queue_hd, *blocked_queue_tl;
176 StgTSO *sleeping_queue; /* perhaps replace with a hash table? */
180 /* Linked list of all threads.
181 * Used for detecting garbage collected threads.
185 /* Threads suspended in _ccall_GC.
187 static StgTSO *suspended_ccalling_threads;
189 static StgTSO *threadStackOverflow(StgTSO *tso);
191 /* KH: The following two flags are shared memory locations. There is no need
192 to lock them, since they are only unset at the end of a scheduler
196 /* flag set by signal handler to precipitate a context switch */
197 //@cindex context_switch
200 /* if this flag is set as well, give up execution */
201 //@cindex interrupted
204 /* Next thread ID to allocate.
205 * Locks required: sched_mutex
207 //@cindex next_thread_id
208 StgThreadID next_thread_id = 1;
211 * Pointers to the state of the current thread.
212 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
213 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
216 /* The smallest stack size that makes any sense is:
217 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
218 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
219 * + 1 (the realworld token for an IO thread)
220 * + 1 (the closure to enter)
222 * A thread with this stack will bomb immediately with a stack
223 * overflow, which will increase its stack size.
226 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
228 /* Free capability list.
229 * Locks required: sched_mutex.
232 Capability *free_capabilities; /* Available capabilities for running threads */
233 nat n_free_capabilities; /* total number of available capabilities */
235 Capability MainCapability; /* for non-SMP, we have one global capability */
242 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
243 * exists - earlier gccs apparently didn't.
250 /* All our current task ids, saved in case we need to kill them later.
257 void addToBlockedQueue ( StgTSO *tso );
259 static void schedule ( void );
260 void interruptStgRts ( void );
262 static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri );
264 static StgTSO * createThread_ ( nat size, rtsBool have_lock );
267 static void detectBlackHoles ( void );
270 static void sched_belch(char *s, ...);
274 //@cindex sched_mutex
276 //@cindex thread_ready_cond
277 //@cindex gc_pending_cond
278 pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;
279 pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER;
280 pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER;
281 pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER;
288 rtsTime TimeOfLastYield;
289 rtsBool emitSchedule = rtsTrue;
293 char *whatNext_strs[] = {
301 char *threadReturnCode_strs[] = {
302 "HeapOverflow", /* might also be StackOverflow */
311 StgTSO * createSparkThread(rtsSpark spark);
312 StgTSO * activateSpark (rtsSpark spark);
316 * The thread state for the main thread.
317 // ToDo: check whether not needed any more
321 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
322 //@subsection Main scheduling loop
324 /* ---------------------------------------------------------------------------
325 Main scheduling loop.
327 We use round-robin scheduling, each thread returning to the
328 scheduler loop when one of these conditions is detected:
331 * timer expires (thread yields)
336 Locking notes: we acquire the scheduler lock once at the beginning
337 of the scheduler loop, and release it when
339 * running a thread, or
340 * waiting for work, or
341 * waiting for a GC to complete.
344 In a GranSim setup this loop iterates over the global event queue.
345 This revolves around the global event queue, which determines what
346 to do next. Therefore, it's more complicated than either the
347 concurrent or the parallel (GUM) setup.
350 GUM iterates over incoming messages.
351 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
352 and sends out a fish whenever it has nothing to do; in-between
353 doing the actual reductions (shared code below) it processes the
354 incoming messages and deals with delayed operations
355 (see PendingFetches).
356 This is not the ugliest code you could imagine, but it's bloody close.
358 ------------------------------------------------------------------------ */
365 StgThreadReturnCode ret;
373 rtsBool receivedFinish = rtsFalse;
375 nat tp_size, sp_size; // stats only
378 rtsBool was_interrupted = rtsFalse;
380 ACQUIRE_LOCK(&sched_mutex);
384 /* set up first event to get things going */
385 /* ToDo: assign costs for system setup and init MainTSO ! */
386 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
388 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
391 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
392 G_TSO(CurrentTSO, 5));
394 if (RtsFlags.GranFlags.Light) {
395 /* Save current time; GranSim Light only */
396 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
399 event = get_next_event();
401 while (event!=(rtsEvent*)NULL) {
402 /* Choose the processor with the next event */
403 CurrentProc = event->proc;
404 CurrentTSO = event->tso;
408 while (!receivedFinish) { /* set by processMessages */
409 /* when receiving PP_FINISH message */
416 IF_DEBUG(scheduler, printAllThreads());
418 /* If we're interrupted (the user pressed ^C, or some other
419 * termination condition occurred), kill all the currently running
423 IF_DEBUG(scheduler, sched_belch("interrupted"));
425 interrupted = rtsFalse;
426 was_interrupted = rtsTrue;
429 /* Go through the list of main threads and wake up any
430 * clients whose computations have finished. ToDo: this
431 * should be done more efficiently without a linear scan
432 * of the main threads list, somehow...
436 StgMainThread *m, **prev;
437 prev = &main_threads;
438 for (m = main_threads; m != NULL; m = m->link) {
439 switch (m->tso->what_next) {
442 *(m->ret) = (StgClosure *)m->tso->sp[0];
446 pthread_cond_broadcast(&m->wakeup);
449 if (m->ret) *(m->ret) = NULL;
451 if (was_interrupted) {
452 m->stat = Interrupted;
456 pthread_cond_broadcast(&m->wakeup);
467 /* in GUM do this only on the Main PE */
470 /* If our main thread has finished or been killed, return.
473 StgMainThread *m = main_threads;
474 if (m->tso->what_next == ThreadComplete
475 || m->tso->what_next == ThreadKilled) {
476 main_threads = main_threads->link;
477 if (m->tso->what_next == ThreadComplete) {
478 /* we finished successfully, fill in the return value */
479 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
483 if (m->ret) { *(m->ret) = NULL; };
484 if (was_interrupted) {
485 m->stat = Interrupted;
495 /* Top up the run queue from our spark pool. We try to make the
496 * number of threads in the run queue equal to the number of
501 nat n = n_free_capabilities;
502 StgTSO *tso = run_queue_hd;
504 /* Count the run queue */
505 while (n > 0 && tso != END_TSO_QUEUE) {
512 spark = findSpark(rtsFalse);
514 break; /* no more sparks in the pool */
516 /* I'd prefer this to be done in activateSpark -- HWL */
517 /* tricky - it needs to hold the scheduler lock and
518 * not try to re-acquire it -- SDM */
519 createSparkThread(spark);
521 sched_belch("==^^ turning spark of closure %p into a thread",
522 (StgClosure *)spark));
525 /* We need to wake up the other tasks if we just created some
528 if (n_free_capabilities - n > 1) {
529 pthread_cond_signal(&thread_ready_cond);
534 /* check for signals each time around the scheduler */
535 #ifndef mingw32_TARGET_OS
536 if (signals_pending()) {
537 startSignalHandlers();
541 /* Check whether any waiting threads need to be woken up. If the
542 * run queue is empty, and there are no other tasks running, we
543 * can wait indefinitely for something to happen.
544 * ToDo: what if another client comes along & requests another
547 if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
549 (run_queue_hd == END_TSO_QUEUE)
551 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
555 /* we can be interrupted while waiting for I/O... */
556 if (interrupted) continue;
559 * Detect deadlock: when we have no threads to run, there are no
560 * threads waiting on I/O or sleeping, and all the other tasks are
561 * waiting for work, we must have a deadlock of some description.
563 * We first try to find threads blocked on themselves (ie. black
564 * holes), and generate NonTermination exceptions where necessary.
566 * If no threads are black holed, we have a deadlock situation, so
567 * inform all the main threads.
570 if (blocked_queue_hd == END_TSO_QUEUE
571 && run_queue_hd == END_TSO_QUEUE
572 && sleeping_queue == END_TSO_QUEUE
574 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
578 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
579 GarbageCollect(GetRoots,rtsTrue);
580 if (blocked_queue_hd == END_TSO_QUEUE
581 && run_queue_hd == END_TSO_QUEUE
582 && sleeping_queue == END_TSO_QUEUE) {
583 IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
585 if (run_queue_hd == END_TSO_QUEUE) {
586 StgMainThread *m = main_threads;
588 for (; m != NULL; m = m->link) {
589 deleteThread(m->tso);
592 pthread_cond_broadcast(&m->wakeup);
596 deleteThread(m->tso);
599 main_threads = m->link;
606 /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
610 /* If there's a GC pending, don't do anything until it has
614 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
615 pthread_cond_wait(&gc_pending_cond, &sched_mutex);
618 /* block until we've got a thread on the run queue and a free
621 while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
622 IF_DEBUG(scheduler, sched_belch("waiting for work"));
623 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
624 IF_DEBUG(scheduler, sched_belch("work now available"));
630 if (RtsFlags.GranFlags.Light)
631 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
633 /* adjust time based on time-stamp */
634 if (event->time > CurrentTime[CurrentProc] &&
635 event->evttype != ContinueThread)
636 CurrentTime[CurrentProc] = event->time;
638 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
639 if (!RtsFlags.GranFlags.Light)
642 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
644 /* main event dispatcher in GranSim */
645 switch (event->evttype) {
646 /* Should just be continuing execution */
648 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
649 /* ToDo: check assertion
650 ASSERT(run_queue_hd != (StgTSO*)NULL &&
651 run_queue_hd != END_TSO_QUEUE);
653 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
654 if (!RtsFlags.GranFlags.DoAsyncFetch &&
655 procStatus[CurrentProc]==Fetching) {
656 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
657 CurrentTSO->id, CurrentTSO, CurrentProc);
660 /* Ignore ContinueThreads for completed threads */
661 if (CurrentTSO->what_next == ThreadComplete) {
662 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
663 CurrentTSO->id, CurrentTSO, CurrentProc);
666 /* Ignore ContinueThreads for threads that are being migrated */
667 if (PROCS(CurrentTSO)==Nowhere) {
668 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
669 CurrentTSO->id, CurrentTSO, CurrentProc);
672 /* The thread should be at the beginning of the run queue */
673 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
674 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
675 CurrentTSO->id, CurrentTSO, CurrentProc);
676 break; // run the thread anyway
679 new_event(proc, proc, CurrentTime[proc],
681 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
683 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
684 break; // now actually run the thread; DaH Qu'vam yImuHbej
687 do_the_fetchnode(event);
688 goto next_thread; /* handle next event in event queue */
691 do_the_globalblock(event);
692 goto next_thread; /* handle next event in event queue */
695 do_the_fetchreply(event);
696 goto next_thread; /* handle next event in event queue */
698 case UnblockThread: /* Move from the blocked queue to the tail of */
699 do_the_unblock(event);
700 goto next_thread; /* handle next event in event queue */
702 case ResumeThread: /* Move from the blocked queue to the tail of */
703 /* the runnable queue ( i.e. Qu' SImqa'lu') */
704 event->tso->gran.blocktime +=
705 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
706 do_the_startthread(event);
707 goto next_thread; /* handle next event in event queue */
710 do_the_startthread(event);
711 goto next_thread; /* handle next event in event queue */
714 do_the_movethread(event);
715 goto next_thread; /* handle next event in event queue */
718 do_the_movespark(event);
719 goto next_thread; /* handle next event in event queue */
722 do_the_findwork(event);
723 goto next_thread; /* handle next event in event queue */
726 barf("Illegal event type %u\n", event->evttype);
729 /* This point was scheduler_loop in the old RTS */
731 IF_DEBUG(gran, belch("GRAN: after main switch"));
733 TimeOfLastEvent = CurrentTime[CurrentProc];
734 TimeOfNextEvent = get_time_of_next_event();
735 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
736 // CurrentTSO = ThreadQueueHd;
738 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
741 if (RtsFlags.GranFlags.Light)
742 GranSimLight_leave_system(event, &ActiveTSO);
744 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
747 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
749 /* in a GranSim setup the TSO stays on the run queue */
751 /* Take a thread from the run queue. */
752 t = POP_RUN_QUEUE(); // take_off_run_queue(t);
755 fprintf(stderr, "GRAN: About to run current thread, which is\n");
758 context_switch = 0; // turned on via GranYield, checking events and time slice
761 DumpGranEvent(GR_SCHEDULE, t));
763 procStatus[CurrentProc] = Busy;
766 if (PendingFetches != END_BF_QUEUE) {
770 /* ToDo: phps merge with spark activation above */
771 /* check whether we have local work and send requests if we have none */
772 if (EMPTY_RUN_QUEUE()) { /* no runnable threads */
773 /* :-[ no local threads => look out for local sparks */
774 /* the spark pool for the current PE */
775 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
776 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
777 pool->hd < pool->tl) {
779 * ToDo: add GC code check that we really have enough heap afterwards!!
781 * If we're here (no runnable threads) and we have pending
782 * sparks, we must have a space problem. Get enough space
783 * to turn one of those pending sparks into a
787 spark = findSpark(rtsFalse); /* get a spark */
788 if (spark != (rtsSpark) NULL) {
789 tso = activateSpark(spark); /* turn the spark into a thread */
790 IF_PAR_DEBUG(schedule,
791 belch("==== schedule: Created TSO %d (%p); %d threads active",
792 tso->id, tso, advisory_thread_count));
794 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
795 belch("==^^ failed to activate spark");
797 } /* otherwise fall through & pick-up new tso */
799 IF_PAR_DEBUG(verbose,
800 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
801 spark_queue_len(pool)));
806 /* If we still have no work we need to send a FISH to get a spark
809 if (EMPTY_RUN_QUEUE()) {
810 /* =8-[ no local sparks => look for work on other PEs */
812 * We really have absolutely no work. Send out a fish
813 * (there may be some out there already), and wait for
814 * something to arrive. We clearly can't run any threads
815 * until a SCHEDULE or RESUME arrives, and so that's what
816 * we're hoping to see. (Of course, we still have to
817 * respond to other types of messages.)
819 TIME now = msTime() /*CURRENT_TIME*/;
820 IF_PAR_DEBUG(verbose,
821 belch("-- now=%ld", now));
822 IF_PAR_DEBUG(verbose,
823 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
824 (last_fish_arrived_at!=0 &&
825 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
826 belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
827 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
828 last_fish_arrived_at,
829 RtsFlags.ParFlags.fishDelay, now);
832 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
833 (last_fish_arrived_at==0 ||
834 (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
835 /* outstandingFishes is set in sendFish, processFish;
836 avoid flooding system with fishes via delay */
838 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
841 // Global statistics: count no. of fishes
842 if (RtsFlags.ParFlags.ParStats.Global &&
843 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
844 globalParStats.tot_fish_mess++;
848 receivedFinish = processMessages();
851 } else if (PacketsWaiting()) { /* Look for incoming messages */
852 receivedFinish = processMessages();
855 /* Now we are sure that we have some work available */
856 ASSERT(run_queue_hd != END_TSO_QUEUE);
858 /* Take a thread from the run queue, if we have work */
859 t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
860 IF_DEBUG(sanity,checkTSO(t));
862 /* ToDo: write something to the log-file
863 if (RTSflags.ParFlags.granSimStats && !sameThread)
864 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
868 /* the spark pool for the current PE */
869 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
872 belch("--=^ %d threads, %d sparks on [%#x]",
873 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
876 if (0 && RtsFlags.ParFlags.ParStats.Full &&
877 t && LastTSO && t->id != LastTSO->id &&
878 LastTSO->why_blocked == NotBlocked &&
879 LastTSO->what_next != ThreadComplete) {
880 // if previously scheduled TSO not blocked we have to record the context switch
881 DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
882 GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
885 if (RtsFlags.ParFlags.ParStats.Full &&
886 (emitSchedule /* forced emit */ ||
887 (t && LastTSO && t->id != LastTSO->id))) {
889 we are running a different TSO, so write a schedule event to log file
890 NB: If we use fair scheduling we also have to write a deschedule
891 event for LastTSO; with unfair scheduling we know that the
892 previous tso has blocked whenever we switch to another tso, so
893 we don't need it in GUM for now
895 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
896 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
897 emitSchedule = rtsFalse;
901 #else /* !GRAN && !PAR */
903 /* grab a thread from the run queue
905 ASSERT(run_queue_hd != END_TSO_QUEUE);
908 // Sanity check the thread we're about to run. This can be
909 // expensive if there is lots of thread switching going on...
910 IF_DEBUG(sanity,checkTSO(t));
917 cap = free_capabilities;
918 free_capabilities = cap->link;
919 n_free_capabilities--;
921 cap = &MainCapability;
924 cap->r.rCurrentTSO = t;
926 /* context switches are now initiated by the timer signal, unless
927 * the user specified "context switch as often as possible", with
932 RtsFlags.ProfFlags.profileInterval == 0 ||
934 (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
935 && (run_queue_hd != END_TSO_QUEUE
936 || blocked_queue_hd != END_TSO_QUEUE
937 || sleeping_queue != END_TSO_QUEUE)))
942 RELEASE_LOCK(&sched_mutex);
944 IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...",
945 t->id, t, whatNext_strs[t->what_next]));
948 startHeapProfTimer();
951 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
952 /* Run the current thread
954 switch (cap->r.rCurrentTSO->what_next) {
957 /* Thread already finished, return to scheduler. */
958 ret = ThreadFinished;
961 ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
964 ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
966 case ThreadEnterInterp:
967 ret = interpretBCO(cap);
970 barf("schedule: invalid what_next field");
972 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
974 /* Costs for the scheduler are assigned to CCS_SYSTEM */
980 ACQUIRE_LOCK(&sched_mutex);
983 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
984 #elif !defined(GRAN) && !defined(PAR)
985 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
987 t = cap->r.rCurrentTSO;
990 /* HACK 675: if the last thread didn't yield, make sure to print a
991 SCHEDULE event to the log file when StgRunning the next thread, even
992 if it is the same one as before */
994 TimeOfLastYield = CURRENT_TIME;
1000 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1001 globalGranStats.tot_heapover++;
1003 globalParStats.tot_heapover++;
1006 // did the task ask for a large block?
1007 if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1008 // if so, get one and push it on the front of the nursery.
1012 blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1014 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)",
1016 whatNext_strs[t->what_next], blocks));
1018 // don't do this if it would push us over the
1019 // alloc_blocks_lim limit; we'll GC first.
1020 if (alloc_blocks + blocks < alloc_blocks_lim) {
1022 alloc_blocks += blocks;
1023 bd = allocGroup( blocks );
1025 // link the new group into the list
1026 bd->link = cap->r.rCurrentNursery;
1027 bd->u.back = cap->r.rCurrentNursery->u.back;
1028 if (cap->r.rCurrentNursery->u.back != NULL) {
1029 cap->r.rCurrentNursery->u.back->link = bd;
1031 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1032 g0s0->blocks == cap->r.rNursery);
1033 cap->r.rNursery = g0s0->blocks = bd;
1035 cap->r.rCurrentNursery->u.back = bd;
1037 // initialise it as a nursery block
1041 bd->free = bd->start;
1043 // don't forget to update the block count in g0s0.
1044 g0s0->n_blocks += blocks;
1045 ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1047 // now update the nursery to point to the new block
1048 cap->r.rCurrentNursery = bd;
1050 // we might be unlucky and have another thread get on the
1051 // run queue before us and steal the large block, but in that
1052 // case the thread will just end up requesting another large
1054 PUSH_ON_RUN_QUEUE(t);
1059 /* make all the running tasks block on a condition variable,
1060 * maybe set context_switch and wait till they all pile in,
1061 * then have them wait on a GC condition variable.
1063 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow",
1064 t->id, t, whatNext_strs[t->what_next]));
1067 ASSERT(!is_on_queue(t,CurrentProc));
1069 /* Currently we emit a DESCHEDULE event before GC in GUM.
1070 ToDo: either add separate event to distinguish SYSTEM time from rest
1071 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1072 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1073 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1074 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1075 emitSchedule = rtsTrue;
1079 ready_to_gc = rtsTrue;
1080 context_switch = 1; /* stop other threads ASAP */
1081 PUSH_ON_RUN_QUEUE(t);
1082 /* actual GC is done at the end of the while loop */
1088 DumpGranEvent(GR_DESCHEDULE, t));
1089 globalGranStats.tot_stackover++;
1092 // DumpGranEvent(GR_DESCHEDULE, t);
1093 globalParStats.tot_stackover++;
1095 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow",
1096 t->id, t, whatNext_strs[t->what_next]));
1097 /* just adjust the stack for this thread, then pop it back
1103 /* enlarge the stack */
1104 StgTSO *new_t = threadStackOverflow(t);
1106 /* This TSO has moved, so update any pointers to it from the
1107 * main thread stack. It better not be on any other queues...
1108 * (it shouldn't be).
1110 for (m = main_threads; m != NULL; m = m->link) {
1115 threadPaused(new_t);
1116 PUSH_ON_RUN_QUEUE(new_t);
1120 case ThreadYielding:
1123 DumpGranEvent(GR_DESCHEDULE, t));
1124 globalGranStats.tot_yields++;
1127 // DumpGranEvent(GR_DESCHEDULE, t);
1128 globalParStats.tot_yields++;
1130 /* put the thread back on the run queue. Then, if we're ready to
1131 * GC, check whether this is the last task to stop. If so, wake
1132 * up the GC thread. getThread will block during a GC until the
1136 if (t->what_next == ThreadEnterInterp) {
1137 /* ToDo: or maybe a timer expired when we were in Hugs?
1138 * or maybe someone hit ctrl-C
1140 belch("--<< thread %ld (%p; %s) stopped to switch to Hugs",
1141 t->id, t, whatNext_strs[t->what_next]);
1143 belch("--<< thread %ld (%p; %s) stopped, yielding",
1144 t->id, t, whatNext_strs[t->what_next]);
1151 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1153 ASSERT(t->link == END_TSO_QUEUE);
1155 ASSERT(!is_on_queue(t,CurrentProc));
1158 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1159 checkThreadQsSanity(rtsTrue));
1162 if (RtsFlags.ParFlags.doFairScheduling) {
1163 /* this does round-robin scheduling; good for concurrency */
1164 APPEND_TO_RUN_QUEUE(t);
1166 /* this does unfair scheduling; good for parallelism */
1167 PUSH_ON_RUN_QUEUE(t);
1170 /* this does round-robin scheduling; good for concurrency */
1171 APPEND_TO_RUN_QUEUE(t);
1174 /* add a ContinueThread event to actually process the thread */
1175 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1177 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1179 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1188 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1189 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1190 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1192 // ??? needed; should emit block before
1194 DumpGranEvent(GR_DESCHEDULE, t));
1195 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1198 ASSERT(procStatus[CurrentProc]==Busy ||
1199 ((procStatus[CurrentProc]==Fetching) &&
1200 (t->block_info.closure!=(StgClosure*)NULL)));
1201 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1202 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1203 procStatus[CurrentProc]==Fetching))
1204 procStatus[CurrentProc] = Idle;
1208 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1209 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1212 if (t->block_info.closure!=(StgClosure*)NULL)
1213 print_bq(t->block_info.closure));
1215 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1218 /* whatever we schedule next, we must log that schedule */
1219 emitSchedule = rtsTrue;
1222 /* don't need to do anything. Either the thread is blocked on
1223 * I/O, in which case we'll have called addToBlockedQueue
1224 * previously, or it's blocked on an MVar or Blackhole, in which
1225 * case it'll be on the relevant queue already.
1228 fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1229 printThreadBlockage(t);
1230 fprintf(stderr, "\n"));
1232 /* Only for dumping event to log file
1233 ToDo: do I need this in GranSim, too?
1240 case ThreadFinished:
1241 /* Need to check whether this was a main thread, and if so, signal
1242 * the task that started it with the return value. If we have no
1243 * more main threads, we probably need to stop all the tasks until
1246 /* We also end up here if the thread kills itself with an
1247 * uncaught exception, see Exception.hc.
1249 IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1251 endThread(t, CurrentProc); // clean-up the thread
1253 /* For now all are advisory -- HWL */
1254 //if(t->priority==AdvisoryPriority) ??
1255 advisory_thread_count--;
1258 if(t->dist.priority==RevalPriority)
1262 if (RtsFlags.ParFlags.ParStats.Full &&
1263 !RtsFlags.ParFlags.ParStats.Suppressed)
1264 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1269 barf("schedule: invalid thread return code %d", (int)ret);
1273 cap->link = free_capabilities;
1274 free_capabilities = cap;
1275 n_free_capabilities++;
1279 if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1280 GarbageCollect(GetRoots, rtsTrue);
1282 performHeapProfile = rtsFalse;
1283 ready_to_gc = rtsFalse; // we already GC'd
1288 if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes)
1293 /* everybody back, start the GC.
1294 * Could do it in this thread, or signal a condition var
1295 * to do it in another thread. Either way, we need to
1296 * broadcast on gc_pending_cond afterward.
1299 IF_DEBUG(scheduler,sched_belch("doing GC"));
1301 GarbageCollect(GetRoots,rtsFalse);
1302 ready_to_gc = rtsFalse;
1304 pthread_cond_broadcast(&gc_pending_cond);
1307 /* add a ContinueThread event to continue execution of current thread */
1308 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1310 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1312 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1320 IF_GRAN_DEBUG(unused,
1321 print_eventq(EventHd));
1323 event = get_next_event();
1326 /* ToDo: wait for next message to arrive rather than busy wait */
1329 } /* end of while(1) */
1331 IF_PAR_DEBUG(verbose,
1332 belch("== Leaving schedule() after having received Finish"));
1335 /* ---------------------------------------------------------------------------
1336 * deleteAllThreads(): kill all the live threads.
1338 * This is used when we catch a user interrupt (^C), before performing
1339 * any necessary cleanups and running finalizers.
1340 * ------------------------------------------------------------------------- */
1342 void deleteAllThreads ( void )
1345 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1346 for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1349 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1352 for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1355 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1356 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1357 sleeping_queue = END_TSO_QUEUE;
1360 /* startThread and insertThread are now in GranSim.c -- HWL */
1362 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1363 //@subsection Suspend and Resume
1365 /* ---------------------------------------------------------------------------
1366 * Suspending & resuming Haskell threads.
1368 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1369 * its capability before calling the C function. This allows another
1370 * task to pick up the capability and carry on running Haskell
1371 * threads. It also means that if the C call blocks, it won't lock
1374 * The Haskell thread making the C call is put to sleep for the
1375 * duration of the call, on the suspended_ccalling_threads queue. We
1376 * give out a token to the task, which it can use to resume the thread
1377 * on return from the C function.
1378 * ------------------------------------------------------------------------- */
1381 suspendThread( StgRegTable *reg ) /* park the current TSO on suspended_ccalling_threads and hand back the capability */
1386 // assume that *reg is a pointer to the StgRegTable part of a Capability; the arithmetic below assumes the StgFunTable (cap->f) immediately precedes it
1387 cap = (Capability *)((char *)reg - sizeof(StgFunTable)); /* char * so the offset is in bytes; arithmetic on void * is a GNU extension, not ISO C */
1389 ACQUIRE_LOCK(&sched_mutex);
1392 sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1394 threadPaused(cap->r.rCurrentTSO);
1395 cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1396 suspended_ccalling_threads = cap->r.rCurrentTSO;
1398 /* Use the thread ID as the token; it should be unique */
1399 tok = cap->r.rCurrentTSO->id;
1402 cap->link = free_capabilities;
1403 free_capabilities = cap;
1404 n_free_capabilities++;
1407 RELEASE_LOCK(&sched_mutex);
1412 resumeThread( StgInt tok )
1414 StgTSO *tso, **prev;
1417 ACQUIRE_LOCK(&sched_mutex);
1419 prev = &suspended_ccalling_threads;
1420 for (tso = suspended_ccalling_threads;
1421 tso != END_TSO_QUEUE;
1422 prev = &tso->link, tso = tso->link) {
1423 if (tso->id == (StgThreadID)tok) {
1428 if (tso == END_TSO_QUEUE) {
1429 barf("resumeThread: thread not found");
1431 tso->link = END_TSO_QUEUE;
1434 while (free_capabilities == NULL) {
1435 IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1436 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1437 IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1439 cap = free_capabilities;
1440 free_capabilities = cap->link;
1441 n_free_capabilities--;
1443 cap = &MainCapability;
1446 cap->r.rCurrentTSO = tso;
1448 RELEASE_LOCK(&sched_mutex);
1453 /* ---------------------------------------------------------------------------
1455 * ------------------------------------------------------------------------ */
1456 static void unblockThread(StgTSO *tso);
1458 /* ---------------------------------------------------------------------------
1459 * Comparing Thread ids.
1461 * This is used from STG land in the implementation of the
1462 * instances of Eq/Ord for ThreadIds.
1463 * ------------------------------------------------------------------------ */
1465 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
1467 StgThreadID id1 = tso1->id;
1468 StgThreadID id2 = tso2->id;
1470 if (id1 < id2) return (-1);
1471 if (id1 > id2) return 1;
1475 /* ---------------------------------------------------------------------------
1476 * Fetching the ThreadID from an StgTSO.
1478 * This is used in the implementation of Show for ThreadIds.
1479 * ------------------------------------------------------------------------ */
1480 int rts_getThreadId(const StgTSO *tso)
1485 /* ---------------------------------------------------------------------------
1486 Create a new thread.
1488 The new thread starts with the given stack size. Before the
1489 scheduler can run, however, this thread needs to have a closure
1490 (and possibly some arguments) pushed on its stack. See
1491 pushClosure() in Schedule.h.
1493 createGenThread() and createIOThread() (in SchedAPI.h) are
1494 convenient packaged versions of this function.
1496 currently pri (priority) is only used in a GRAN setup -- HWL
1497 ------------------------------------------------------------------------ */
1498 //@cindex createThread
1500 /* currently pri (priority) is only used in a GRAN setup -- HWL */
1502 createThread(nat stack_size, StgInt pri)
1504 return createThread_(stack_size, rtsFalse, pri);
1508 createThread_(nat size, rtsBool have_lock, StgInt pri)
1512 createThread(nat stack_size)
1514 return createThread_(stack_size, rtsFalse);
1518 createThread_(nat size, rtsBool have_lock)
1525 /* First check whether we should create a thread at all */
1527 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1528 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1530 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1531 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1532 return END_TSO_QUEUE;
1538 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1541 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1543 /* catch ridiculously small stack sizes */
1544 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1545 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1548 stack_size = size - TSO_STRUCT_SIZEW;
1550 tso = (StgTSO *)allocate(size);
1551 TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1553 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1555 SET_GRAN_HDR(tso, ThisPE);
1557 tso->what_next = ThreadEnterGHC;
1559 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1560 * protect the increment operation on next_thread_id.
1561 * In future, we could use an atomic increment instead.
1563 if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1564 tso->id = next_thread_id++;
1565 if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1567 tso->why_blocked = NotBlocked;
1568 tso->blocked_exceptions = NULL;
1570 tso->stack_size = stack_size;
1571 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
1573 tso->sp = (P_)&(tso->stack) + stack_size;
1576 tso->prof.CCCS = CCS_MAIN;
1579 /* put a stop frame on the stack */
1580 tso->sp -= sizeofW(StgStopFrame);
1581 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1582 tso->su = (StgUpdateFrame*)tso->sp;
1586 tso->link = END_TSO_QUEUE;
1587 /* uses more flexible routine in GranSim */
1588 insertThread(tso, CurrentProc);
1590 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1596 if (RtsFlags.GranFlags.GranSimStats.Full)
1597 DumpGranEvent(GR_START,tso);
1599 if (RtsFlags.ParFlags.ParStats.Full)
1600 DumpGranEvent(GR_STARTQ,tso);
1601 /* HACk to avoid SCHEDULE
1605 /* Link the new thread on the global thread list.
1607 tso->global_link = all_threads;
1611 tso->dist.priority = MandatoryPriority; //by default that is...
1615 tso->gran.pri = pri;
1617 tso->gran.magic = TSO_MAGIC; // debugging only
1619 tso->gran.sparkname = 0;
1620 tso->gran.startedat = CURRENT_TIME;
1621 tso->gran.exported = 0;
1622 tso->gran.basicblocks = 0;
1623 tso->gran.allocs = 0;
1624 tso->gran.exectime = 0;
1625 tso->gran.fetchtime = 0;
1626 tso->gran.fetchcount = 0;
1627 tso->gran.blocktime = 0;
1628 tso->gran.blockcount = 0;
1629 tso->gran.blockedat = 0;
1630 tso->gran.globalsparks = 0;
1631 tso->gran.localsparks = 0;
1632 if (RtsFlags.GranFlags.Light)
1633 tso->gran.clock = Now; /* local clock */
1635 tso->gran.clock = 0;
1637 IF_DEBUG(gran,printTSO(tso));
1640 tso->par.magic = TSO_MAGIC; // debugging only
1642 tso->par.sparkname = 0;
1643 tso->par.startedat = CURRENT_TIME;
1644 tso->par.exported = 0;
1645 tso->par.basicblocks = 0;
1646 tso->par.allocs = 0;
1647 tso->par.exectime = 0;
1648 tso->par.fetchtime = 0;
1649 tso->par.fetchcount = 0;
1650 tso->par.blocktime = 0;
1651 tso->par.blockcount = 0;
1652 tso->par.blockedat = 0;
1653 tso->par.globalsparks = 0;
1654 tso->par.localsparks = 0;
1658 globalGranStats.tot_threads_created++;
1659 globalGranStats.threads_created_on_PE[CurrentProc]++;
1660 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1661 globalGranStats.tot_sq_probes++;
1663 // collect parallel global statistics (currently done together with GC stats)
1664 if (RtsFlags.ParFlags.ParStats.Global &&
1665 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1666 //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime());
1667 globalParStats.tot_threads_created++;
1673 belch("==__ schedule: Created TSO %d (%p);",
1674 CurrentProc, tso, tso->id));
1676 IF_PAR_DEBUG(verbose,
1677 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1678 tso->id, tso, advisory_thread_count));
1680 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1681 tso->id, tso->stack_size));
1688 all parallel thread creation calls should fall through the following routine.
1691 createSparkThread(rtsSpark spark)
1693 ASSERT(spark != (rtsSpark)NULL);
1694 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
1696 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1697 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1698 return END_TSO_QUEUE;
1702 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1703 if (tso==END_TSO_QUEUE)
1704 barf("createSparkThread: Cannot create TSO");
1706 tso->priority = AdvisoryPriority;
1708 pushClosure(tso,spark);
1709 PUSH_ON_RUN_QUEUE(tso);
1710 advisory_thread_count++;
1717 Turn a spark into a thread.
1718 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1721 //@cindex activateSpark
1723 activateSpark (rtsSpark spark)
1727 tso = createSparkThread(spark);
1728 if (RtsFlags.ParFlags.ParStats.Full) {
1729 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1730 IF_PAR_DEBUG(verbose,
1731 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1732 (StgClosure *)spark, info_type((StgClosure *)spark)));
1734 // ToDo: fwd info on local/global spark to thread -- HWL
1735 // tso->gran.exported = spark->exported;
1736 // tso->gran.locked = !spark->global;
1737 // tso->gran.sparkname = spark->name;
1743 /* ---------------------------------------------------------------------------
1746 * scheduleThread puts a thread on the head of the runnable queue.
1747 * This will usually be done immediately after a thread is created.
1748 * The caller of scheduleThread must create the thread using e.g.
1749 * createThread and push an appropriate closure
1750 * on this thread's stack before the scheduler is invoked.
1751 * ------------------------------------------------------------------------ */
1754 scheduleThread(StgTSO *tso)
1756 if (tso==END_TSO_QUEUE){
1761 ACQUIRE_LOCK(&sched_mutex);
1763 /* Put the new thread on the head of the runnable queue. The caller
1764 * better push an appropriate closure on this thread's stack
1765 * beforehand. In the SMP case, the thread may start running as
1766 * soon as we release the scheduler lock below.
1768 PUSH_ON_RUN_QUEUE(tso);
1772 IF_DEBUG(scheduler,printTSO(tso));
1774 RELEASE_LOCK(&sched_mutex);
1777 /* ---------------------------------------------------------------------------
1780 * Start up Posix threads to run each of the scheduler tasks.
1781 * I believe the task ids are not needed in the system as defined.
1783 * ------------------------------------------------------------------------ */
1785 #if defined(PAR) || defined(SMP)
1787 taskStart(void) /* ( void *arg STG_UNUSED) */
1789 scheduleThread(END_TSO_QUEUE);
1793 /* ---------------------------------------------------------------------------
1796 * Initialise the scheduler. This resets all the queues - if the
1797 * queues contained any threads, they'll be garbage collected at the
1800 * This now calls startTasks(), so should only be called once! KH @ 25/10/99
1801 * ------------------------------------------------------------------------ */
1805 term_handler(int sig STG_UNUSED)
1808 ACQUIRE_LOCK(&term_mutex);
1810 RELEASE_LOCK(&term_mutex);
1816 initCapability( Capability *cap )
1818 cap->f.stgChk0 = (F_)__stg_chk_0;
1819 cap->f.stgChk1 = (F_)__stg_chk_1;
1820 cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
1821 cap->f.stgUpdatePAP = (F_)__stg_update_PAP;
1830 for (i=0; i<=MAX_PROC; i++) {
1831 run_queue_hds[i] = END_TSO_QUEUE;
1832 run_queue_tls[i] = END_TSO_QUEUE;
1833 blocked_queue_hds[i] = END_TSO_QUEUE;
1834 blocked_queue_tls[i] = END_TSO_QUEUE;
1835 ccalling_threadss[i] = END_TSO_QUEUE;
1836 sleeping_queue = END_TSO_QUEUE;
1839 run_queue_hd = END_TSO_QUEUE;
1840 run_queue_tl = END_TSO_QUEUE;
1841 blocked_queue_hd = END_TSO_QUEUE;
1842 blocked_queue_tl = END_TSO_QUEUE;
1843 sleeping_queue = END_TSO_QUEUE;
1846 suspended_ccalling_threads = END_TSO_QUEUE;
1848 main_threads = NULL;
1849 all_threads = END_TSO_QUEUE;
1854 RtsFlags.ConcFlags.ctxtSwitchTicks =
1855 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1857 /* Install the SIGTERM handler */
1860 struct sigaction action,oact;
1862 action.sa_handler = term_handler;
1863 sigemptyset(&action.sa_mask);
1864 action.sa_flags = 0;
1865 if (sigaction(SIGTERM, &action, &oact) != 0) {
1866 barf("can't install TERM handler");
1872 /* Allocate N Capabilities */
1875 Capability *cap, *prev;
1878 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1879 cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1880 initCapability(cap);
1884 free_capabilities = cap;
1885 n_free_capabilities = RtsFlags.ParFlags.nNodes;
1887 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1888 n_free_capabilities););
1890 initCapability(&MainCapability);
1893 #if defined(SMP) || defined(PAR)
1906 /* make some space for saving all the thread ids */
1907 task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1908 "initScheduler:task_ids");
1910 /* and create all the threads */
1911 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1912 r = pthread_create(&tid,NULL,taskStart,NULL);
1914 barf("startTasks: Can't create new Posix thread");
1916 task_ids[i].id = tid;
1917 task_ids[i].mut_time = 0.0;
1918 task_ids[i].mut_etime = 0.0;
1919 task_ids[i].gc_time = 0.0;
1920 task_ids[i].gc_etime = 0.0;
1921 task_ids[i].elapsedtimestart = elapsedtime();
1922 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1928 exitScheduler( void )
1933 /* Don't want to use pthread_cancel, since we'd have to install
1934 * these silly exception handlers (pthread_cleanup_{push,pop}) around
1938 /* Cancel all our tasks */
1939 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1940 pthread_cancel(task_ids[i].id);
1943 /* Wait for all the tasks to terminate */
1944 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1945 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n",
1947 pthread_join(task_ids[i].id, NULL);
1951 /* Send 'em all a SIGTERM. That should shut 'em up.
1953 await_death = RtsFlags.ParFlags.nNodes;
1954 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1955 pthread_kill(task_ids[i].id,SIGTERM);
1957 while (await_death > 0) {
1963 /* -----------------------------------------------------------------------------
1964 Managing the per-task allocation areas.
1966 Each capability comes with an allocation area. These are
1967 fixed-length block lists into which allocation can be done.
1969 ToDo: no support for two-space collection at the moment???
1970 -------------------------------------------------------------------------- */
1972 /* -----------------------------------------------------------------------------
1973 * waitThread is the external interface for running a new computation
1974 * and waiting for the result.
1976 * In the non-SMP case, we create a new main thread, push it on the
1977 * main-thread stack, and invoke the scheduler to run it. The
1978 * scheduler will return when the top main thread on the stack has
1979 * completed or died, and fill in the necessary fields of the
1980 * main_thread structure.
1982 * In the SMP case, we create a main thread as before, but we then
1983 * create a new condition variable and sleep on it. When our new
1984 * main thread has completed, we'll be woken up and the status/result
1985 * will be in the main_thread struct.
1986 * -------------------------------------------------------------------------- */
1989 howManyThreadsAvail ( void )
1993 for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1995 for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1997 for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2003 finishAllThreads ( void ) /* run threads from the run, blocked and sleeping queues via waitThread until all three are empty */
2006 while (run_queue_hd != END_TSO_QUEUE) {
2007 waitThread ( run_queue_hd, NULL );
2009 while (blocked_queue_hd != END_TSO_QUEUE) {
2010 waitThread ( blocked_queue_hd, NULL );
2012 while (sleeping_queue != END_TSO_QUEUE) {
2013 waitThread ( sleeping_queue, NULL ); /* wait on the sleeping thread itself; blocked_queue_hd was just drained above and may be END_TSO_QUEUE */
2016 (blocked_queue_hd != END_TSO_QUEUE ||
2017 run_queue_hd != END_TSO_QUEUE ||
2018 sleeping_queue != END_TSO_QUEUE);
2022 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2025 SchedulerStatus stat;
2027 ACQUIRE_LOCK(&sched_mutex);
2029 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2035 pthread_cond_init(&m->wakeup, NULL);
2038 m->link = main_threads;
2041 IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n",
2046 pthread_cond_wait(&m->wakeup, &sched_mutex);
2047 } while (m->stat == NoStatus);
2049 /* GranSim specific init */
2050 CurrentTSO = m->tso; // the TSO to run
2051 procStatus[MainProc] = Busy; // status of main PE
2052 CurrentProc = MainProc; // PE to run it on
2057 ASSERT(m->stat != NoStatus);
2063 pthread_cond_destroy(&m->wakeup);
2066 IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n",
2070 RELEASE_LOCK(&sched_mutex);
2075 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2076 //@subsection Run queue code
2080 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2081 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2082 implicit global variable that has to be correct when calling these
2086 /* Put the new thread on the head of the runnable queue.
2087 * The caller of createThread better push an appropriate closure
2088 * on this thread's stack before the scheduler is invoked.
2090 static /* inline */ void
2091 add_to_run_queue(tso)
2094 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2095 tso->link = run_queue_hd;
2097 if (run_queue_tl == END_TSO_QUEUE) {
2102 /* Put the new thread at the end of the runnable queue. */
2103 static /* inline */ void
2104 push_on_run_queue(tso)
2107 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2108 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2109 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2110 if (run_queue_hd == END_TSO_QUEUE) {
2113 run_queue_tl->link = tso;
2119 Should be inlined because it's used very often in schedule. The tso
2120 argument is actually only needed in GranSim, where we want to have the
2121 possibility to schedule *any* TSO on the run queue, irrespective of the
2122 actual ordering. Therefore, if tso is not the nil TSO then we traverse
2123 the run queue and dequeue the tso, adjusting the links in the queue.
2125 //@cindex take_off_run_queue
2126 static /* inline */ StgTSO*
2127 take_off_run_queue(StgTSO *tso) {
2131 qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2133 if tso is specified, unlink that tso from the run_queue (doesn't have
2134 to be at the beginning of the queue); GranSim only
2136 if (tso!=END_TSO_QUEUE) {
2137 /* find tso in queue */
2138 for (t=run_queue_hd, prev=END_TSO_QUEUE;
2139 t!=END_TSO_QUEUE && t!=tso;
2143 /* now actually dequeue the tso */
2144 if (prev!=END_TSO_QUEUE) {
2145 ASSERT(run_queue_hd!=t);
2146 prev->link = t->link;
2148 /* t is at beginning of thread queue */
2149 ASSERT(run_queue_hd==t);
2150 run_queue_hd = t->link;
2152 /* t is at end of thread queue */
2153 if (t->link==END_TSO_QUEUE) {
2154 ASSERT(t==run_queue_tl);
2155 run_queue_tl = prev;
2157 ASSERT(run_queue_tl!=t);
2159 t->link = END_TSO_QUEUE;
2161 /* take tso from the beginning of the queue; std concurrent code */
2163 if (t != END_TSO_QUEUE) {
2164 run_queue_hd = t->link;
2165 t->link = END_TSO_QUEUE;
2166 if (run_queue_hd == END_TSO_QUEUE) {
2167 run_queue_tl = END_TSO_QUEUE;
2176 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2177 //@subsection Garbage Collextion Routines
2179 /* ---------------------------------------------------------------------------
2180 Where are the roots that we know about?
2182 - all the threads on the runnable queue
2183 - all the threads on the blocked queue
2184 - all the threads on the sleeping queue
2185 - all the threads currently executing a _ccall_GC
2186 - all the "main threads"
2188 ------------------------------------------------------------------------ */
2190 /* This has to be protected either by the scheduler monitor, or by the
2191 garbage collection monitor (probably the latter).
2196 GetRoots(evac_fn evac) /* evacuate every scheduler-owned TSO root: run/blocked/sleeping queues, ccalling threads, main threads, spark queues */
2203 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2204 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2205 evac((StgClosure **)&run_queue_hds[i]);
2206 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2207 evac((StgClosure **)&run_queue_tls[i]);
2209 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2210 evac((StgClosure **)&blocked_queue_hds[i]);
2211 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2212 evac((StgClosure **)&blocked_queue_tls[i]);
2213 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2214 evac((StgClosure **)&ccalling_threadss[i]); /* same per-PE array as the guard above */
2221 if (run_queue_hd != END_TSO_QUEUE) {
2222 ASSERT(run_queue_tl != END_TSO_QUEUE);
2223 evac((StgClosure **)&run_queue_hd);
2224 evac((StgClosure **)&run_queue_tl);
2227 if (blocked_queue_hd != END_TSO_QUEUE) {
2228 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2229 evac((StgClosure **)&blocked_queue_hd);
2230 evac((StgClosure **)&blocked_queue_tl);
2233 if (sleeping_queue != END_TSO_QUEUE) {
2234 evac((StgClosure **)&sleeping_queue);
2238 for (m = main_threads; m != NULL; m = m->link) {
2239 evac((StgClosure **)&m->tso);
2241 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2242 evac((StgClosure **)&suspended_ccalling_threads);
2245 #if defined(SMP) || defined(PAR) || defined(GRAN)
2246 markSparkQueue(evac);
2250 /* -----------------------------------------------------------------------------
2253 This is the interface to the garbage collector from Haskell land.
2254 We provide this so that external C code can allocate and garbage
2255 collect when called from Haskell via _ccall_GC.
2257 It might be useful to provide an interface whereby the programmer
2258 can specify more roots (ToDo).
2260 This needs to be protected by the GC condition variable above. KH.
2261 -------------------------------------------------------------------------- */
2263 void (*extra_roots)(evac_fn);
2268 GarbageCollect(GetRoots,rtsFalse);
2272 performMajorGC(void)
2274 GarbageCollect(GetRoots,rtsTrue);
2278 AllRoots(evac_fn evac)
2280 GetRoots(evac); // the scheduler's roots
2281 extra_roots(evac); // the user's roots
2285 performGCWithRoots(void (*get_roots)(evac_fn))
2287 extra_roots = get_roots;
2288 GarbageCollect(AllRoots,rtsFalse);
2291 /* -----------------------------------------------------------------------------
2294 If the thread has reached its maximum stack size, then raise the
2295 StackOverflow exception in the offending thread. Otherwise
2296 relocate the TSO into a larger chunk of memory and adjust its stack
2298 -------------------------------------------------------------------------- */
2301 threadStackOverflow(StgTSO *tso)
2303 nat new_stack_size, new_tso_size, diff, stack_words;
2307 IF_DEBUG(sanity,checkTSO(tso));
2308 if (tso->stack_size >= tso->max_stack_size) {
2311 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2312 tso->id, tso, tso->stack_size, tso->max_stack_size);
2313 /* If we're debugging, just print out the top of the stack */
2314 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2317 /* Send this thread the StackOverflow exception */
2318 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2322 /* Try to double the current stack size. If that takes us over the
2323 * maximum stack size for this thread, then use the maximum instead.
2324 * Finally round up so the TSO ends up as a whole number of blocks.
2326 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2327 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2328 TSO_STRUCT_SIZE)/sizeof(W_);
2329 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2330 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2332 IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2334 dest = (StgTSO *)allocate(new_tso_size);
2335 TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2337 /* copy the TSO block and the old stack into the new area */
2338 memcpy(dest,tso,TSO_STRUCT_SIZE);
2339 stack_words = tso->stack + tso->stack_size - tso->sp;
2340 new_sp = (P_)dest + new_tso_size - stack_words;
2341 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2343 /* relocate the stack pointers... */
2344 diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2345 dest->su = (StgUpdateFrame *) ((P_)dest->su + diff);
2347 dest->stack_size = new_stack_size;
2349 /* and relocate the update frame list */
2350 relocate_stack(dest, diff);
2352 /* Mark the old TSO as relocated. We have to check for relocated
2353 * TSOs in the garbage collector and any primops that deal with TSOs.
2355 * It's important to set the sp and su values to just beyond the end
2356 * of the stack, so we don't attempt to scavenge any part of the
2359 tso->what_next = ThreadRelocated;
2361 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2362 tso->su = (StgUpdateFrame *)tso->sp;
2363 tso->why_blocked = NotBlocked;
2364 dest->mut_link = NULL;
2366 IF_PAR_DEBUG(verbose,
2367 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2368 tso->id, tso, tso->stack_size);
2369 /* If we're debugging, just print out the top of the stack */
2370 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2373 IF_DEBUG(sanity,checkTSO(tso));
2375 IF_DEBUG(scheduler,printTSO(dest));
2381 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2382 //@subsection Blocking Queue Routines
2384 /* ---------------------------------------------------------------------------
2385 Wake up a queue that was blocked on some resource.
2386 ------------------------------------------------------------------------ */
2390 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2395 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2397 /* write RESUME events to log file and
2398 update blocked and fetch time (depending on type of the orig closure) */
2399 if (RtsFlags.ParFlags.ParStats.Full) {
2400 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2401 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2402 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2403 if (EMPTY_RUN_QUEUE())
2404 emitSchedule = rtsTrue;
2406 switch (get_itbl(node)->type) {
2408 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2413 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2420 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
// GranSim version of unblockOneLocked: rather than touching a run queue
// directly, it schedules a simulated event for the TSO found at bqe and
// charges the simulated cost of the unblocking to CurrentProc.
//   bqe  - blocking-queue element to wake; treated as a TSO below
//   node - closure whose blocking queue is being processed
2427 static StgBlockingQueueElement *
2428 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2431 PEs node_loc, tso_loc;
2433 node_loc = where_is(node); // should be lifted out of loop
2434 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2435 tso_loc = where_is((StgClosure *)tso);
// Local case: only the local unblock cost (lunblocktime) is charged and the
// event is both sent from and delivered to tso_loc.
2436 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2437 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2438 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2439 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2440 // insertThread(tso, node_loc);
2441 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2443 tso, node, (rtsSpark*)NULL);
2444 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
// Remote case: charge packing, global unblock and message latency, and send
// the event from CurrentProc to the TSO's PE.
2447 } else { // TSO is remote (actually should be FMBQ)
2448 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2449 RtsFlags.GranFlags.Costs.gunblocktime +
2450 RtsFlags.GranFlags.Costs.latency;
2451 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2453 tso, node, (rtsSpark*)NULL);
2454 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2457 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2459 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2460 (node_loc==tso_loc ? "Local" : "Global"),
2461 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
// The TSO is no longer waiting on node, so drop its back-pointer to it.
2462 tso->block_info.closure = NULL;
2463 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
// GUM version of unblockOneLocked: handle one blocking-queue element of
// node according to its closure type:
//   TSO           - push it onto the run queue, update the parallel
//                   statistics (unblockCount) and mark it NotBlocked;
//   BLOCKED_FETCH - prepend it to the global PendingFetches list;
//   RBH_Save      - the closure terminating an RBH's queue; only checked.
// Any other closure type is fatal (barf).
// Must be called with sched_mutex held; unblockOne() is the locking wrapper.
// awakenBlockedQueue uses the result as the next element to process.
2467 static StgBlockingQueueElement *
2468 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2470 StgBlockingQueueElement *next;
2472 switch (get_itbl(bqe)->type) {
2474 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2475 /* if it's a TSO just push it onto the run_queue */
2477 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2478 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
2480 unblockCount(bqe, node);
2481 /* reset blocking status after dumping event */
2482 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2486 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2488 bqe->link = (StgBlockingQueueElement *)PendingFetches;
2489 PendingFetches = (StgBlockedFetch *)bqe;
2493 /* can ignore this case in a non-debugging setup;
2494 see comments on RBHSave closures above */
2496 /* check that the closure is an RBHSave closure */
2497 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2498 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2499 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
// NOTE(review): %#lx is given an info-table pointer (get_itbl(...)) here;
// %p is the conversion that matches a pointer argument.
2503 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2504 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2508 IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2512 #else /* !GRAN && !PAR */
// Non-parallel unblockOneLocked: mark tso as no longer blocked and push it
// onto the run queue. The caller must already hold sched_mutex; unblockOne()
// is the wrapper that takes the lock. Asserts that tso really is a TSO and
// that it was blocked. awakenBlockedQueue uses the return value as the next
// queue element to wake (presumably tso's old link).
2514 unblockOneLocked(StgTSO *tso)
2518 ASSERT(get_itbl(tso)->type == TSO);
2519 ASSERT(tso->why_blocked != NotBlocked);
2520 tso->why_blocked = NotBlocked;
2522 PUSH_ON_RUN_QUEUE(tso);
2524 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2529 #if defined(GRAN) || defined(PAR)
// Locking wrapper (GranSim/GUM): take sched_mutex, wake the single
// blocking-queue element bqe of node via unblockOneLocked, release the lock.
2530 inline StgBlockingQueueElement *
2531 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2533 ACQUIRE_LOCK(&sched_mutex);
2534 bqe = unblockOneLocked(bqe, node);
2535 RELEASE_LOCK(&sched_mutex);
// Locking wrapper (non-parallel): take sched_mutex, wake tso via
// unblockOneLocked, and release the lock again.
2540 unblockOne(StgTSO *tso)
2542 ACQUIRE_LOCK(&sched_mutex);
2543 tso = unblockOneLocked(tso);
2544 RELEASE_LOCK(&sched_mutex);
// GranSim version of awakenBlockedQueue: wake every TSO on the blocking
// queue q of node by calling unblockOneLocked on each one.
// If node is not on CurrentProc, a "fake fetch" is assumed and CurrentProc
// is added to node's PE bitmask. If node is an RBH, its queue ends in an
// RBH_Save closure whose payload (blocking_queue, mut_link) is copied back
// into the RBH. Global GranSim statistics are updated when
// GranSimStats.Global is set.
2551 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2553 StgBlockingQueueElement *bqe;
2558 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2559 node, CurrentProc, CurrentTime[CurrentProc],
2560 CurrentTSO->id, CurrentTSO));
2562 node_loc = where_is(node);
2564 ASSERT(q == END_BQ_QUEUE ||
2565 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2566 get_itbl(q)->type == CONSTR); // closure (type constructor)
2567 ASSERT(is_unique(node));
2569 /* FAKE FETCH: magically copy the node to the tso's proc;
2570 no Fetch necessary because in reality the node should not have been
2571 moved to the other PE in the first place
2573 if (CurrentProc!=node_loc) {
2575 belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2576 node, node_loc, CurrentProc, CurrentTSO->id,
2577 // CurrentTSO, where_is(CurrentTSO),
2578 node->header.gran.procs));
2579 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2581 belch("## new bitmask of node %p is %#x",
2582 node, node->header.gran.procs));
2583 if (RtsFlags.GranFlags.GranSimStats.Global) {
2584 globalGranStats.tot_fake_fetches++;
2589 // ToDo: check: ASSERT(CurrentProc==node_loc);
// Wake TSOs until the first non-TSO element (END_BQ_QUEUE or RBH_Save).
2590 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2593 bqe points to the current element in the queue
2594 next points to the next element in the queue
2596 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2597 //tso_loc = where_is(tso);
2599 bqe = unblockOneLocked(bqe, node);
2602 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2603 the closure to make room for the anchor of the BQ */
2604 if (bqe!=END_BQ_QUEUE) {
2605 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
// Check that the closure is an RBH_Save closure. Use the stg_-prefixed
// info-table names, as every other RBH_Save check in this file does.
2607 ASSERT((info_ptr==&stg_RBH_Save_0_info) ||
2608 (info_ptr==&stg_RBH_Save_1_info) ||
2609 (info_ptr==&stg_RBH_Save_2_info));
2611 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2612 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2613 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2616 belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2617 node, info_type(node)));
2620 /* statistics gathering */
2621 if (RtsFlags.GranFlags.GranSimStats.Global) {
2622 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2623 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2624 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2625 globalGranStats.tot_awbq++; // total no. of bqs awakened
2628 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2629 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
// GUM version of awakenBlockedQueue: while holding sched_mutex, wake every
// TSO and BLOCKED_FETCH on node's blocking queue q via unblockOneLocked.
// Returns early when the queue is empty or starts with an RBH_Save (CONSTR)
// closure.
// NOTE(review): sched_mutex is acquired before that early-exit test; confirm
// the early-exit path releases it before returning.
2633 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2635 StgBlockingQueueElement *bqe;
2637 ACQUIRE_LOCK(&sched_mutex);
2639 IF_PAR_DEBUG(verbose,
2640 belch("##-_ AwBQ for node %p on [%x]: ",
2644 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2645 IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2650 ASSERT(q == END_BQ_QUEUE ||
2651 get_itbl(q)->type == TSO ||
2652 get_itbl(q)->type == BLOCKED_FETCH ||
2653 get_itbl(q)->type == CONSTR);
// Stop at the first element that is neither a TSO nor a BLOCKED_FETCH
// (END_BQ_QUEUE or an RBH_Save closure).
2656 while (get_itbl(bqe)->type==TSO ||
2657 get_itbl(bqe)->type==BLOCKED_FETCH) {
2658 bqe = unblockOneLocked(bqe, node);
2660 RELEASE_LOCK(&sched_mutex);
2663 #else /* !GRAN && !PAR */
// Non-parallel awakenBlockedQueue: wake every thread on the queue starting at
// tso, holding sched_mutex for the whole walk. Each unblockOneLocked call
// yields the next element to process; the walk ends at END_TSO_QUEUE.
2665 awakenBlockedQueue(StgTSO *tso)
2667 ACQUIRE_LOCK(&sched_mutex);
2668 while (tso != END_TSO_QUEUE) {
2669 tso = unblockOneLocked(tso);
2671 RELEASE_LOCK(&sched_mutex);
2675 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2676 //@subsection Exception Handling Routines
2678 /* ---------------------------------------------------------------------------
2680 - usually called inside a signal handler so it mustn't do anything fancy.
2681 ------------------------------------------------------------------------ */
// May be called from inside a signal handler (see the comment above), so
// its body must stay trivial.
2684 interruptStgRts(void)
2690 /* -----------------------------------------------------------------------------
2693 This is for use when we raise an exception in another thread, which
2695 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2696 -------------------------------------------------------------------------- */
2698 #if defined(GRAN) || defined(PAR)
2700 NB: only the type of the blocking queue is different in GranSim and GUM
2701 the operations on the queue-elements are the same
2702 long live polymorphism!
// GranSim/GUM version of unblockThread: take tso off whichever queue
// tso->why_blocked says it is waiting on (MVar, black-hole BQ, a target's
// blocked_exceptions, blocked_queue or sleeping_queue). Then mark it
// NotBlocked, clear block_info.closure and push it onto the run queue.
// Returns at once if tso is not blocked. Fatal (barf) if tso is not found on
// the queue it claims to be on. The whole operation runs under sched_mutex.
2705 unblockThread(StgTSO *tso)
2707 StgBlockingQueueElement *t, **last;
2709 ACQUIRE_LOCK(&sched_mutex);
2710 switch (tso->why_blocked) {
// sched_mutex was taken above; release it on this early exit as well, or
// it stays held after we return.
RELEASE_LOCK(&sched_mutex);
2713 return; /* not blocked */
2716 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2718 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2719 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2721 last = (StgBlockingQueueElement **)&mvar->head;
2722 for (t = (StgBlockingQueueElement *)mvar->head;
2724 last = &t->link, last_tso = t, t = t->link) {
2725 if (t == (StgBlockingQueueElement *)tso) {
2726 *last = (StgBlockingQueueElement *)tso->link;
2727 if (mvar->tail == tso) {
2728 mvar->tail = (StgTSO *)last_tso;
2733 barf("unblockThread (MVAR): TSO not found");
2736 case BlockedOnBlackHole:
2737 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2739 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2741 last = &bq->blocking_queue;
2742 for (t = bq->blocking_queue;
2744 last = &t->link, t = t->link) {
2745 if (t == (StgBlockingQueueElement *)tso) {
2746 *last = (StgBlockingQueueElement *)tso->link;
2750 barf("unblockThread (BLACKHOLE): TSO not found");
2753 case BlockedOnException:
2755 StgTSO *target = tso->block_info.tso;
2757 ASSERT(get_itbl(target)->type == TSO);
// A TSO may have been relocated more than once, so follow the whole
// ThreadRelocated chain (as the non-parallel version does).
2759 while (target->what_next == ThreadRelocated) {
2760 target = target->link;
2761 ASSERT(get_itbl(target)->type == TSO);
2764 ASSERT(target->blocked_exceptions != NULL);
2766 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2767 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2769 last = &t->link, t = t->link) {
2770 ASSERT(get_itbl(t)->type == TSO);
2771 if (t == (StgBlockingQueueElement *)tso) {
2772 *last = (StgBlockingQueueElement *)tso->link;
2776 barf("unblockThread (Exception): TSO not found");
2780 case BlockedOnWrite:
2782 /* take TSO off blocked_queue */
2783 StgBlockingQueueElement *prev = NULL;
2784 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2785 prev = t, t = t->link) {
2786 if (t == (StgBlockingQueueElement *)tso) {
2788 blocked_queue_hd = (StgTSO *)t->link;
2789 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2790 blocked_queue_tl = END_TSO_QUEUE;
2793 prev->link = t->link;
2794 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2795 blocked_queue_tl = (StgTSO *)prev;
2801 barf("unblockThread (I/O): TSO not found");
2804 case BlockedOnDelay:
2806 /* take TSO off sleeping_queue */
2807 StgBlockingQueueElement *prev = NULL;
2808 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
2809 prev = t, t = t->link) {
2810 if (t == (StgBlockingQueueElement *)tso) {
2812 sleeping_queue = (StgTSO *)t->link;
2814 prev->link = t->link;
// This is the sleeping (delay) queue, not an I/O queue; say so.
2819 barf("unblockThread (delay): TSO not found");
2823 barf("unblockThread");
2827 tso->link = END_TSO_QUEUE;
2828 tso->why_blocked = NotBlocked;
2829 tso->block_info.closure = NULL;
2830 PUSH_ON_RUN_QUEUE(tso);
2831 RELEASE_LOCK(&sched_mutex);
// Non-parallel unblockThread: take tso off whichever queue tso->why_blocked
// says it is waiting on (MVar, black-hole BQ, a target's blocked_exceptions,
// blocked_queue or sleeping_queue). Then mark it NotBlocked, clear
// block_info.closure and push it onto the run queue.
// Returns at once if tso is not blocked. Fatal (barf) if tso is not found on
// the queue it claims to be on. The whole operation runs under sched_mutex.
2835 unblockThread(StgTSO *tso)
2839 ACQUIRE_LOCK(&sched_mutex);
2840 switch (tso->why_blocked) {
// sched_mutex was taken above; release it on this early exit as well, or
// it stays held after we return.
RELEASE_LOCK(&sched_mutex);
2843 return; /* not blocked */
2846 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2848 StgTSO *last_tso = END_TSO_QUEUE;
2849 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2852 for (t = mvar->head; t != END_TSO_QUEUE;
2853 last = &t->link, last_tso = t, t = t->link) {
2856 if (mvar->tail == tso) {
2857 mvar->tail = last_tso;
2862 barf("unblockThread (MVAR): TSO not found");
2865 case BlockedOnBlackHole:
2866 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2868 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2870 last = &bq->blocking_queue;
2871 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2872 last = &t->link, t = t->link) {
2878 barf("unblockThread (BLACKHOLE): TSO not found");
2881 case BlockedOnException:
2883 StgTSO *target = tso->block_info.tso;
2885 ASSERT(get_itbl(target)->type == TSO);
2887 while (target->what_next == ThreadRelocated) {
2888 target = target->link;
2889 ASSERT(get_itbl(target)->type == TSO);
2892 ASSERT(target->blocked_exceptions != NULL);
2894 last = &target->blocked_exceptions;
2895 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2896 last = &t->link, t = t->link) {
2897 ASSERT(get_itbl(t)->type == TSO);
2903 barf("unblockThread (Exception): TSO not found");
2907 case BlockedOnWrite:
2909 StgTSO *prev = NULL;
2910 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2911 prev = t, t = t->link) {
2914 blocked_queue_hd = t->link;
2915 if (blocked_queue_tl == t) {
2916 blocked_queue_tl = END_TSO_QUEUE;
2919 prev->link = t->link;
2920 if (blocked_queue_tl == t) {
2921 blocked_queue_tl = prev;
2927 barf("unblockThread (I/O): TSO not found");
2930 case BlockedOnDelay:
2932 StgTSO *prev = NULL;
2933 for (t = sleeping_queue; t != END_TSO_QUEUE;
2934 prev = t, t = t->link) {
2937 sleeping_queue = t->link;
2939 prev->link = t->link;
// This is the sleeping (delay) queue, not an I/O queue; say so.
2944 barf("unblockThread (delay): TSO not found");
2948 barf("unblockThread");
2952 tso->link = END_TSO_QUEUE;
2953 tso->why_blocked = NotBlocked;
2954 tso->block_info.closure = NULL;
2955 PUSH_ON_RUN_QUEUE(tso);
2956 RELEASE_LOCK(&sched_mutex);
2960 /* -----------------------------------------------------------------------------
2963 * The following function implements the magic for raising an
2964 * asynchronous exception in an existing thread.
2966 * We first remove the thread from any queue on which it might be
2967 * blocked. The possible blockages are MVARs, BLACKHOLE_BQs, pending exception deliveries, I/O and delays.
2969 * We strip the stack down to the innermost CATCH_FRAME, building
2970 * thunks in the heap for all the active computations, so they can
2971 * be restarted if necessary. When we reach a CATCH_FRAME, we build
2972 * an application of the handler to the exception, and push it on
2973 * the top of the stack.
2975 * How exactly do we save all the active computations? We create an
2976 * AP_UPD for every UpdateFrame on the stack. Entering one of these
2977 * AP_UPDs pushes everything from the corresponding update frame
2978 * upwards onto the stack. (Actually, it pushes everything up to the
2979 * next update frame plus a pointer to the next AP_UPD object.)
2980 * Entering the next AP_UPD object pushes more onto the stack until we
2981 * reach the last AP_UPD object - at which point the stack should look
2982 * exactly as it did when we killed the TSO and we can continue
2983 * execution by entering the closure on top of the stack.
2985 * We can also kill a thread entirely - this happens if either (a) the
2986 * exception passed to raiseAsync is NULL, or (b) there's no
2987 * CATCH_FRAME on the stack. In either case, we strip the entire
2988 * stack and replace the thread with a zombie.
2990 * -------------------------------------------------------------------------- */
// Kill tso outright: raiseAsync with a NULL exception strips the entire
// stack and leaves the thread dead (see the comment above).
2993 deleteThread(StgTSO *tso)
2995 raiseAsync(tso,NULL);
// Raise `exception` asynchronously in tso, or kill tso when exception is NULL.
// Does nothing if the thread has already completed or been killed.
// Otherwise the thread is removed from any blocking queue and its stack is
// unwound frame by frame:
//  - update frame: the stack chunk above it becomes an AP_UPD, which
//    replaces the updatee unless the updatee is already an indirection;
//  - catch / seq frame: the chunk becomes a PAP wrapped in a catch / seq
//    closure.
// Unwinding stops at the first CATCH_FRAME when exception != NULL. There a
// PAP of handler, exception and realworld token is left on top of the stack,
// and async exceptions are blocked while the handler runs. Otherwise it
// stops at the stop frame, saves the exception and marks the thread
// ThreadKilled.
2999 raiseAsync(StgTSO *tso, StgClosure *exception)
3001 StgUpdateFrame* su = tso->su;
3002 StgPtr sp = tso->sp;
3004 /* Thread already dead? */
3005 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3009 IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3011 /* Remove it from any blocking queues */
3014 /* The stack freezing code assumes there's a closure pointer on
3015 * the top of the stack. This isn't always the case with compiled
3016 * code, so we have to push a dummy closure on the top which just
3017 * returns to the next return address on the stack.
3019 if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3020 *(--sp) = (W_)&stg_dummy_ret_closure;
3024 nat words = ((P_)su - (P_)sp) - 1;
3028 /* If we find a CATCH_FRAME, and we've got an exception to raise,
3029 * then build PAP(handler,exception,realworld#), and leave it on
3030 * top of the stack ready to enter.
3032 if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3033 StgCatchFrame *cf = (StgCatchFrame *)su;
3034 /* we've got an exception to raise, so let's pass it to the
3035 * handler in this frame.
3037 ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3038 TICK_ALLOC_UPD_PAP(3,0);
3039 SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3042 ap->fun = cf->handler; /* :: Exception -> IO a */
3043 ap->payload[0] = exception;
3044 ap->payload[1] = ARG_TAG(0); /* realworld token */
3046 /* throw away the stack from Sp up to and including the
3049 sp = (P_)su + sizeofW(StgCatchFrame) - 1;
3052 /* Restore the blocked/unblocked state for asynchronous exceptions
3053 * at the CATCH_FRAME.
3055 * If exceptions were unblocked at the catch, arrange that they
3056 * are unblocked again after executing the handler by pushing an
3057 * unblockAsyncExceptions_ret stack frame.
3059 if (!cf->exceptions_blocked) {
3060 *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3063 /* Ensure that async exceptions are blocked when running the handler.
3065 if (tso->blocked_exceptions == NULL) {
3066 tso->blocked_exceptions = END_TSO_QUEUE;
3069 /* Put the newly-built PAP on top of the stack, ready to execute
3070 * when the thread restarts.
3074 tso->what_next = ThreadEnterGHC;
3075 IF_DEBUG(sanity, checkTSO(tso));
3079 /* First build an AP_UPD consisting of the stack chunk above the
3080 * current update frame, with the top word on the stack as the
3083 ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3088 ap->fun = (StgClosure *)sp[0];
3090 for(i=0; i < (nat)words; ++i) {
3091 ap->payload[i] = (StgClosure *)*sp++;
3094 switch (get_itbl(su)->type) {
3098 SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */);
3099 TICK_ALLOC_UP_THK(words+1,0);
3102 fprintf(stderr, "scheduler: Updating ");
3103 printPtr((P_)su->updatee);
3104 fprintf(stderr, " with ");
3105 printObj((StgClosure *)ap);
3108 /* Replace the updatee with an indirection - happily
3109 * this will also wake up any threads currently
3110 * waiting on the result.
3112 * Warning: if we're in a loop, more than one update frame on
3113 * the stack may point to the same object. Be careful not to
3114 * overwrite an IND_OLDGEN in this case, because we'll screw
3115 * up the mutable lists. To be on the safe side, don't
3116 * overwrite any kind of indirection at all. See also
3117 * threadSqueezeStack in GC.c, where we have to make a similar
3120 if (!closure_IND(su->updatee)) {
3121 UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */
3124 sp += sizeofW(StgUpdateFrame) -1;
3125 sp[0] = (W_)ap; /* push onto stack */
3131 StgCatchFrame *cf = (StgCatchFrame *)su;
3134 /* We want a PAP, not an AP_UPD. Fortunately, the
3135 * layout's the same.
3137 SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3138 TICK_ALLOC_UPD_PAP(words+1,0);
3140 /* now build o = FUN(catch,ap,handler) */
3141 o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3142 TICK_ALLOC_FUN(2,0);
3143 SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3144 o->payload[0] = (StgClosure *)ap;
3145 o->payload[1] = cf->handler;
3148 fprintf(stderr, "scheduler: Built ");
3149 printObj((StgClosure *)o);
3152 /* pop the old handler and put o on the stack */
3154 sp += sizeofW(StgCatchFrame) - 1;
3161 StgSeqFrame *sf = (StgSeqFrame *)su;
3164 SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3165 TICK_ALLOC_UPD_PAP(words+1,0);
3167 /* now build o = FUN(seq,ap) */
3168 o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3169 TICK_ALLOC_SE_THK(1,0);
3170 SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3171 o->payload[0] = (StgClosure *)ap;
3174 fprintf(stderr, "scheduler: Built ");
3175 printObj((StgClosure *)o);
3178 /* pop the old handler and put o on the stack */
3180 sp += sizeofW(StgSeqFrame) - 1;
3186 /* We've stripped the entire stack, the thread is now dead. */
3187 sp += sizeofW(StgStopFrame) - 1;
3188 sp[0] = (W_)exception; /* save the exception */
3189 tso->what_next = ThreadKilled;
3190 tso->su = (StgUpdateFrame *)(sp+1);
3201 /* -----------------------------------------------------------------------------
3202 resurrectThreads is called after garbage collection on the list of
3203 threads found to be garbage. Each of these threads will be woken
3204 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3205 on an MVar, or NonTermination if the thread was blocked on a Black
3207 -------------------------------------------------------------------------- */
// Called after GC with the list of threads found to be garbage, linked
// through global_link. Each thread is re-linked into all_threads and sent
// an exception chosen by why it was blocked:
//   on an MVar / on exception delivery -> BlockedOnDeadMVar
//   on a black hole                    -> NonTermination
// A thread that is no longer blocked can legitimately appear here (see the
// comment inside the switch). Any other blocking reason is fatal (barf).
3210 resurrectThreads( StgTSO *threads )
3214 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
// Save the successor before global_link is overwritten below.
3215 next = tso->global_link;
// Re-link the thread into all_threads (global_link now points at the
// previous list head).
3216 tso->global_link = all_threads;
3218 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3220 switch (tso->why_blocked) {
3222 case BlockedOnException:
3223 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3225 case BlockedOnBlackHole:
3226 raiseAsync(tso,(StgClosure *)NonTermination_closure);
3229 /* This might happen if the thread was blocked on a black hole
3230 * belonging to a thread that we've just woken up (raiseAsync
3231 * can wake up threads, remember...).
3235 barf("resurrectThreads: thread blocked in a strange way");
3240 /* -----------------------------------------------------------------------------
3241 * Blackhole detection: if we reach a deadlock, test whether any
3242 * threads are blocked on themselves. Any threads which are found to
3243 * be self-blocked get sent a NonTermination exception.
3245 * This is only done in a deadlock situation in order to avoid
3246 * performance overhead in the normal case.
3247 * -------------------------------------------------------------------------- */
// Deadlock diagnosis. For every thread on all_threads that is blocked on a
// black hole (after following any ThreadRelocated forwarding), walk its
// frame chain from t->su via frame->link. If a frame's updatee is the very
// closure the thread is blocked on, the thread is waiting for its own
// computation and is sent the NonTermination exception.
3250 detectBlackHoles( void )
3252 StgTSO *t = all_threads;
3253 StgUpdateFrame *frame;
3254 StgClosure *blocked_on;
3256 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3258 while (t->what_next == ThreadRelocated) {
3260 ASSERT(get_itbl(t)->type == TSO);
3263 if (t->why_blocked != BlockedOnBlackHole) {
3267 blocked_on = t->block_info.closure;
3269 for (frame = t->su; ; frame = frame->link) {
3270 switch (get_itbl(frame)->type) {
3273 if (frame->updatee == blocked_on) {
3274 /* We are blocking on one of our own computations, so
3275 * send this thread the NonTermination exception.
3278 sched_belch("thread %d is blocked on itself", t->id));
3279 raiseAsync(t, (StgClosure *)NonTermination_closure);
3300 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3301 //@subsection Debugging Routines
3303 /* -----------------------------------------------------------------------------
3304 Debugging: why is a thread blocked
3305 -------------------------------------------------------------------------- */
// Debug helper: print to stderr a phrase describing why tso is blocked
// (fd, delay target, MVar, exception target, black hole, global address).
// An unknown why_blocked value is fatal (barf).
3310 printThreadBlockage(StgTSO *tso)
3312 switch (tso->why_blocked) {
3314 fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3316 case BlockedOnWrite:
3317 fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3319 case BlockedOnDelay:
3320 fprintf(stderr,"is blocked until %d", tso->block_info.target);
3323 fprintf(stderr,"is blocked on an MVar");
3325 case BlockedOnException:
3326 fprintf(stderr,"is blocked on delivering an exception to thread %d",
3327 tso->block_info.tso->id);
3329 case BlockedOnBlackHole:
3330 fprintf(stderr,"is blocked on a black hole");
3333 fprintf(stderr,"is not blocked");
3337 fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3338 tso->block_info.closure, info_type(tso->block_info.closure));
3340 case BlockedOnGA_NoSend:
3341 fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3342 tso->block_info.closure, info_type(tso->block_info.closure));
// tso is a pointer, so it must be printed with %p, not %d.
3346 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
3347 tso->why_blocked, tso->id, tso);
// Debug helper: print tso's state to stderr. Killed and completed threads
// get a fixed phrase; otherwise printThreadBlockage describes the blockage.
3352 printThreadStatus(StgTSO *tso)
3354 switch (tso->what_next) {
3356 fprintf(stderr,"has been killed");
3358 case ThreadComplete:
3359 fprintf(stderr,"has completed");
3362 printThreadBlockage(tso);
// Debug helper: print a header line, then one line per thread on
// all_threads with its id and status, to stderr. In GranSim/GUM builds the
// header carries the current time (TIME_ON_PROC / CURRENT_TIME); presumably
// the variants are selected by #if.
// NOTE(review): node_str is declared but not used by the code visible here.
3367 printAllThreads(void)
3372 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3373 ullong_format_string(TIME_ON_PROC(CurrentProc),
3374 time_string, rtsFalse/*no commas!*/);
3376 sched_belch("all threads at [%s]:", time_string);
3378 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3379 ullong_format_string(CURRENT_TIME,
3380 time_string, rtsFalse/*no commas!*/);
3382 sched_belch("all threads at [%s]:", time_string);
3384 sched_belch("all threads:");
3387 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3388 fprintf(stderr, "\tthread %d ", t->id);
3389 printThreadStatus(t);
3390 fprintf(stderr,"\n");
3395 Print a whole blocking queue attached to node (debugging only).
// GUM debug helper: print the blocking queue attached to node, which must be
// a BLACKHOLE_BQ, FETCH_ME_BQ, RBH or MVAR; the elements are printed by
// print_bqe.
// NOTE(review): node is already dereferenced (fprintf/info_type, get_itbl)
// before the NULL sanity check below; that check would need to come first
// to be useful.
3400 print_bq (StgClosure *node)
3402 StgBlockingQueueElement *bqe;
3406 fprintf(stderr,"## BQ of closure %p (%s): ",
3407 node, info_type(node));
3409 /* should cover all closures that may have a blocking queue */
3410 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3411 get_itbl(node)->type == FETCH_ME_BQ ||
3412 get_itbl(node)->type == RBH ||
3413 get_itbl(node)->type == MVAR);
3415 ASSERT(node!=(StgClosure*)NULL); // sanity check
3417 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3421 Print a whole blocking queue starting with the element bqe.
// GUM debug helper: print, on one line of stderr, the blocking-queue
// elements starting at bqe. Handles TSOs, BLOCKED_FETCHes and RBH_Save
// (CONSTR) closures. The walk stops after a CONSTR element or when the next
// link is END_BQ_QUEUE. Any other closure type is fatal (barf).
3424 print_bqe (StgBlockingQueueElement *bqe)
3429 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3431 for (end = (bqe==END_BQ_QUEUE);
3432 !end; // iterate until bqe points to a CONSTR
3433 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
3434 bqe = end ? END_BQ_QUEUE : bqe->link) {
3435 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3436 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3437 /* types of closures that may appear in a blocking queue */
3438 ASSERT(get_itbl(bqe)->type == TSO ||
3439 get_itbl(bqe)->type == BLOCKED_FETCH ||
3440 get_itbl(bqe)->type == CONSTR);
3441 /* only BQs of an RBH end with an RBH_Save closure */
3442 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3444 switch (get_itbl(bqe)->type) {
// The TSO pointer is printed with %p (as the GranSim print_bq does); %x does
// not match a pointer argument.
3446 fprintf(stderr," TSO %u (%p),",
3447 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3450 fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3451 ((StgBlockedFetch *)bqe)->node,
3452 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3453 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3454 ((StgBlockedFetch *)bqe)->ga.weight);
3457 fprintf(stderr," %s (IP %p),",
3458 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3459 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3460 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3461 "RBH_Save_?"), get_itbl(bqe));
3464 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3465 info_type((StgClosure *)bqe)); // , node, info_type(node));
3469 fputc('\n', stderr);
3471 # elif defined(GRAN)
// GranSim debug helper: print the blocking queue of node (a BLACKHOLE_BQ,
// FETCH_ME_BQ or RBH) on one line of stderr. The line shows the PE of the
// node and of every TSO on the queue. An RBH's queue may end with an
// RBH_Save closure, which is printed by name. Any other closure type is
// fatal (barf).
// NOTE(review): node is dereferenced by get_itbl() in the type ASSERT
// before the NULL sanity check that follows it.
3473 print_bq (StgClosure *node)
3475 StgBlockingQueueElement *bqe;
3476 PEs node_loc, tso_loc;
3479 /* should cover all closures that may have a blocking queue */
3480 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3481 get_itbl(node)->type == FETCH_ME_BQ ||
3482 get_itbl(node)->type == RBH);
3484 ASSERT(node!=(StgClosure*)NULL); // sanity check
3485 node_loc = where_is(node);
3487 fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3488 node, info_type(node), node_loc);
3491 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3493 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3494 !end; // iterate until bqe points to a CONSTR
3495 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3496 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3497 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3498 /* types of closures that may appear in a blocking queue */
3499 ASSERT(get_itbl(bqe)->type == TSO ||
3500 get_itbl(bqe)->type == CONSTR);
3501 /* only BQs of an RBH end with an RBH_Save closure */
3502 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3504 tso_loc = where_is((StgClosure *)bqe);
3505 switch (get_itbl(bqe)->type) {
3507 fprintf(stderr," TSO %d (%p) on [PE %d],",
3508 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3511 fprintf(stderr," %s (IP %p),",
3512 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3513 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3514 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3515 "RBH_Save_?"), get_itbl(bqe));
3518 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3519 info_type((StgClosure *)bqe), node, info_type(node));
3523 fputc('\n', stderr);
3527 Nice and easy: only TSOs on the blocking queue
// Debug helper for non-parallel builds: print every TSO (id and address) on
// node's blocking queue on one line of stderr; only TSOs can appear there.
3530 print_bq (StgClosure *node)
3534 ASSERT(node!=(StgClosure*)NULL); // sanity check
3535 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3536 tso != END_TSO_QUEUE;
3538 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3539 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3540 fprintf(stderr," TSO %d (%p),", tso->id, tso);
3542 fputc('\n', stderr);
// Walk the run queue from run_queue_hd until END_TSO_QUEUE, with i starting
// at 0 (presumably counting the entries).
3553 for (i=0, tso=run_queue_hd;
3554 tso != END_TSO_QUEUE;
// printf-style scheduler debug message to stderr. It writes a build-specific
// prefix ("scheduler (task N): " with the pthread id, "== ", or
// "scheduler: "; presumably chosen by #if on the build way), then the
// formatted message, then a newline.
// NOTE(review): pthread_self() returns a pthread_t, which is not guaranteed
// to be a long as the %ld conversion assumes.
3563 sched_belch(char *s, ...)
3568 fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3570 fprintf(stderr, "== ");
3572 fprintf(stderr, "scheduler: ");
3574 vfprintf(stderr, s, ap);
3575 fprintf(stderr, "\n");
3581 //@node Index, , Debugging Routines, Main scheduling code
3585 //* MainRegTable:: @cindex\s-+MainRegTable
3586 //* StgMainThread:: @cindex\s-+StgMainThread
3587 //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue
3588 //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd
3589 //* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
3590 //* context_switch:: @cindex\s-+context_switch
3591 //* createThread:: @cindex\s-+createThread
3592 //* free_capabilities:: @cindex\s-+free_capabilities
3593 //* gc_pending_cond:: @cindex\s-+gc_pending_cond
3594 //* initScheduler:: @cindex\s-+initScheduler
3595 //* interrupted:: @cindex\s-+interrupted
3596 //* n_free_capabilities:: @cindex\s-+n_free_capabilities
3597 //* next_thread_id:: @cindex\s-+next_thread_id
3598 //* print_bq:: @cindex\s-+print_bq
3599 //* run_queue_hd:: @cindex\s-+run_queue_hd
3600 //* run_queue_tl:: @cindex\s-+run_queue_tl
3601 //* sched_mutex:: @cindex\s-+sched_mutex
3602 //* schedule:: @cindex\s-+schedule
3603 //* take_off_run_queue:: @cindex\s-+take_off_run_queue
3604 //* task_ids:: @cindex\s-+task_ids
3605 //* term_mutex:: @cindex\s-+term_mutex
3606 //* thread_ready_cond:: @cindex\s-+thread_ready_cond