1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.109 2001/12/07 20:57:53 sof Exp $
4 * (c) The GHC Team, 1998-2000
8 * Different GHC ways use this scheduler quite differently (see comments below)
9 * Here is the global picture:
11 * WAY Name CPP flag What's it for
12 * --------------------------------------
13 * mp GUM PAR Parallel execution on a distributed memory machine
14 * s SMP SMP Parallel execution on a shared memory machine
15 * mg GranSim GRAN Simulation of parallel execution
16 * md GUM/GdH DIST Distributed execution (based on GUM)
17 * --------------------------------------------------------------------------*/
19 //@node Main scheduling code, , ,
20 //@section Main scheduling code
23 * Version with scheduler monitor support for SMPs (WAY=s):
25 This design provides a high-level API to create and schedule threads etc.
26 as documented in the SMP design document.
28 It uses a monitor design controlled by a single mutex to exercise control
29 over accesses to shared data structures, and builds on the Posix threads
32 The majority of state is shared. In order to keep essential per-task state,
33 there is a Capability structure, which contains all the information
34 needed to run a thread: its STG registers, a pointer to its TSO, a
35 nursery etc. During STG execution, a pointer to the capability is
36 kept in a register (BaseReg).
38 In a non-SMP build, there is one global capability, namely MainRegTable.
42 * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44 The main scheduling loop in GUM iterates until a finish message is received.
45 In that case a global flag @receivedFinish@ is set and this instance of
46 the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
47 for the handling of incoming messages, such as PP_FINISH.
48 Note that in the parallel case we have a system manager that coordinates
49 different PEs, each of which runs one instance of the RTS.
50 See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
51 From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53 * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55 The main scheduling code in GranSim is quite different from that in std
56 (concurrent) Haskell: while concurrent Haskell just iterates over the
57 threads in the runnable queue, GranSim is event driven, i.e. it iterates
58 over the events in the global event queue. -- HWL
63 //* Variables and Data structures::
64 //* Main scheduling loop::
65 //* Suspend and Resume::
67 //* Garbage Collextion Routines::
68 //* Blocking Queue Routines::
69 //* Exception Handling Routines::
70 //* Debugging Routines::
74 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
75 //@subsection Includes
77 #include "PosixSource.h"
84 #include "StgStartup.h"
87 #include "StgMiscClosures.h"
89 #include "Interpreter.h"
90 #include "Exception.h"
99 #include "Proftimer.h"
100 #include "ProfHeap.h"
101 #include "RetainerProfile.h"
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
116 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
117 //@subsection Variables and Data structures
121 * These are the threads which clients have requested that we run.
123 * In an SMP build, we might have several concurrent clients all
124 * waiting for results, and each one will wait on a condition variable
125 * until the result is available.
127 * In non-SMP, clients are strictly nested: the first client calls
128 * into the RTS, which might call out again to C with a _ccall_GC, and
129 * eventually re-enter the RTS.
131 * Main threads information is kept in a linked list:
133 //@cindex StgMainThread
134 typedef struct StgMainThread_ {
136 SchedulerStatus stat;
139 pthread_cond_t wakeup;
141 struct StgMainThread_ *link;
144 /* Main thread queue.
145 * Locks required: sched_mutex.
147 static StgMainThread *main_threads; /* singly linked through ->link, NULL-terminated */
150 * Locks required: sched_mutex.
154 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only; passed by address to GranSimLight_enter_system/GranSimLight_leave_system */
155 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
158 In GranSim we have a runnable and a blocked queue for each processor.
159 In order to minimise code changes new arrays run_queue_hds/tls
160 are created. run_queue_hd is then a short cut (macro) for
161 run_queue_hds[CurrentProc] (see GranSim.h).
164 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC]; /* per-PE run queues, indexed by PE (e.g. run_queue_hds[CurrentProc]) */
165 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC]; /* per-PE blocked queues, indexed by PE */
166 StgTSO *ccalling_threadss[MAX_PROC]; /* per-PE; NOTE(review): no use visible here -- confirm purpose */
167 /* We use the same global list of threads (all_threads) in GranSim as in
168 the std RTS (i.e. we are cheating). However, we don't use this list in
169 the GranSim specific code at the moment (so we are only potentially
174 StgTSO *run_queue_hd, *run_queue_tl; /* runnable threads; END_TSO_QUEUE when empty */
175 StgTSO *blocked_queue_hd, *blocked_queue_tl; /* threads blocked on I/O (see addToBlockedQueue); END_TSO_QUEUE when empty */
176 StgTSO *sleeping_queue; /* sleeping threads; END_TSO_QUEUE when empty. ToDo: perhaps replace with a hash table? */
180 /* Linked list of all threads.
181 * Used for detecting garbage collected threads.
185 /* Threads suspended in _ccall_GC.
187 static StgTSO *suspended_ccalling_threads;
189 static StgTSO *threadStackOverflow(StgTSO *tso); /* grows tso's stack; the TSO may move, so callers must use the returned pointer */
191 /* KH: The following two flags are shared memory locations. There is no need
192 to lock them, since they are only unset at the end of a scheduler
196 /* flag set by signal handler to precipitate a context switch */
197 //@cindex context_switch
200 /* if this flag is set as well, give up execution */
201 //@cindex interrupted
204 /* Next thread ID to allocate.
205 * Locks required: sched_mutex
207 //@cindex next_thread_id
208 StgThreadID next_thread_id = 1; /* the first thread allocated gets ID 1 */
211 * Pointers to the state of the current thread.
212 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
213 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
216 /* The smallest stack size that makes any sense is:
217 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
218 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
219 * + 1 (the realworld token for an IO thread)
220 * + 1 (the closure to enter)
222 * A thread with this stack will bomb immediately with a stack
223 * overflow, which will increase its stack size.
226 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2) /* +2: realworld token + closure to enter */
228 /* Free capability list.
229 * Locks required: sched_mutex.
232 Capability *free_capabilities; /* Available capabilities for running threads; linked through cap->link */
233 nat n_free_capabilities; /* number of capabilities currently on the free_capabilities list (not the total) */
235 Capability MainCapability; /* for non-SMP, we have one global capability */
242 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
243 * exists - earlier gccs apparently didn't.
250 /* All our current task ids, saved in case we need to kill them later.
257 void addToBlockedQueue ( StgTSO *tso ); /* used for threads blocked on I/O */
259 static void schedule ( void ); /* the main scheduling loop */
260 void interruptStgRts ( void );
262 static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri ); /* variant with a priority; NOTE(review): guarding #if not visible here */
264 static StgTSO * createThread_ ( nat size, rtsBool have_lock ); /* variant without a priority */
267 static void detectBlackHoles ( void ); /* used by deadlock detection in schedule() */
270 static void sched_belch(char *s, ...); /* printf-style scheduler debug output */
274 //@cindex sched_mutex
276 //@cindex thread_ready_cond
277 //@cindex gc_pending_cond
278 pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER; /* the scheduler monitor lock guarding shared scheduler state */
279 pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER; /* NOTE(review): no use visible here -- confirm purpose */
280 pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER; /* waited on until a runnable thread and a free capability exist */
281 pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER; /* waited on while a GC is pending; broadcast after the GC */
288 rtsTime TimeOfLastYield; /* CURRENT_TIME when the last thread returned to the scheduler */
289 rtsBool emitSchedule = rtsTrue; /* when set, log a SCHEDULE event for the next thread run even if it is the same TSO */
293 char *whatNext_strs[] = {
301 char *threadReturnCode_strs[] = {
302 "HeapOverflow", /* might also be StackOverflow */
311 StgTSO * createSparkThread(rtsSpark spark); /* turns a spark into a thread; called with sched_mutex held (must not re-acquire it) */
312 StgTSO * activateSpark (rtsSpark spark); /* turns a spark into a thread; END_TSO_QUEUE means activation failed */
316 * The thread state for the main thread.
317 // ToDo: check whether not needed any more
321 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
322 //@subsection Main scheduling loop
324 /* ---------------------------------------------------------------------------
325 Main scheduling loop.
327 We use round-robin scheduling, each thread returning to the
328 scheduler loop when one of these conditions is detected:
331 * timer expires (thread yields)
336 Locking notes: we acquire the scheduler lock once at the beginning
337 of the scheduler loop, and release it when
339 * running a thread, or
340 * waiting for work, or
341 * waiting for a GC to complete.
344 In a GranSim setup this loop iterates over the global event queue.
345 This revolves around the global event queue, which determines what
346 to do next. Therefore, it's more complicated than either the
347 concurrent or the parallel (GUM) setup.
350 GUM iterates over incoming messages.
351 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
352 and sends out a fish whenever it has nothing to do; in-between
353 doing the actual reductions (shared code below) it processes the
354 incoming messages and deals with delayed operations
355 (see PendingFetches).
356 This is not the ugliest code you could imagine, but it's bloody close.
358 ------------------------------------------------------------------------ */
365 StgThreadReturnCode ret;
373 rtsBool receivedFinish = rtsFalse;
375 nat tp_size, sp_size; // stats only
378 rtsBool was_interrupted = rtsFalse;
380 ACQUIRE_LOCK(&sched_mutex);
384 /* set up first event to get things going */
385 /* ToDo: assign costs for system setup and init MainTSO ! */
386 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
388 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
391 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
392 G_TSO(CurrentTSO, 5));
394 if (RtsFlags.GranFlags.Light) {
395 /* Save current time; GranSim Light only */
396 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
399 event = get_next_event();
401 while (event!=(rtsEvent*)NULL) {
402 /* Choose the processor with the next event */
403 CurrentProc = event->proc;
404 CurrentTSO = event->tso;
408 while (!receivedFinish) { /* set by processMessages */
409 /* when receiving PP_FINISH message */
416 IF_DEBUG(scheduler, printAllThreads());
418 /* If we're interrupted (the user pressed ^C, or some other
419 * termination condition occurred), kill all the currently running
423 IF_DEBUG(scheduler, sched_belch("interrupted"));
425 interrupted = rtsFalse;
426 was_interrupted = rtsTrue;
429 /* Go through the list of main threads and wake up any
430 * clients whose computations have finished. ToDo: this
431 * should be done more efficiently without a linear scan
432 * of the main threads list, somehow...
436 StgMainThread *m, **prev;
437 prev = &main_threads;
438 for (m = main_threads; m != NULL; m = m->link) {
439 switch (m->tso->what_next) {
442 *(m->ret) = (StgClosure *)m->tso->sp[0];
446 pthread_cond_broadcast(&m->wakeup);
449 if (m->ret) *(m->ret) = NULL;
451 if (was_interrupted) {
452 m->stat = Interrupted;
456 pthread_cond_broadcast(&m->wakeup);
467 /* in GUM do this only on the Main PE */
470 /* If our main thread has finished or been killed, return.
473 StgMainThread *m = main_threads;
474 if (m->tso->what_next == ThreadComplete
475 || m->tso->what_next == ThreadKilled) {
476 main_threads = main_threads->link;
477 if (m->tso->what_next == ThreadComplete) {
478 /* we finished successfully, fill in the return value */
479 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
483 if (m->ret) { *(m->ret) = NULL; };
484 if (was_interrupted) {
485 m->stat = Interrupted;
495 /* Top up the run queue from our spark pool. We try to make the
496 * number of threads in the run queue equal to the number of
501 nat n = n_free_capabilities;
502 StgTSO *tso = run_queue_hd;
504 /* Count the run queue */
505 while (n > 0 && tso != END_TSO_QUEUE) {
512 spark = findSpark(rtsFalse);
514 break; /* no more sparks in the pool */
516 /* I'd prefer this to be done in activateSpark -- HWL */
517 /* tricky - it needs to hold the scheduler lock and
518 * not try to re-acquire it -- SDM */
519 createSparkThread(spark);
521 sched_belch("==^^ turning spark of closure %p into a thread",
522 (StgClosure *)spark));
525 /* We need to wake up the other tasks if we just created some
528 if (n_free_capabilities - n > 1) {
529 pthread_cond_signal(&thread_ready_cond);
534 /* check for signals each time around the scheduler */
535 #ifndef mingw32_TARGET_OS
536 if (signals_pending()) {
537 startSignalHandlers();
541 /* Check whether any waiting threads need to be woken up. If the
542 * run queue is empty, and there are no other tasks running, we
543 * can wait indefinitely for something to happen.
544 * ToDo: what if another client comes along & requests another
547 if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
549 (run_queue_hd == END_TSO_QUEUE)
551 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
555 /* we can be interrupted while waiting for I/O... */
556 if (interrupted) continue;
559 * Detect deadlock: when we have no threads to run, there are no
560 * threads waiting on I/O or sleeping, and all the other tasks are
561 * waiting for work, we must have a deadlock of some description.
563 * We first try to find threads blocked on themselves (ie. black
564 * holes), and generate NonTermination exceptions where necessary.
566 * If no threads are black holed, we have a deadlock situation, so
567 * inform all the main threads.
570 if (blocked_queue_hd == END_TSO_QUEUE
571 && run_queue_hd == END_TSO_QUEUE
572 && sleeping_queue == END_TSO_QUEUE
574 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
578 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
579 GarbageCollect(GetRoots,rtsTrue);
580 if (blocked_queue_hd == END_TSO_QUEUE
581 && run_queue_hd == END_TSO_QUEUE
582 && sleeping_queue == END_TSO_QUEUE) {
583 IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
585 if (run_queue_hd == END_TSO_QUEUE) {
586 StgMainThread *m = main_threads;
588 for (; m != NULL; m = m->link) {
589 deleteThread(m->tso);
592 pthread_cond_broadcast(&m->wakeup);
596 deleteThread(m->tso);
599 main_threads = m->link;
606 /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
610 /* If there's a GC pending, don't do anything until it has
614 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
615 pthread_cond_wait(&gc_pending_cond, &sched_mutex);
618 /* block until we've got a thread on the run queue and a free
621 while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
622 IF_DEBUG(scheduler, sched_belch("waiting for work"));
623 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
624 IF_DEBUG(scheduler, sched_belch("work now available"));
630 if (RtsFlags.GranFlags.Light)
631 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
633 /* adjust time based on time-stamp */
634 if (event->time > CurrentTime[CurrentProc] &&
635 event->evttype != ContinueThread)
636 CurrentTime[CurrentProc] = event->time;
638 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
639 if (!RtsFlags.GranFlags.Light)
642 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
644 /* main event dispatcher in GranSim */
645 switch (event->evttype) {
646 /* Should just be continuing execution */
648 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
649 /* ToDo: check assertion
650 ASSERT(run_queue_hd != (StgTSO*)NULL &&
651 run_queue_hd != END_TSO_QUEUE);
653 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
654 if (!RtsFlags.GranFlags.DoAsyncFetch &&
655 procStatus[CurrentProc]==Fetching) {
656 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
657 CurrentTSO->id, CurrentTSO, CurrentProc);
660 /* Ignore ContinueThreads for completed threads */
661 if (CurrentTSO->what_next == ThreadComplete) {
662 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
663 CurrentTSO->id, CurrentTSO, CurrentProc);
666 /* Ignore ContinueThreads for threads that are being migrated */
667 if (PROCS(CurrentTSO)==Nowhere) {
668 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
669 CurrentTSO->id, CurrentTSO, CurrentProc);
672 /* The thread should be at the beginning of the run queue */
673 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
674 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
675 CurrentTSO->id, CurrentTSO, CurrentProc);
676 break; // run the thread anyway
679 new_event(proc, proc, CurrentTime[proc],
681 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
683 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
684 break; // now actually run the thread; DaH Qu'vam yImuHbej
687 do_the_fetchnode(event);
688 goto next_thread; /* handle next event in event queue */
691 do_the_globalblock(event);
692 goto next_thread; /* handle next event in event queue */
695 do_the_fetchreply(event);
696 goto next_thread; /* handle next event in event queue */
698 case UnblockThread: /* Move from the blocked queue to the tail of */
699 do_the_unblock(event);
700 goto next_thread; /* handle next event in event queue */
702 case ResumeThread: /* Move from the blocked queue to the tail of */
703 /* the runnable queue ( i.e. Qu' SImqa'lu') */
704 event->tso->gran.blocktime +=
705 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
706 do_the_startthread(event);
707 goto next_thread; /* handle next event in event queue */
710 do_the_startthread(event);
711 goto next_thread; /* handle next event in event queue */
714 do_the_movethread(event);
715 goto next_thread; /* handle next event in event queue */
718 do_the_movespark(event);
719 goto next_thread; /* handle next event in event queue */
722 do_the_findwork(event);
723 goto next_thread; /* handle next event in event queue */
726 barf("Illegal event type %u\n", event->evttype);
729 /* This point was scheduler_loop in the old RTS */
731 IF_DEBUG(gran, belch("GRAN: after main switch"));
733 TimeOfLastEvent = CurrentTime[CurrentProc];
734 TimeOfNextEvent = get_time_of_next_event();
735 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
736 // CurrentTSO = ThreadQueueHd;
738 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
741 if (RtsFlags.GranFlags.Light)
742 GranSimLight_leave_system(event, &ActiveTSO);
744 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
747 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
749 /* in a GranSim setup the TSO stays on the run queue */
751 /* Take a thread from the run queue. */
752 t = POP_RUN_QUEUE(); // take_off_run_queue(t);
755 fprintf(stderr, "GRAN: About to run current thread, which is\n");
758 context_switch = 0; // turned on via GranYield, checking events and time slice
761 DumpGranEvent(GR_SCHEDULE, t));
763 procStatus[CurrentProc] = Busy;
766 if (PendingFetches != END_BF_QUEUE) {
770 /* ToDo: phps merge with spark activation above */
771 /* check whether we have local work and send requests if we have none */
772 if (EMPTY_RUN_QUEUE()) { /* no runnable threads */
773 /* :-[ no local threads => look out for local sparks */
774 /* the spark pool for the current PE */
775 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
776 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
777 pool->hd < pool->tl) {
779 * ToDo: add GC code check that we really have enough heap afterwards!!
781 * If we're here (no runnable threads) and we have pending
782 * sparks, we must have a space problem. Get enough space
783 * to turn one of those pending sparks into a
787 spark = findSpark(rtsFalse); /* get a spark */
788 if (spark != (rtsSpark) NULL) {
789 tso = activateSpark(spark); /* turn the spark into a thread */
790 IF_PAR_DEBUG(schedule,
791 belch("==== schedule: Created TSO %d (%p); %d threads active",
792 tso->id, tso, advisory_thread_count));
794 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
795 belch("==^^ failed to activate spark");
797 } /* otherwise fall through & pick-up new tso */
799 IF_PAR_DEBUG(verbose,
800 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
801 spark_queue_len(pool)));
806 /* If we still have no work we need to send a FISH to get a spark
809 if (EMPTY_RUN_QUEUE()) {
810 /* =8-[ no local sparks => look for work on other PEs */
812 * We really have absolutely no work. Send out a fish
813 * (there may be some out there already), and wait for
814 * something to arrive. We clearly can't run any threads
815 * until a SCHEDULE or RESUME arrives, and so that's what
816 * we're hoping to see. (Of course, we still have to
817 * respond to other types of messages.)
819 TIME now = msTime() /*CURRENT_TIME*/;
820 IF_PAR_DEBUG(verbose,
821 belch("-- now=%ld", now));
822 IF_PAR_DEBUG(verbose,
823 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
824 (last_fish_arrived_at!=0 &&
825 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
826 belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
827 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
828 last_fish_arrived_at,
829 RtsFlags.ParFlags.fishDelay, now);
832 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
833 (last_fish_arrived_at==0 ||
834 (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
835 /* outstandingFishes is set in sendFish, processFish;
836 avoid flooding system with fishes via delay */
838 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
841 // Global statistics: count no. of fishes
842 if (RtsFlags.ParFlags.ParStats.Global &&
843 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
844 globalParStats.tot_fish_mess++;
848 receivedFinish = processMessages();
851 } else if (PacketsWaiting()) { /* Look for incoming messages */
852 receivedFinish = processMessages();
855 /* Now we are sure that we have some work available */
856 ASSERT(run_queue_hd != END_TSO_QUEUE);
858 /* Take a thread from the run queue, if we have work */
859 t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
860 IF_DEBUG(sanity,checkTSO(t));
862 /* ToDo: write something to the log-file
863 if (RTSflags.ParFlags.granSimStats && !sameThread)
864 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
868 /* the spark pool for the current PE */
869 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
872 belch("--=^ %d threads, %d sparks on [%#x]",
873 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
876 if (0 && RtsFlags.ParFlags.ParStats.Full &&
877 t && LastTSO && t->id != LastTSO->id &&
878 LastTSO->why_blocked == NotBlocked &&
879 LastTSO->what_next != ThreadComplete) {
880 // if previously scheduled TSO not blocked we have to record the context switch
881 DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
882 GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
885 if (RtsFlags.ParFlags.ParStats.Full &&
886 (emitSchedule /* forced emit */ ||
887 (t && LastTSO && t->id != LastTSO->id))) {
889 we are running a different TSO, so write a schedule event to log file
890 NB: If we use fair scheduling we also have to write a deschedule
891 event for LastTSO; with unfair scheduling we know that the
892 previous tso has blocked whenever we switch to another tso, so
893 we don't need it in GUM for now
895 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
896 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
897 emitSchedule = rtsFalse;
901 #else /* !GRAN && !PAR */
903 /* grab a thread from the run queue
905 ASSERT(run_queue_hd != END_TSO_QUEUE);
908 // Sanity check the thread we're about to run. This can be
909 // expensive if there is lots of thread switching going on...
910 IF_DEBUG(sanity,checkTSO(t));
917 cap = free_capabilities;
918 free_capabilities = cap->link;
919 n_free_capabilities--;
921 cap = &MainCapability;
924 cap->r.rCurrentTSO = t;
926 /* context switches are now initiated by the timer signal, unless
927 * the user specified "context switch as often as possible", with
932 RtsFlags.ProfFlags.profileInterval == 0 ||
934 (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
935 && (run_queue_hd != END_TSO_QUEUE
936 || blocked_queue_hd != END_TSO_QUEUE
937 || sleeping_queue != END_TSO_QUEUE)))
942 RELEASE_LOCK(&sched_mutex);
944 IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...",
945 t->id, t, whatNext_strs[t->what_next]));
948 startHeapProfTimer();
951 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
952 /* Run the current thread
954 switch (cap->r.rCurrentTSO->what_next) {
957 /* Thread already finished, return to scheduler. */
958 ret = ThreadFinished;
961 ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
964 ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
966 case ThreadEnterInterp:
967 ret = interpretBCO(cap);
970 barf("schedule: invalid what_next field");
972 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
974 /* Costs for the scheduler are assigned to CCS_SYSTEM */
980 ACQUIRE_LOCK(&sched_mutex);
983 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
984 #elif !defined(GRAN) && !defined(PAR)
985 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
987 t = cap->r.rCurrentTSO;
990 /* HACK 675: if the last thread didn't yield, make sure to print a
991 SCHEDULE event to the log file when StgRunning the next thread, even
992 if it is the same one as before */
994 TimeOfLastYield = CURRENT_TIME;
1000 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1001 globalGranStats.tot_heapover++;
1003 globalParStats.tot_heapover++;
1006 // did the task ask for a large block?
1007 if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1008 // if so, get one and push it on the front of the nursery.
1012 blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1014 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)",
1016 whatNext_strs[t->what_next], blocks));
1018 // don't do this if it would push us over the
1019 // alloc_blocks_lim limit; we'll GC first.
1020 if (alloc_blocks + blocks < alloc_blocks_lim) {
1022 alloc_blocks += blocks;
1023 bd = allocGroup( blocks );
1025 // link the new group into the list
1026 bd->link = cap->r.rCurrentNursery;
1027 bd->u.back = cap->r.rCurrentNursery->u.back;
1028 if (cap->r.rCurrentNursery->u.back != NULL) {
1029 cap->r.rCurrentNursery->u.back->link = bd;
1031 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1032 g0s0->blocks == cap->r.rNursery);
1033 cap->r.rNursery = g0s0->blocks = bd;
1035 cap->r.rCurrentNursery->u.back = bd;
1037 // initialise it as a nursery block
1041 bd->free = bd->start;
1043 // don't forget to update the block count in g0s0.
1044 g0s0->n_blocks += blocks;
1045 ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1047 // now update the nursery to point to the new block
1048 cap->r.rCurrentNursery = bd;
1050 // we might be unlucky and have another thread get on the
1051 // run queue before us and steal the large block, but in that
1052 // case the thread will just end up requesting another large
1054 PUSH_ON_RUN_QUEUE(t);
1059 /* make all the running tasks block on a condition variable,
1060 * maybe set context_switch and wait till they all pile in,
1061 * then have them wait on a GC condition variable.
1063 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow",
1064 t->id, t, whatNext_strs[t->what_next]));
1067 ASSERT(!is_on_queue(t,CurrentProc));
1069 /* Currently we emit a DESCHEDULE event before GC in GUM.
1070 ToDo: either add separate event to distinguish SYSTEM time from rest
1071 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1072 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1073 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1074 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1075 emitSchedule = rtsTrue;
1079 ready_to_gc = rtsTrue;
1080 context_switch = 1; /* stop other threads ASAP */
1081 PUSH_ON_RUN_QUEUE(t);
1082 /* actual GC is done at the end of the while loop */
1088 DumpGranEvent(GR_DESCHEDULE, t));
1089 globalGranStats.tot_stackover++;
1092 // DumpGranEvent(GR_DESCHEDULE, t);
1093 globalParStats.tot_stackover++;
1095 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow",
1096 t->id, t, whatNext_strs[t->what_next]));
1097 /* just adjust the stack for this thread, then pop it back
1103 /* enlarge the stack */
1104 StgTSO *new_t = threadStackOverflow(t);
1106 /* This TSO has moved, so update any pointers to it from the
1107 * main thread stack. It better not be on any other queues...
1108 * (it shouldn't be).
1110 for (m = main_threads; m != NULL; m = m->link) {
1115 threadPaused(new_t);
1116 PUSH_ON_RUN_QUEUE(new_t);
1120 case ThreadYielding:
1123 DumpGranEvent(GR_DESCHEDULE, t));
1124 globalGranStats.tot_yields++;
1127 // DumpGranEvent(GR_DESCHEDULE, t);
1128 globalParStats.tot_yields++;
1130 /* put the thread back on the run queue. Then, if we're ready to
1131 * GC, check whether this is the last task to stop. If so, wake
1132 * up the GC thread. getThread will block during a GC until the
1136 if (t->what_next == ThreadEnterInterp) {
1137 /* ToDo: or maybe a timer expired when we were in Hugs?
1138 * or maybe someone hit ctrl-C
1140 belch("--<< thread %ld (%p; %s) stopped to switch to Hugs",
1141 t->id, t, whatNext_strs[t->what_next]);
1143 belch("--<< thread %ld (%p; %s) stopped, yielding",
1144 t->id, t, whatNext_strs[t->what_next]);
1151 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1153 ASSERT(t->link == END_TSO_QUEUE);
1155 ASSERT(!is_on_queue(t,CurrentProc));
1158 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1159 checkThreadQsSanity(rtsTrue));
1162 if (RtsFlags.ParFlags.doFairScheduling) {
1163 /* this does round-robin scheduling; good for concurrency */
1164 APPEND_TO_RUN_QUEUE(t);
1166 /* this does unfair scheduling; good for parallelism */
1167 PUSH_ON_RUN_QUEUE(t);
1170 /* this does round-robin scheduling; good for concurrency */
1171 APPEND_TO_RUN_QUEUE(t);
1174 /* add a ContinueThread event to actually process the thread */
1175 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1177 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1179 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1188 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1189 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1190 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1192 // ??? needed; should emit block before
1194 DumpGranEvent(GR_DESCHEDULE, t));
1195 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1198 ASSERT(procStatus[CurrentProc]==Busy ||
1199 ((procStatus[CurrentProc]==Fetching) &&
1200 (t->block_info.closure!=(StgClosure*)NULL)));
1201 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1202 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1203 procStatus[CurrentProc]==Fetching))
1204 procStatus[CurrentProc] = Idle;
1208 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1209 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1212 if (t->block_info.closure!=(StgClosure*)NULL)
1213 print_bq(t->block_info.closure));
1215 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1218 /* whatever we schedule next, we must log that schedule */
1219 emitSchedule = rtsTrue;
1222 /* don't need to do anything. Either the thread is blocked on
1223 * I/O, in which case we'll have called addToBlockedQueue
1224 * previously, or it's blocked on an MVar or Blackhole, in which
1225 * case it'll be on the relevant queue already.
1228 fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1229 printThreadBlockage(t);
1230 fprintf(stderr, "\n"));
1232 /* Only for dumping event to log file
1233 ToDo: do I need this in GranSim, too?
1240 case ThreadFinished:
1241 /* Need to check whether this was a main thread, and if so, signal
1242 * the task that started it with the return value. If we have no
1243 * more main threads, we probably need to stop all the tasks until
1246 /* We also end up here if the thread kills itself with an
1247 * uncaught exception, see Exception.hc.
1249 IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1251 endThread(t, CurrentProc); // clean-up the thread
1253 /* For now all are advisory -- HWL */
1254 //if(t->priority==AdvisoryPriority) ??
1255 advisory_thread_count--;
1258 if(t->dist.priority==RevalPriority)
1262 if (RtsFlags.ParFlags.ParStats.Full &&
1263 !RtsFlags.ParFlags.ParStats.Suppressed)
1264 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1269 barf("schedule: invalid thread return code %d", (int)ret);
1273 cap->link = free_capabilities;
1274 free_capabilities = cap;
1275 n_free_capabilities++;
1279 if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1280 GarbageCollect(GetRoots, rtsTrue);
1282 performHeapProfile = rtsFalse;
1283 ready_to_gc = rtsFalse; // we already GC'd
1288 if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes)
1293 /* everybody back, start the GC.
1294 * Could do it in this thread, or signal a condition var
1295 * to do it in another thread. Either way, we need to
1296 * broadcast on gc_pending_cond afterward.
1299 IF_DEBUG(scheduler,sched_belch("doing GC"));
1301 GarbageCollect(GetRoots,rtsFalse);
1302 ready_to_gc = rtsFalse;
1304 pthread_cond_broadcast(&gc_pending_cond);
1307 /* add a ContinueThread event to continue execution of current thread */
1308 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1310 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1312 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1320 IF_GRAN_DEBUG(unused,
1321 print_eventq(EventHd));
1323 event = get_next_event();
1326 /* ToDo: wait for next message to arrive rather than busy wait */
1329 } /* end of while(1) */
1331 IF_PAR_DEBUG(verbose,
1332 belch("== Leaving schedule() after having received Finish"));
1335 /* ---------------------------------------------------------------------------
1336 * deleteAllThreads(): kill all the live threads.
1338 * This is used when we catch a user interrupt (^C), before performing
1339 * any necessary cleanups and running finalizers.
1340 * ------------------------------------------------------------------------- */
// Visits every TSO on the run queue, the blocked queue and the sleeping
// queue (following t->link), then resets all three queues to END_TSO_QUEUE.
// NOTE(review): the per-thread statement inside each loop is not present in
// this listing (presumably deleteThread(t)); if that statement can change
// t->link, the `t = t->link` step would follow the modified link -- confirm
// against the full source.
1342 void deleteAllThreads ( void )
1345 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1346 for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1349 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1352 for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
// Finally drop every queue: head and tail pointers are reset together.
1355 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1356 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1357 sleeping_queue = END_TSO_QUEUE;
1360 /* startThread and insertThread are now in GranSim.c -- HWL */
1362 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1363 //@subsection Suspend and Resume
1365 /* ---------------------------------------------------------------------------
1366 * Suspending & resuming Haskell threads.
1368 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1369 * its capability before calling the C function. This allows another
1370 * task to pick up the capability and carry on running Haskell
1371 * threads. It also means that if the C call blocks, it won't lock
 * up the whole system.
1374 * The Haskell thread making the C call is put to sleep for the
1375 * duration of the call, on the suspended_ccalling_threads queue. We
1376 * give out a token to the task, which it can use to resume the thread
1377 * on return from the C function.
1378 * ------------------------------------------------------------------------- */
// suspendThread: under sched_mutex, pause the capability's current TSO, push
// it onto suspended_ccalling_threads, compute a token (the thread id, which
// resumeThread() searches for), and put the capability back on the free
// list so another task can run Haskell code during the C call.
// NOTE(review): the `return tok;` and the build conditionals around the
// free-list code are not present in this listing.
1381 suspendThread( StgRegTable *reg )
1386 // assume that *reg is a pointer to the StgRegTable part of a Capability
// Step back from the register table (cap->r) to the start of the Capability,
// assuming its StgFunTable (cap->f) immediately precedes it. sizeof() counts
// bytes, so the subtraction is done on a char pointer; arithmetic on void *
// is only a GNU extension, not ISO C.
1387 cap = (Capability *)((char *)reg - sizeof(StgFunTable));
1389 ACQUIRE_LOCK(&sched_mutex);
1392 sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1394 threadPaused(cap->r.rCurrentTSO);
1395 cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1396 suspended_ccalling_threads = cap->r.rCurrentTSO;
1398 /* Use the thread ID as the token; it should be unique */
1399 tok = cap->r.rCurrentTSO->id;
// Hand the capability back while the C call runs.
1402 cap->link = free_capabilities;
1403 free_capabilities = cap;
1404 n_free_capabilities++;
1407 RELEASE_LOCK(&sched_mutex);
// resumeThread: under sched_mutex, look up the TSO whose id equals `tok` on
// suspended_ccalling_threads (barf() if it is absent), clear its link,
// obtain a capability and install the TSO as that capability's current
// thread.
// NOTE(review): the statement that unlinks the found TSO through `prev`, and
// the #if/#else lines choosing between the free-capability pool and
// MainCapability, are not present in this listing -- confirm.
1412 resumeThread( StgInt tok )
1414 StgTSO *tso, **prev;
1417 ACQUIRE_LOCK(&sched_mutex);
// Linear search; `prev` always points at the link field to patch.
1419 prev = &suspended_ccalling_threads;
1420 for (tso = suspended_ccalling_threads;
1421 tso != END_TSO_QUEUE;
1422 prev = &tso->link, tso = tso->link) {
1423 if (tso->id == (StgThreadID)tok) {
1428 if (tso == END_TSO_QUEUE) {
1429 barf("resumeThread: thread not found");
1431 tso->link = END_TSO_QUEUE;
// Sleep on thread_ready_cond (which releases sched_mutex while waiting)
// until some task has returned a capability to the free list.
1434 while (free_capabilities == NULL) {
1435 IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1436 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1437 IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1439 cap = free_capabilities;
1440 free_capabilities = cap->link;
1441 n_free_capabilities--;
1443 cap = &MainCapability;
1446 cap->r.rCurrentTSO = tso;
1448 RELEASE_LOCK(&sched_mutex);
1453 /* ---------------------------------------------------------------------------
1455 * ------------------------------------------------------------------------ */
1456 static void unblockThread(StgTSO *tso);
1458 /* ---------------------------------------------------------------------------
1459 * Comparing Thread ids.
1461 * This is used from STG land in the implementation of the
1462 * instances of Eq/Ord for ThreadIds.
1463 * ------------------------------------------------------------------------ */
// Three-way comparison of the two TSOs' ids: -1 when tso1's id is smaller,
// 1 when it is larger.
// NOTE(review): the return for equal ids is not present in this listing
// (presumably 0) -- confirm.
1465 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
1467 StgThreadID id1 = tso1->id;
1468 StgThreadID id2 = tso2->id;
1470 if (id1 < id2) return (-1);
1471 if (id1 > id2) return 1;
1475 /* ---------------------------------------------------------------------------
1476 * Fetching the ThreadID from an StgTSO.
1478 * This is used in the implementation of Show for ThreadIds.
1479 * ------------------------------------------------------------------------ */
// NOTE(review): the body of rts_getThreadId is not present in this listing;
// presumably it returns tso->id (note the int return type vs. StgThreadID)
// -- confirm against the full source.
1480 int rts_getThreadId(const StgTSO *tso)
1485 /* ---------------------------------------------------------------------------
1486 Create a new thread.
1488 The new thread starts with the given stack size. Before the
1489 scheduler can run, however, this thread needs to have a closure
1490 (and possibly some arguments) pushed on its stack. See
1491 pushClosure() in Schedule.h.
1493 createGenThread() and createIOThread() (in SchedAPI.h) are
1494 convenient packaged versions of this function.
1496 currently pri (priority) is only used in a GRAN setup -- HWL
1497 ------------------------------------------------------------------------ */
1498 //@cindex createThread
1500 /* currently pri (priority) is only used in a GRAN setup -- HWL */
// createThread: public entry point. It passes have_lock = rtsFalse, so
// createThread_ takes sched_mutex itself while assigning the thread id.
// NOTE(review): two variants (with and without `pri`) follow; the
// preprocessor lines selecting between them are not present in this listing.
1502 createThread(nat stack_size, StgInt pri)
1504 return createThread_(stack_size, rtsFalse, pri);
// createThread_: allocate a TSO of `size` words (TSO header included),
// initialise its fields and put a stop frame on its stack. Returns
// END_TSO_QUEUE (after a belch) when the advisory thread limit is reached.
1508 createThread_(nat size, rtsBool have_lock, StgInt pri)
1512 createThread(nat stack_size)
1514 return createThread_(stack_size, rtsFalse);
1518 createThread_(nat size, rtsBool have_lock)
1525 /* First check whether we should create a thread at all */
1527 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1528 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1530 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1531 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1532 return END_TSO_QUEUE;
1538 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1541 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1543 /* catch ridiculously small stack sizes */
// Clamp so that the stack itself gets at least MIN_STACK_WORDS words.
1544 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1545 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1548 stack_size = size - TSO_STRUCT_SIZEW;
1550 tso = (StgTSO *)allocate(size);
1551 TICK_ALLOC_TSO(stack_size, 0);
1553 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1555 SET_GRAN_HDR(tso, ThisPE);
1557 tso->what_next = ThreadEnterGHC;
1559 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1560 * protect the increment operation on next_thread_id.
1561 * In future, we could use an atomic increment instead.
1563 if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1564 tso->id = next_thread_id++;
1565 if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1567 tso->why_blocked = NotBlocked;
1568 tso->blocked_exceptions = NULL;
1570 tso->stack_size = stack_size;
1571 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
1573 tso->sp = (P_)&(tso->stack) + stack_size;
1576 tso->prof.CCCS = CCS_MAIN;
1579 /* put a stop frame on the stack */
1580 tso->sp -= sizeofW(StgStopFrame);
1581 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1582 tso->su = (StgUpdateFrame*)tso->sp;
1586 tso->link = END_TSO_QUEUE;
1587 /* uses more flexible routine in GranSim */
1588 insertThread(tso, CurrentProc);
1590 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1596 if (RtsFlags.GranFlags.GranSimStats.Full)
1597 DumpGranEvent(GR_START,tso);
1599 if (RtsFlags.ParFlags.ParStats.Full)
1600 DumpGranEvent(GR_STARTQ,tso);
1601 /* HACk to avoid SCHEDULE
1605 /* Link the new thread on the global thread list.
1607 tso->global_link = all_threads;
1611 tso->dist.priority = MandatoryPriority; //by default that is...
1615 tso->gran.pri = pri;
1617 tso->gran.magic = TSO_MAGIC; // debugging only
1619 tso->gran.sparkname = 0;
1620 tso->gran.startedat = CURRENT_TIME;
1621 tso->gran.exported = 0;
1622 tso->gran.basicblocks = 0;
1623 tso->gran.allocs = 0;
1624 tso->gran.exectime = 0;
1625 tso->gran.fetchtime = 0;
1626 tso->gran.fetchcount = 0;
1627 tso->gran.blocktime = 0;
1628 tso->gran.blockcount = 0;
1629 tso->gran.blockedat = 0;
1630 tso->gran.globalsparks = 0;
1631 tso->gran.localsparks = 0;
1632 if (RtsFlags.GranFlags.Light)
1633 tso->gran.clock = Now; /* local clock */
1635 tso->gran.clock = 0;
1637 IF_DEBUG(gran,printTSO(tso));
1640 tso->par.magic = TSO_MAGIC; // debugging only
1642 tso->par.sparkname = 0;
1643 tso->par.startedat = CURRENT_TIME;
1644 tso->par.exported = 0;
1645 tso->par.basicblocks = 0;
1646 tso->par.allocs = 0;
1647 tso->par.exectime = 0;
1648 tso->par.fetchtime = 0;
1649 tso->par.fetchcount = 0;
1650 tso->par.blocktime = 0;
1651 tso->par.blockcount = 0;
1652 tso->par.blockedat = 0;
1653 tso->par.globalsparks = 0;
1654 tso->par.localsparks = 0;
1658 globalGranStats.tot_threads_created++;
1659 globalGranStats.threads_created_on_PE[CurrentProc]++;
1660 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1661 globalGranStats.tot_sq_probes++;
1663 // collect parallel global statistics (currently done together with GC stats)
1664 if (RtsFlags.ParFlags.ParStats.Global &&
1665 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1666 //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime());
1667 globalParStats.tot_threads_created++;
// Arguments now follow the conversions in order: thread id (%d), TSO address
// (%p), PE (%d). They used to be passed as CurrentProc, tso, tso->id, which
// printed the PE as the id and fed an extra argument to the format.
1673 belch("==__ schedule: Created TSO %d (%p) on PE %d;",
1674 tso->id, tso, CurrentProc));
1676 IF_PAR_DEBUG(verbose,
1677 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1678 tso->id, tso, advisory_thread_count));
1680 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1681 tso->id, tso->stack_size));
1688 all parallel thread creation calls should fall through the following routine.
// createSparkThread: turn `spark` into an advisory thread. It creates a TSO
// with the default initial stack size, pushes the spark closure on its
// stack, puts it on the run queue and bumps advisory_thread_count.
// NOTE(review): createThread_ is called with have_lock = rtsTrue, i.e. the
// caller is assumed to already hold sched_mutex.
1691 createSparkThread(rtsSpark spark)
1693 ASSERT(spark != (rtsSpark)NULL);
1694 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
// NOTE(review): if barf() does not return, the `return END_TSO_QUEUE` below is
// dead code; createThread_ performs the same limit check but belch()es and
// returns END_TSO_QUEUE instead of aborting.
1696 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1697 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1698 return END_TSO_QUEUE;
1702 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1703 if (tso==END_TSO_QUEUE)
1704 barf("createSparkThread: Cannot create TSO");
// The priority is accessed as `dist.priority` everywhere else in this file
// (createThread_ sets tso->dist.priority = MandatoryPriority, and the
// thread-finished path tests t->dist.priority); use the same field here.
1706 tso->dist.priority = AdvisoryPriority;
1708 pushClosure(tso,spark);
1709 PUSH_ON_RUN_QUEUE(tso);
1710 advisory_thread_count++;
1717 Turn a spark into a thread.
1718 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1721 //@cindex activateSpark
// activateSpark: create a thread for `spark` via createSparkThread and, when
// full parallel statistics are enabled, trace (in verbose PAR debug mode)
// the spark's closure address and type.
// NOTE(review): nothing visible here acquires sched_mutex, yet
// createSparkThread calls createThread_ with have_lock = rtsTrue (see the
// SMP ToDo above this function).
1723 activateSpark (rtsSpark spark)
1727 tso = createSparkThread(spark);
1728 if (RtsFlags.ParFlags.ParStats.Full) {
1729 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1730 IF_PAR_DEBUG(verbose,
1731 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1732 (StgClosure *)spark, info_type((StgClosure *)spark)));
1734 // ToDo: fwd info on local/global spark to thread -- HWL
1735 // tso->gran.exported = spark->exported;
1736 // tso->gran.locked = !spark->global;
1737 // tso->gran.sparkname = spark->name;
1743 /* ---------------------------------------------------------------------------
1746 * scheduleThread puts a thread on the head of the runnable queue.
1747 * This will usually be done immediately after a thread is created.
1748 * The caller of scheduleThread must create the thread using e.g.
1749 * createThread and push an appropriate closure
1750 * on this thread's stack before the scheduler is invoked.
1751 * ------------------------------------------------------------------------ */
// The queue update (PUSH_ON_RUN_QUEUE) and the debug dump of the TSO both
// happen while sched_mutex is held.
1754 scheduleThread(StgTSO *tso)
1756 ACQUIRE_LOCK(&sched_mutex);
1758 /* Put the new thread on the head of the runnable queue. The caller
1759 * better push an appropriate closure on this thread's stack
1760 * beforehand. In the SMP case, the thread may start running as
1761 * soon as we release the scheduler lock below.
1763 PUSH_ON_RUN_QUEUE(tso);
1767 IF_DEBUG(scheduler,printTSO(tso));
1769 RELEASE_LOCK(&sched_mutex);
1772 /* ---------------------------------------------------------------------------
1775 * Start up Posix threads to run each of the scheduler tasks.
1776 * I believe the task ids are not needed in the system as defined.
1778 * ------------------------------------------------------------------------ */
1780 #if defined(PAR) || defined(SMP)
// NOTE(review): startTasks passes taskStart to pthread_create, whose start
// routine has type void *(*)(void *). This definition takes (void), with the
// intended `void *arg` commented out, and calling it through the
// pthread_create pointer type is undefined behaviour in ISO C. Check the
// return type and body (not present in this listing) before changing it.
1782 taskStart(void) /* ( void *arg STG_UNUSED) */
1788 /* ---------------------------------------------------------------------------
1791 * Initialise the scheduler. This resets all the queues - if the
1792 * queues contained any threads, they'll be garbage collected at the
1795 * This now calls startTasks(), so should only be called once! KH @ 25/10/99
1796 * ------------------------------------------------------------------------ */
// term_handler: SIGTERM handler (installed with sigaction in the scheduler
// initialisation code). It runs its work with term_mutex held.
// NOTE(review): the statements between lock and unlock are not present in
// this listing; exitScheduler waits for await_death to reach 0, so this
// presumably decrements it. If ACQUIRE_LOCK is pthread_mutex_lock, calling
// it from a signal handler is not async-signal-safe under POSIX.
1800 term_handler(int sig STG_UNUSED)
1803 ACQUIRE_LOCK(&term_mutex);
1805 RELEASE_LOCK(&term_mutex);
// initCapability: fill in the function table cap->f of a capability with
// the addresses of __stg_chk_0, __stg_chk_1, __stg_gc_enter_1 and
// __stg_update_PAP.
1811 initCapability( Capability *cap )
1813 cap->f.stgChk0 = (F_)__stg_chk_0;
1814 cap->f.stgChk1 = (F_)__stg_chk_1;
1815 cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
1816 cap->f.stgUpdatePAP = (F_)__stg_update_PAP;
// initScheduler body (its signature is not present in this listing):
// empty every thread queue, derive the context-switch tick count, install
// term_handler for SIGTERM, and set up the capabilities.
// NOTE(review): this loop runs i = 0 .. MAX_PROC inclusive; if the per-PE
// arrays are declared with MAX_PROC elements, the last iteration writes out
// of bounds -- check their declarations.
1825 for (i=0; i<=MAX_PROC; i++) {
1826 run_queue_hds[i] = END_TSO_QUEUE;
1827 run_queue_tls[i] = END_TSO_QUEUE;
1828 blocked_queue_hds[i] = END_TSO_QUEUE;
1829 blocked_queue_tls[i] = END_TSO_QUEUE;
1830 ccalling_threadss[i] = END_TSO_QUEUE;
1831 sleeping_queue = END_TSO_QUEUE;
1834 run_queue_hd = END_TSO_QUEUE;
1835 run_queue_tl = END_TSO_QUEUE;
1836 blocked_queue_hd = END_TSO_QUEUE;
1837 blocked_queue_tl = END_TSO_QUEUE;
1838 sleeping_queue = END_TSO_QUEUE;
1841 suspended_ccalling_threads = END_TSO_QUEUE;
1843 main_threads = NULL;
1844 all_threads = END_TSO_QUEUE;
1849 RtsFlags.ConcFlags.ctxtSwitchTicks =
1850 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1852 /* Install the SIGTERM handler */
1855 struct sigaction action,oact;
1857 action.sa_handler = term_handler;
1858 sigemptyset(&action.sa_mask);
1859 action.sa_flags = 0;
1860 if (sigaction(SIGTERM, &action, &oact) != 0) {
1861 barf("can't install TERM handler");
1867 /* Allocate N Capabilities */
// One malloc'd Capability per node (RtsFlags.ParFlags.nNodes), each set up
// by initCapability. NOTE(review): the lines chaining them through `prev`
// are not present in this listing.
1870 Capability *cap, *prev;
1873 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1874 cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1875 initCapability(cap);
1879 free_capabilities = cap;
1880 n_free_capabilities = RtsFlags.ParFlags.nNodes;
1882 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1883 n_free_capabilities););
1885 initCapability(&MainCapability);
1888 #if defined(SMP) || defined(PAR)
// startTasks body (signature not present in this listing): allocate one
// task_info per node and start a POSIX thread running taskStart for each,
// recording its id and zeroed timing counters; barf()s on a failed
// pthread_create (the `r` test itself is not present in this listing).
// NOTE(review): printing a pthread_t with %ld assumes it is an integer type
// no wider than long, which POSIX does not guarantee.
1901 /* make some space for saving all the thread ids */
1902 task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1903 "initScheduler:task_ids");
1905 /* and create all the threads */
1906 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1907 r = pthread_create(&tid,NULL,taskStart,NULL);
1909 barf("startTasks: Can't create new Posix thread");
1911 task_ids[i].id = tid;
1912 task_ids[i].mut_time = 0.0;
1913 task_ids[i].mut_etime = 0.0;
1914 task_ids[i].gc_time = 0.0;
1915 task_ids[i].gc_etime = 0.0;
1916 task_ids[i].elapsedtimestart = elapsedtime();
1917 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
// exitScheduler: stop the task threads recorded in task_ids.
// NOTE(review): two strategies are visible below. One is pthread_cancel
// followed by pthread_join. The other sends SIGTERM to every task and then
// waits for await_death to reach 0. The preprocessor lines selecting between
// them are not present in this listing. await_death is presumably
// decremented by term_handler; the wait loop reads it without taking
// term_mutex.
1923 exitScheduler( void )
1928 /* Don't want to use pthread_cancel, since we'd have to install
1929 * these silly exception handlers (pthread_cleanup_{push,pop}) around
1933 /* Cancel all our tasks */
1934 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1935 pthread_cancel(task_ids[i].id);
1938 /* Wait for all the tasks to terminate */
1939 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1940 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n",
1942 pthread_join(task_ids[i].id, NULL);
1946 /* Send 'em all a SIGTERM. That should shut 'em up.
1948 await_death = RtsFlags.ParFlags.nNodes;
1949 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1950 pthread_kill(task_ids[i].id,SIGTERM);
1952 while (await_death > 0) {
1958 /* -----------------------------------------------------------------------------
1959 Managing the per-task allocation areas.
1961 Each capability comes with an allocation area. These are
1962 fixed-length block lists into which allocation can be done.
1964 ToDo: no support for two-space collection at the moment???
1965 -------------------------------------------------------------------------- */
1967 /* -----------------------------------------------------------------------------
1968 * waitThread is the external interface for running a new computation
1969 * and waiting for the result.
1971 * In the non-SMP case, we create a new main thread, push it on the
1972 * main-thread stack, and invoke the scheduler to run it. The
1973 * scheduler will return when the top main thread on the stack has
1974 * completed or died, and fill in the necessary fields of the
1975 * main_thread structure.
1977 * In the SMP case, we create a main thread as before, but we then
1978 * create a new condition variable and sleep on it. When our new
1979 * main thread has completed, we'll be woken up and the status/result
1980 * will be in the main_thread struct.
1981 * -------------------------------------------------------------------------- */
// howManyThreadsAvail: walks the run, blocked and sleeping queues.
// NOTE(review): the loop bodies and the return statement are not present in
// this listing; presumably it returns the number of threads on the three
// queues -- confirm.
1984 howManyThreadsAvail ( void )
1988 for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1990 for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1992 for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
// finishAllThreads: keep calling waitThread on the first thread of the run,
// blocked and sleeping queues until all three are empty.
1998 finishAllThreads ( void )
2001 while (run_queue_hd != END_TSO_QUEUE) {
2002 waitThread ( run_queue_hd, NULL );
2004 while (blocked_queue_hd != END_TSO_QUEUE) {
2005 waitThread ( blocked_queue_hd, NULL );
2007 while (sleeping_queue != END_TSO_QUEUE) {
// Wait on the head of the queue actually being drained. This used to pass
// blocked_queue_hd, which the previous loop has just left empty, so
// waitThread was handed END_TSO_QUEUE instead of a sleeping thread.
2008 waitThread ( sleeping_queue, NULL );
2011 (blocked_queue_hd != END_TSO_QUEUE ||
2012 run_queue_hd != END_TSO_QUEUE ||
2013 sleeping_queue != END_TSO_QUEUE);
// waitThread: under sched_mutex, register `tso` as a new main thread (a
// freshly malloc'd StgMainThread linked onto main_threads) and wait until it
// has a status other than NoStatus.
// NOTE(review): the SMP path (sleeping on m->wakeup) and the GranSim set-up
// of CurrentTSO/CurrentProc are separated by preprocessor lines that are not
// present in this listing. The lines storing the result through `ret` and
// freeing `m` are also not present.
2017 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2020 SchedulerStatus stat;
2022 ACQUIRE_LOCK(&sched_mutex);
2024 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2030 pthread_cond_init(&m->wakeup, NULL);
2033 m->link = main_threads;
2036 IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n",
2041 pthread_cond_wait(&m->wakeup, &sched_mutex);
2042 } while (m->stat == NoStatus);
2044 /* GranSim specific init */
2045 CurrentTSO = m->tso; // the TSO to run
2046 procStatus[MainProc] = Busy; // status of main PE
2047 CurrentProc = MainProc; // PE to run it on
2052 ASSERT(m->stat != NoStatus);
2058 pthread_cond_destroy(&m->wakeup);
2061 IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n",
2065 RELEASE_LOCK(&sched_mutex);
2070 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2071 //@subsection Run queue code
2075 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2076 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2077 implicit global variable that has to be correct when calling these
2081 /* Put the new thread on the head of the runnable queue.
2082 * The caller of createThread better push an appropriate closure
2083 * on this thread's stack before the scheduler is invoked.
// add_to_run_queue: link tso in front of run_queue_hd; the visible test on
// run_queue_tl handles the previously-empty queue.
// NOTE(review): the statements updating run_queue_hd/run_queue_tl are not
// present in this listing.
2085 static /* inline */ void
2086 add_to_run_queue(tso)
2089 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2090 tso->link = run_queue_hd;
2092 if (run_queue_tl == END_TSO_QUEUE) {
2097 /* Put the new thread at the end of the runnable queue. */
// Despite its name this appends: in the non-empty case tso is linked after
// run_queue_tl. Compare add_to_run_queue, which links in front of
// run_queue_hd.
// NOTE(review): the empty-queue branch body and the tail update are not
// present in this listing.
2098 static /* inline */ void
2099 push_on_run_queue(tso)
2102 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2103 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2104 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2105 if (run_queue_hd == END_TSO_QUEUE) {
2108 run_queue_tl->link = tso;
2114 Should be inlined because it's used very often in schedule. The tso
2115 argument is actually only needed in GranSim, where we want to have the
2116 possibility to schedule *any* TSO on the run queue, irrespective of the
2117 actual ordering. Therefore, if tso is not the nil TSO then we traverse
2118 the run queue and dequeue the tso, adjusting the links in the queue.
2120 //@cindex take_off_run_queue
// take_off_run_queue: unlink a TSO from the run queue. With
// tso == END_TSO_QUEUE it detaches the head (standard concurrent code).
// Otherwise (GranSim) it searches for tso and splices it out, adjusting
// run_queue_hd / run_queue_tl when tso is first / last.
// NOTE(review): the search loop's step clause, any check that tso was found,
// and the return statement are not present in this listing. If tso is not
// on the queue, t would be END_TSO_QUEUE when dereferenced below -- confirm.
2121 static /* inline */ StgTSO*
2122 take_off_run_queue(StgTSO *tso) {
2126 From the group holding the runnable tasks (the run queue), remove {tso}! (translated from Klingon)
2128 if tso is specified, unlink that tso from the run_queue (doesn't have
2129 to be at the beginning of the queue); GranSim only
2131 if (tso!=END_TSO_QUEUE) {
2132 /* find tso in queue */
2133 for (t=run_queue_hd, prev=END_TSO_QUEUE;
2134 t!=END_TSO_QUEUE && t!=tso;
2138 /* now actually dequeue the tso */
2139 if (prev!=END_TSO_QUEUE) {
2140 ASSERT(run_queue_hd!=t);
2141 prev->link = t->link;
2143 /* t is at beginning of thread queue */
2144 ASSERT(run_queue_hd==t);
2145 run_queue_hd = t->link;
2147 /* t is at end of thread queue */
2148 if (t->link==END_TSO_QUEUE) {
2149 ASSERT(t==run_queue_tl);
2150 run_queue_tl = prev;
2152 ASSERT(run_queue_tl!=t);
2154 t->link = END_TSO_QUEUE;
2156 /* take tso from the beginning of the queue; std concurrent code */
2158 if (t != END_TSO_QUEUE) {
2159 run_queue_hd = t->link;
2160 t->link = END_TSO_QUEUE;
2161 if (run_queue_hd == END_TSO_QUEUE) {
2162 run_queue_tl = END_TSO_QUEUE;
2171 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2172 //@subsection Garbage Collection Routines
2174 /* ---------------------------------------------------------------------------
2175 Where are the roots that we know about?
2177 - all the threads on the runnable queue
2178 - all the threads on the blocked queue
2179 - all the threads on the sleeping queue
2180 - all the threads currently executing a _ccall_GC
2181 - all the "main threads"
2183 ------------------------------------------------------------------------ */
2185 /* This has to be protected either by the scheduler monitor, or by the
2186 garbage collection monitor (probably the latter).
// GetRoots: apply `evac` to every scheduler-owned root: the run and blocked
// queues (per PE in GranSim), the sleeping queue, the TSO of every main
// thread, the suspended _ccall_GC threads and, in SMP/PAR/GRAN builds, the
// spark queues.
// NOTE(review): the GranSim loop runs i = 0 .. RtsFlags.GranFlags.proc
// inclusive; confirm whether `proc` is a PE count (then `<` is intended).
2191 GetRoots(evac_fn evac)
2198 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2199 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2200 evac((StgClosure **)&run_queue_hds[i]);
2201 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2202 evac((StgClosure **)&run_queue_tls[i]);
2204 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2205 evac((StgClosure **)&blocked_queue_hds[i]);
2206 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2207 evac((StgClosure **)&blocked_queue_tls[i]);
2208 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
// Evacuate the slot that was just tested: ccalling_threadss is the per-PE
// array that initScheduler initialises. The previous code indexed
// `ccalling_threads` instead, so the tested root was never evacuated.
2209 evac((StgClosure **)&ccalling_threadss[i]);
2216 if (run_queue_hd != END_TSO_QUEUE) {
2217 ASSERT(run_queue_tl != END_TSO_QUEUE);
2218 evac((StgClosure **)&run_queue_hd);
2219 evac((StgClosure **)&run_queue_tl);
2222 if (blocked_queue_hd != END_TSO_QUEUE) {
2223 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2224 evac((StgClosure **)&blocked_queue_hd);
2225 evac((StgClosure **)&blocked_queue_tl);
2228 if (sleeping_queue != END_TSO_QUEUE) {
2229 evac((StgClosure **)&sleeping_queue);
2233 for (m = main_threads; m != NULL; m = m->link) {
2234 evac((StgClosure **)&m->tso);
2236 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2237 evac((StgClosure **)&suspended_ccalling_threads);
2240 #if defined(SMP) || defined(PAR) || defined(GRAN)
2241 markSparkQueue(evac);
2245 /* -----------------------------------------------------------------------------
2248 This is the interface to the garbage collector from Haskell land.
2249 We provide this so that external C code can allocate and garbage
2250 collect when called from Haskell via _ccall_GC.
2252 It might be useful to provide an interface whereby the programmer
2253 can specify more roots (ToDo).
2255 This needs to be protected by the GC condition variable above. KH.
2256 -------------------------------------------------------------------------- */
// Caller-supplied root enumerator, set by performGCWithRoots and invoked by
// AllRoots. NOTE(review): it has external linkage and is never reset after
// the GC.
2258 void (*extra_roots)(evac_fn);
// NOTE(review): the signature of the function owning the next line
// (presumably performGC) is not present in this listing.
2263 GarbageCollect(GetRoots,rtsFalse);
// performMajorGC: collect with the scheduler's roots, passing rtsTrue (the
// call above passes rtsFalse).
2267 performMajorGC(void)
2269 GarbageCollect(GetRoots,rtsTrue);
// AllRoots: root enumerator used by performGCWithRoots -- the scheduler's
// roots followed by the caller-supplied extra roots.
2273 AllRoots(evac_fn evac)
2275 GetRoots(evac); // the scheduler's roots
2276 extra_roots(evac); // the user's roots
// performGCWithRoots: run a GC whose roots are GetRoots plus `get_roots`.
// The callback is passed through the global extra_roots, so concurrent
// callers would overwrite each other's callback.
2280 performGCWithRoots(void (*get_roots)(evac_fn))
2282 extra_roots = get_roots;
2283 GarbageCollect(AllRoots,rtsFalse);
2286 /* -----------------------------------------------------------------------------
2289 If the thread has reached its maximum stack size, then raise the
2290 StackOverflow exception in the offending thread. Otherwise
2291 relocate the TSO into a larger chunk of memory and adjust its stack
2293 -------------------------------------------------------------------------- */
2296 threadStackOverflow(StgTSO *tso)
2298 nat new_stack_size, new_tso_size, diff, stack_words;
2302 IF_DEBUG(sanity,checkTSO(tso));
2303 if (tso->stack_size >= tso->max_stack_size) {
2306 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2307 tso->id, tso, tso->stack_size, tso->max_stack_size);
2308 /* If we're debugging, just print out the top of the stack */
2309 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2312 /* Send this thread the StackOverflow exception */
2313 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2317 /* Try to double the current stack size. If that takes us over the
2318 * maximum stack size for this thread, then use the maximum instead.
2319 * Finally round up so the TSO ends up as a whole number of blocks.
2321 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2322 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2323 TSO_STRUCT_SIZE)/sizeof(W_);
2324 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2325 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2327 IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2329 dest = (StgTSO *)allocate(new_tso_size);
2330 TICK_ALLOC_TSO(new_stack_size,0);
2332 /* copy the TSO block and the old stack into the new area */
2333 memcpy(dest,tso,TSO_STRUCT_SIZE);
2334 stack_words = tso->stack + tso->stack_size - tso->sp;
2335 new_sp = (P_)dest + new_tso_size - stack_words;
2336 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2338 /* relocate the stack pointers... */
2339 diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2340 dest->su = (StgUpdateFrame *) ((P_)dest->su + diff);
2342 dest->stack_size = new_stack_size;
2344 /* and relocate the update frame list */
2345 relocate_stack(dest, diff);
2347 /* Mark the old TSO as relocated. We have to check for relocated
2348 * TSOs in the garbage collector and any primops that deal with TSOs.
2350 * It's important to set the sp and su values to just beyond the end
2351 * of the stack, so we don't attempt to scavenge any part of the
2354 tso->what_next = ThreadRelocated;
2356 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2357 tso->su = (StgUpdateFrame *)tso->sp;
2358 tso->why_blocked = NotBlocked;
2359 dest->mut_link = NULL;
2361 IF_PAR_DEBUG(verbose,
2362 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2363 tso->id, tso, tso->stack_size);
2364 /* If we're debugging, just print out the top of the stack */
2365 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2368 IF_DEBUG(sanity,checkTSO(tso));
2370 IF_DEBUG(scheduler,printTSO(dest));
2376 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2377 //@subsection Blocking Queue Routines
2379 /* ---------------------------------------------------------------------------
2380 Wake up a queue that was blocked on some resource.
2381 ------------------------------------------------------------------------ */
// NOTE(review): two definitions of unblockCount follow. The preprocessor
// lines choosing between them, and the first one's body, are not present in
// this listing.
2385 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
// unblockCount: statistics bookkeeping for waking the TSO `bqe` that was
// blocked on `node`. With full parallel statistics it logs a GR_RESUMEQ
// event and requests a schedule event if the run queue is empty. It adds the
// time since par.blockedat to par.fetchtime or par.blocktime depending on
// node's closure type, and barf()s on an unexpected type.
// NOTE(review): the error message names unblockOneLocked although it is
// raised from here.
2390 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2392 /* write RESUME events to log file and
2393 update blocked and fetch time (depending on type of the orig closure) */
2394 if (RtsFlags.ParFlags.ParStats.Full) {
2395 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2396 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2397 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2398 if (EMPTY_RUN_QUEUE())
2399 emitSchedule = rtsTrue;
2401 switch (get_itbl(node)->type) {
2403 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2408 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2415 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
// unblockOneLocked (GranSim variant): wake the TSO `bqe` that was blocked on
// `node` by posting a new_event for it. The caller is expected to hold
// sched_mutex; unblockOne wraps this call in the lock. A TSO local to the
// node costs lunblocktime; a remote one costs
// mpacktime + gunblocktime + latency, charged to CurrentTime[CurrentProc].
// block_info.closure is cleared afterwards.
// NOTE(review): tso->link is overwritten with END_TSO_QUEUE before the debug
// trace prints "(next=%p)", so the trace always shows END_TSO_QUEUE. The
// computation of the returned next element is not present in this listing.
2422 static StgBlockingQueueElement *
2423 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2426 PEs node_loc, tso_loc;
2428 node_loc = where_is(node); // should be lifted out of loop
2429 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2430 tso_loc = where_is((StgClosure *)tso);
2431 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2432 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2433 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2434 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2435 // insertThread(tso, node_loc);
2436 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2438 tso, node, (rtsSpark*)NULL);
2439 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2442 } else { // TSO is remote (actually should be FMBQ)
2443 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2444 RtsFlags.GranFlags.Costs.gunblocktime +
2445 RtsFlags.GranFlags.Costs.latency;
2446 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2448 tso, node, (rtsSpark*)NULL);
2449 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2452 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2454 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2455 (node_loc==tso_loc ? "Local" : "Global"),
2456 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2457 tso->block_info.closure = NULL;
2458 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
// unblockOneLocked (GUM variant): handle one element 'bqe' of the blocking
// queue of 'node', dispatching on its closure type:
//   TSO           - pushed onto the run queue, passed to unblockCount, then
//                   marked NotBlocked;
//   BLOCKED_FETCH - prepended to the global PendingFetches list;
//   RBHSave CONSTR - only sanity-checked by the ASSERT below (per the
//                   comment, the case matters only in a debugging setup);
//   anything else - barf.
// The caller must hold sched_mutex (unblockOne takes it around this call).
// The assignments to 'next' and the return statement are not visible in this
// listing. Callers (awakenBlockedQueue, GUM) continue from the returned element.
2462 static StgBlockingQueueElement *
2463 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2465 StgBlockingQueueElement *next;
2467 switch (get_itbl(bqe)->type) {
2469 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2470 /* if it's a TSO just push it onto the run_queue */
2472 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2473 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
2475 unblockCount(bqe, node);
2476 /* reset blocking status after dumping event */
2477 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2481 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2483 bqe->link = (StgBlockingQueueElement *)PendingFetches;
2484 PendingFetches = (StgBlockedFetch *)bqe;
2488 /* can ignore this case in a non-debugging setup;
2489 see comments on RBHSave closures above */
2491 /* check that the closure is an RBHSave closure */
2492 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2493 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2494 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
// NOTE(review): this barf passes an info pointer (and, on a line not shown, a
// closure address) to %#lx conversions; %p would match pointer arguments.
2498 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2499 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2503 IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2507 #else /* !GRAN && !PAR */
// unblockOneLocked (non-parallel variant): assert that 'tso' really is a
// blocked TSO, mark it NotBlocked and push it onto the run queue.
// The caller must hold sched_mutex (unblockOne and awakenBlockedQueue take it).
// awakenBlockedQueue loops on the result until END_TSO_QUEUE, so this
// presumably returns the TSO's old successor. That assignment and the return
// are not visible in this listing.
2509 unblockOneLocked(StgTSO *tso)
2513 ASSERT(get_itbl(tso)->type == TSO);
2514 ASSERT(tso->why_blocked != NotBlocked);
2515 tso->why_blocked = NotBlocked;
2517 PUSH_ON_RUN_QUEUE(tso);
2519 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2524 #if defined(GRAN) || defined(PAR)
// unblockOne (GRAN/PAR): locking wrapper around unblockOneLocked. It wakes a
// single blocking-queue element 'bqe' that was blocked on 'node' while holding
// sched_mutex. The result of unblockOneLocked is stored back into 'bqe'; the
// return statement is not visible in this listing.
2525 inline StgBlockingQueueElement *
2526 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2528 ACQUIRE_LOCK(&sched_mutex);
2529 bqe = unblockOneLocked(bqe, node);
2530 RELEASE_LOCK(&sched_mutex);
// unblockOne (non-parallel): same wrapper for a plain TSO. The preprocessor
// lines separating the two variants are not visible in this listing.
2535 unblockOne(StgTSO *tso)
2537 ACQUIRE_LOCK(&sched_mutex);
2538 tso = unblockOneLocked(tso);
2539 RELEASE_LOCK(&sched_mutex);
// awakenBlockedQueue (GranSim variant): wake the TSOs on the blocking queue
// 'q' of 'node'.
//  1. If node is not on CurrentProc, assume a "fake fetch": OR CurrentProc
//     into node->header.gran.procs, counting it in tot_fake_fetches when
//     global GranSim statistics are enabled.
//  2. Unblock elements via unblockOneLocked while they are TSOs.
//  3. If a non-END element remains, node must be an RBH and that element its
//     RBHSave CONSTR. Its payload[0]/payload[1] are written back into the
//     RBH's blocking_queue and mut_link fields.
//  4. Update global BQ statistics (tot_bq_len, tot_awbq).
// The initialisation of bqe from q and the maintenance of 'len' are on lines
// not visible in this listing.
2546 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2548 StgBlockingQueueElement *bqe;
2553 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2554 node, CurrentProc, CurrentTime[CurrentProc],
2555 CurrentTSO->id, CurrentTSO));
2557 node_loc = where_is(node);
2559 ASSERT(q == END_BQ_QUEUE ||
2560 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2561 get_itbl(q)->type == CONSTR); // closure (type constructor)
2562 ASSERT(is_unique(node));
2564 /* FAKE FETCH: magically copy the node to the tso's proc;
2565 no Fetch necessary because in reality the node should not have been
2566 moved to the other PE in the first place
2568 if (CurrentProc!=node_loc) {
2570 belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2571 node, node_loc, CurrentProc, CurrentTSO->id,
2572 // CurrentTSO, where_is(CurrentTSO),
2573 node->header.gran.procs));
2574 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2576 belch("## new bitmask of node %p is %#x",
2577 node, node->header.gran.procs));
2578 if (RtsFlags.GranFlags.GranSimStats.Global) {
2579 globalGranStats.tot_fake_fetches++;
2584 // ToDo: check: ASSERT(CurrentProc==node_loc);
2585 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2588 bqe points to the current element in the queue
2589 next points to the next element in the queue
2591 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2592 //tso_loc = where_is(tso);
2594 bqe = unblockOneLocked(bqe, node);
2597 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2598 the closure to make room for the anchor of the BQ */
2599 if (bqe!=END_BQ_QUEUE) {
2600 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
// NOTE(review): this ASSERT compares info_ptr against RBH_Save_0/1/2_info,
// whereas the other RBHSave checks in this file use the stg_RBH_Save_0/1/2_info
// names. Confirm the unprefixed names still exist in DEBUG GranSim builds.
// The assignment of info_ptr is not visible in this listing.
2602 ASSERT((info_ptr==&RBH_Save_0_info) ||
2603 (info_ptr==&RBH_Save_1_info) ||
2604 (info_ptr==&RBH_Save_2_info));
2606 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2607 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2608 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2611 belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2612 node, info_type(node)));
2615 /* statistics gathering */
2616 if (RtsFlags.GranFlags.GranSimStats.Global) {
2617 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2618 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2619 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2620 globalGranStats.tot_awbq++; // total no. of bqs awakened
2623 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2624 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
// awakenBlockedQueue (GUM variant): while holding sched_mutex, unblock the TSO
// and BLOCKED_FETCH elements at the front of the blocking queue 'q' of 'node'.
// Unblocking stops at the first element of another type (e.g. an RBHSave
// CONSTR) or at the end marker. If q itself is a CONSTR or END_BQ_QUEUE,
// nothing is unblocked.
// NOTE(review): per its debug message, that "nothing to unblock" branch
// returns early. No RELEASE_LOCK(&sched_mutex) is visible before that return,
// although the lock is acquired at the top; confirm the mutex is not leaked.
// The initialisation of bqe from q is not visible in this listing.
2628 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2630 StgBlockingQueueElement *bqe;
2632 ACQUIRE_LOCK(&sched_mutex);
2634 IF_PAR_DEBUG(verbose,
2635 belch("##-_ AwBQ for node %p on [%x]: ",
2639 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2640 IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2645 ASSERT(q == END_BQ_QUEUE ||
2646 get_itbl(q)->type == TSO ||
2647 get_itbl(q)->type == BLOCKED_FETCH ||
2648 get_itbl(q)->type == CONSTR);
2651 while (get_itbl(bqe)->type==TSO ||
2652 get_itbl(bqe)->type==BLOCKED_FETCH) {
2653 bqe = unblockOneLocked(bqe, node);
2655 RELEASE_LOCK(&sched_mutex);
2658 #else /* !GRAN && !PAR */
// awakenBlockedQueue (non-parallel variant): while holding sched_mutex, wake
// every TSO on the list starting at 'tso'. It follows the value returned by
// unblockOneLocked until END_TSO_QUEUE is reached.
2660 awakenBlockedQueue(StgTSO *tso)
2662 ACQUIRE_LOCK(&sched_mutex);
2663 while (tso != END_TSO_QUEUE) {
2664 tso = unblockOneLocked(tso);
2666 RELEASE_LOCK(&sched_mutex);
2670 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2671 //@subsection Exception Handling Routines
2673 /* ---------------------------------------------------------------------------
2675 - usually called inside a signal handler so it mustn't do anything fancy.
2676 ------------------------------------------------------------------------ */
// interruptStgRts: per the header comment it is usually invoked from a signal
// handler, so whatever it does must stay minimal. Its body is not visible in
// this listing.
2679 interruptStgRts(void)
2685 /* -----------------------------------------------------------------------------
2688 This is for use when we raise an exception in another thread, which
2690 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2691 -------------------------------------------------------------------------- */
2693 #if defined(GRAN) || defined(PAR)
2695 NB: only the type of the blocking queue is different in GranSim and GUM
2696 the operations on the queue-elements are the same
2697 long live polymorphism!
// unblockThread (GRAN/PAR): take 'tso' off the queue recorded by its
// why_blocked state, then make it runnable. The queues handled below are:
//   - the MVar's head/tail queue (tail fixed up if tso was the last element);
//   - the BLACKHOLE_BQ's blocking_queue;
//   - the blocked_exceptions list of the target TSO (BlockedOnException);
//   - blocked_queue_hd/tl (BlockedOnWrite, and presumably a read case whose
//     label is not visible in this listing);
//   - sleeping_queue (BlockedOnDelay).
// barf()s if the TSO is not on the expected queue. Afterwards link,
// why_blocked and block_info are reset and the TSO is pushed onto the run
// queue. The work is done while holding sched_mutex.
2700 unblockThread(StgTSO *tso)
2702 StgBlockingQueueElement *t, **last;
2704 ACQUIRE_LOCK(&sched_mutex);
2705 switch (tso->why_blocked) {
// NOTE(review): this early return (the not-blocked case) leaves the function
// with no visible RELEASE_LOCK(&sched_mutex), although the lock was taken
// just above. Confirm the mutex cannot be left held on this path.
2708 return; /* not blocked */
2711 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2713 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2714 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2716 last = (StgBlockingQueueElement **)&mvar->head;
2717 for (t = (StgBlockingQueueElement *)mvar->head;
2719 last = &t->link, last_tso = t, t = t->link) {
2720 if (t == (StgBlockingQueueElement *)tso) {
2721 *last = (StgBlockingQueueElement *)tso->link;
2722 if (mvar->tail == tso) {
2723 mvar->tail = (StgTSO *)last_tso;
2728 barf("unblockThread (MVAR): TSO not found");
2731 case BlockedOnBlackHole:
2732 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2734 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2736 last = &bq->blocking_queue;
2737 for (t = bq->blocking_queue;
2739 last = &t->link, t = t->link) {
2740 if (t == (StgBlockingQueueElement *)tso) {
2741 *last = (StgBlockingQueueElement *)tso->link;
2745 barf("unblockThread (BLACKHOLE): TSO not found");
2748 case BlockedOnException:
2750 StgTSO *target = tso->block_info.tso;
2752 ASSERT(get_itbl(target)->type == TSO);
// NOTE(review): only one ThreadRelocated hop is followed here ('if'). The
// non-parallel unblockThread uses 'while' for the same step, which also
// handles a chain of relocations.
2754 if (target->what_next == ThreadRelocated) {
2755 target = target->link;
2756 ASSERT(get_itbl(target)->type == TSO);
2759 ASSERT(target->blocked_exceptions != NULL);
2761 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2762 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2764 last = &t->link, t = t->link) {
2765 ASSERT(get_itbl(t)->type == TSO);
2766 if (t == (StgBlockingQueueElement *)tso) {
2767 *last = (StgBlockingQueueElement *)tso->link;
2771 barf("unblockThread (Exception): TSO not found");
2775 case BlockedOnWrite:
2777 /* take TSO off blocked_queue */
2778 StgBlockingQueueElement *prev = NULL;
2779 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2780 prev = t, t = t->link) {
2781 if (t == (StgBlockingQueueElement *)tso) {
2783 blocked_queue_hd = (StgTSO *)t->link;
2784 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2785 blocked_queue_tl = END_TSO_QUEUE;
2788 prev->link = t->link;
2789 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2790 blocked_queue_tl = (StgTSO *)prev;
2796 barf("unblockThread (I/O): TSO not found");
2799 case BlockedOnDelay:
2801 /* take TSO off sleeping_queue */
2802 StgBlockingQueueElement *prev = NULL;
2803 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
2804 prev = t, t = t->link) {
2805 if (t == (StgBlockingQueueElement *)tso) {
2807 sleeping_queue = (StgTSO *)t->link;
2809 prev->link = t->link;
// NOTE(review): the barf text below says "(I/O)", although this is the
// BlockedOnDelay / sleeping_queue case.
2814 barf("unblockThread (I/O): TSO not found");
2818 barf("unblockThread");
2822 tso->link = END_TSO_QUEUE;
2823 tso->why_blocked = NotBlocked;
2824 tso->block_info.closure = NULL;
2825 PUSH_ON_RUN_QUEUE(tso);
2826 RELEASE_LOCK(&sched_mutex);
// unblockThread (non-parallel variant, StgTSO-typed queues): take 'tso' off
// the queue recorded by its why_blocked state:
//   - the MVar queue (tail fixed up);
//   - the BLACKHOLE_BQ;
//   - the blocked_exceptions list of the target TSO, following ThreadRelocated
//     links with a 'while' loop;
//   - blocked_queue_hd/tl (BlockedOnWrite, and presumably a read case whose
//     label is not visible here);
//   - sleeping_queue (BlockedOnDelay).
// Then reset its blocking state and push it onto the run queue, all while
// holding sched_mutex. barf()s if the TSO is not found.
2830 unblockThread(StgTSO *tso)
2834 ACQUIRE_LOCK(&sched_mutex);
2835 switch (tso->why_blocked) {
// NOTE(review): this early return (the not-blocked case) has no visible
// RELEASE_LOCK(&sched_mutex), although the lock was taken just above.
// Confirm the mutex cannot be left held on this path.
2838 return; /* not blocked */
2841 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2843 StgTSO *last_tso = END_TSO_QUEUE;
2844 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2847 for (t = mvar->head; t != END_TSO_QUEUE;
2848 last = &t->link, last_tso = t, t = t->link) {
2851 if (mvar->tail == tso) {
2852 mvar->tail = last_tso;
2857 barf("unblockThread (MVAR): TSO not found");
2860 case BlockedOnBlackHole:
2861 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2863 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2865 last = &bq->blocking_queue;
2866 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2867 last = &t->link, t = t->link) {
2873 barf("unblockThread (BLACKHOLE): TSO not found");
2876 case BlockedOnException:
2878 StgTSO *target = tso->block_info.tso;
2880 ASSERT(get_itbl(target)->type == TSO);
2882 while (target->what_next == ThreadRelocated) {
2883 target = target->link;
2884 ASSERT(get_itbl(target)->type == TSO);
2887 ASSERT(target->blocked_exceptions != NULL);
2889 last = &target->blocked_exceptions;
2890 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2891 last = &t->link, t = t->link) {
2892 ASSERT(get_itbl(t)->type == TSO);
2898 barf("unblockThread (Exception): TSO not found");
2902 case BlockedOnWrite:
2904 StgTSO *prev = NULL;
2905 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2906 prev = t, t = t->link) {
2909 blocked_queue_hd = t->link;
2910 if (blocked_queue_tl == t) {
2911 blocked_queue_tl = END_TSO_QUEUE;
2914 prev->link = t->link;
2915 if (blocked_queue_tl == t) {
2916 blocked_queue_tl = prev;
2922 barf("unblockThread (I/O): TSO not found");
2925 case BlockedOnDelay:
2927 StgTSO *prev = NULL;
2928 for (t = sleeping_queue; t != END_TSO_QUEUE;
2929 prev = t, t = t->link) {
2932 sleeping_queue = t->link;
2934 prev->link = t->link;
// NOTE(review): the barf text below says "(I/O)", although this is the
// BlockedOnDelay / sleeping_queue case.
2939 barf("unblockThread (I/O): TSO not found");
2943 barf("unblockThread");
2947 tso->link = END_TSO_QUEUE;
2948 tso->why_blocked = NotBlocked;
2949 tso->block_info.closure = NULL;
2950 PUSH_ON_RUN_QUEUE(tso);
2951 RELEASE_LOCK(&sched_mutex);
2955 /* -----------------------------------------------------------------------------
2958 * The following function implements the magic for raising an
2959 * asynchronous exception in an existing thread.
2961 * We first remove the thread from any queue on which it might be
2962 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
2964 * We strip the stack down to the innermost CATCH_FRAME, building
2965 * thunks in the heap for all the active computations, so they can
2966 * be restarted if necessary. When we reach a CATCH_FRAME, we build
2967 * an application of the handler to the exception, and push it on
2968 * the top of the stack.
2970 * How exactly do we save all the active computations? We create an
2971 * AP_UPD for every UpdateFrame on the stack. Entering one of these
2972 * AP_UPDs pushes everything from the corresponding update frame
2973 * upwards onto the stack. (Actually, it pushes everything up to the
2974 * next update frame plus a pointer to the next AP_UPD object.
2975 * Entering the next AP_UPD object pushes more onto the stack until we
2976 * reach the last AP_UPD object - at which point the stack should look
2977 * exactly as it did when we killed the TSO and we can continue
2978 * execution by entering the closure on top of the stack.
2980 * We can also kill a thread entirely - this happens if either (a) the
2981 * exception passed to raiseAsync is NULL, or (b) there's no
2982 * CATCH_FRAME on the stack. In either case, we strip the entire
2983 * stack and replace the thread with a zombie.
2985 * -------------------------------------------------------------------------- */
// Note: the unblockThread routines in this file also handle threads blocked on
// exception delivery, on writes/I-O and on delays, not only the MVAR and
// BLACKHOLE_BQ cases listed in the comment above.
// deleteThread: kill 'tso' outright by calling raiseAsync with a NULL exception,
// which strips its entire stack as described above.
2988 deleteThread(StgTSO *tso)
2990 raiseAsync(tso,NULL);
// raiseAsync: raise 'exception' asynchronously in 'tso', or kill the thread
// when 'exception' is NULL (see the comment block before deleteThread).
// Threads already ThreadComplete or ThreadKilled are checked for first; the
// body of that test (presumably an early return) is not visible here.
// The main part repeatedly freezes the stack chunk between sp and the
// current frame su. Its loop header is not part of this listing.
//   - CATCH_FRAME with a non-NULL exception: build PAP(handler, exception,
//     realworld), drop the frame and resume with what_next = ThreadEnterGHC.
//   - Otherwise the chunk becomes an AP_UPD (or PAP) and the frame is replaced
//     according to its type, until the stop frame marks the thread ThreadKilled.
2994 raiseAsync(StgTSO *tso, StgClosure *exception)
2996 StgUpdateFrame* su = tso->su;
2997 StgPtr sp = tso->sp;
2999 /* Thread already dead? */
3000 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3004 IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3006 /* Remove it from any blocking queues */
3009 /* The stack freezing code assumes there's a closure pointer on
3010 * the top of the stack. This isn't always the case with compiled
3011 * code, so we have to push a dummy closure on the top which just
3012 * returns to the next return address on the stack.
3014 if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3015 *(--sp) = (W_)&stg_dummy_ret_closure;
// words: number of stack words between sp and su, not counting the closure
// at sp[0], which becomes the function of the AP_UPD/PAP built below.
3019 nat words = ((P_)su - (P_)sp) - 1;
3023 /* If we find a CATCH_FRAME, and we've got an exception to raise,
3024 * then build PAP(handler,exception,realworld#), and leave it on
3025 * top of the stack ready to enter.
3027 if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3028 StgCatchFrame *cf = (StgCatchFrame *)su;
3029 /* we've got an exception to raise, so let's pass it to the
3030 * handler in this frame.
3032 ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3033 TICK_ALLOC_UPD_PAP(3,0);
3034 SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3037 ap->fun = cf->handler; /* :: Exception -> IO a */
3038 ap->payload[0] = exception;
3039 ap->payload[1] = ARG_TAG(0); /* realworld token */
3041 /* throw away the stack from Sp up to and including the
3044 sp = (P_)su + sizeofW(StgCatchFrame) - 1;
3047 /* Restore the blocked/unblocked state for asynchronous exceptions
3048 * at the CATCH_FRAME.
3050 * If exceptions were unblocked at the catch, arrange that they
3051 * are unblocked again after executing the handler by pushing an
3052 * unblockAsyncExceptions_ret stack frame.
3054 if (!cf->exceptions_blocked) {
3055 *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3058 /* Ensure that async exceptions are blocked when running the handler.
3060 if (tso->blocked_exceptions == NULL) {
3061 tso->blocked_exceptions = END_TSO_QUEUE;
3064 /* Put the newly-built PAP on top of the stack, ready to execute
3065 * when the thread restarts.
3069 tso->what_next = ThreadEnterGHC;
3070 IF_DEBUG(sanity, checkTSO(tso));
// No CATCH_FRAME-with-exception here: freeze the chunk into an AP_UPD, then
// dispatch on su's frame type. Judging from the frame casts and sizes used,
// the cases are update, catch, seq and stop frames; the case labels are not
// part of this listing.
3074 /* First build an AP_UPD consisting of the stack chunk above the
3075 * current update frame, with the top word on the stack as the
3078 ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3083 ap->fun = (StgClosure *)sp[0];
3085 for(i=0; i < (nat)words; ++i) {
3086 ap->payload[i] = (StgClosure *)*sp++;
3089 switch (get_itbl(su)->type) {
// Update frame: the chunk stays an AP_UPD, overwrites the updatee (unless it is
// already an indirection) and replaces the frame on the stack.
3093 SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */);
3094 TICK_ALLOC_UP_THK(words+1,0);
3097 fprintf(stderr, "scheduler: Updating ");
3098 printPtr((P_)su->updatee);
3099 fprintf(stderr, " with ");
3100 printObj((StgClosure *)ap);
3103 /* Replace the updatee with an indirection - happily
3104 * this will also wake up any threads currently
3105 * waiting on the result.
3107 * Warning: if we're in a loop, more than one update frame on
3108 * the stack may point to the same object. Be careful not to
3109 * overwrite an IND_OLDGEN in this case, because we'll screw
3110 * up the mutable lists. To be on the safe side, don't
3111 * overwrite any kind of indirection at all. See also
3112 * threadSqueezeStack in GC.c, where we have to make a similar
3115 if (!closure_IND(su->updatee)) {
3116 UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */
3119 sp += sizeofW(StgUpdateFrame) -1;
3120 sp[0] = (W_)ap; /* push onto stack */
// Catch frame reached while killing (the exception != NULL case is handled by
// the branch above): rebuild the catch as a stg_catch_info closure over
// (ap, handler) and put it on the stack in place of the frame.
3126 StgCatchFrame *cf = (StgCatchFrame *)su;
3129 /* We want a PAP, not an AP_UPD. Fortunately, the
3130 * layout's the same.
3132 SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3133 TICK_ALLOC_UPD_PAP(words+1,0);
3135 /* now build o = FUN(catch,ap,handler) */
3136 o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3137 TICK_ALLOC_FUN(2,0);
3138 SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3139 o->payload[0] = (StgClosure *)ap;
3140 o->payload[1] = cf->handler;
3143 fprintf(stderr, "scheduler: Built ");
3144 printObj((StgClosure *)o);
3147 /* pop the old handler and put o on the stack */
3149 sp += sizeofW(StgCatchFrame) - 1;
// Seq frame: rebuild it as a stg_seq_info closure over ap, in the same way.
3156 StgSeqFrame *sf = (StgSeqFrame *)su;
3159 SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3160 TICK_ALLOC_UPD_PAP(words+1,0);
3162 /* now build o = FUN(seq,ap) */
3163 o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3164 TICK_ALLOC_SE_THK(1,0);
3165 SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3166 o->payload[0] = (StgClosure *)ap;
3169 fprintf(stderr, "scheduler: Built ");
3170 printObj((StgClosure *)o);
3173 /* pop the old handler and put o on the stack */
3175 sp += sizeofW(StgSeqFrame) - 1;
// Stop frame: the whole stack has been stripped. Leave the exception on the
// stack, mark the thread ThreadKilled and point su just past the saved word.
3181 /* We've stripped the entire stack, the thread is now dead. */
3182 sp += sizeofW(StgStopFrame) - 1;
3183 sp[0] = (W_)exception; /* save the exception */
3184 tso->what_next = ThreadKilled;
3185 tso->su = (StgUpdateFrame *)(sp+1);
3196 /* -----------------------------------------------------------------------------
3197 resurrectThreads is called after garbage collection on the list of
3198 threads found to be garbage. Each of these threads will be woken
3199 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3200 on an MVar, or NonTermination if the thread was blocked on a Black
3202 -------------------------------------------------------------------------- */
// resurrectThreads: walk the list 'threads' via global_link, relink each TSO in
// front of all_threads, and raise the appropriate exception in it with
// raiseAsync. The update of all_threads itself is on a line not visible in
// this listing.
3205 resurrectThreads( StgTSO *threads )
3209 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3210 next = tso->global_link;
3211 tso->global_link = all_threads;
3213 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3215 switch (tso->why_blocked) {
// NOTE(review): a thread blocked on delivering an exception (BlockedOnException)
// is also sent BlockedOnDeadMVar here; the header comment mentions only MVars.
3217 case BlockedOnException:
3218 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3220 case BlockedOnBlackHole:
3221 raiseAsync(tso,(StgClosure *)NonTermination_closure);
3224 /* This might happen if the thread was blocked on a black hole
3225 * belonging to a thread that we've just woken up (raiseAsync
3226 * can wake up threads, remember...).
3230 barf("resurrectThreads: thread blocked in a strange way");
3235 /* -----------------------------------------------------------------------------
3236 * Blackhole detection: if we reach a deadlock, test whether any
3237 * threads are blocked on themselves. Any threads which are found to
3238 * be self-blocked get sent a NonTermination exception.
3240 * This is only done in a deadlock situation in order to avoid
3241 * performance overhead in the normal case.
3242 * -------------------------------------------------------------------------- */
// detectBlackHoles: for every thread in all_threads that is BlockedOnBlackHole,
// walk its frame chain starting at t->su. If an update frame's updatee is the
// very closure the thread is blocked on, raise NonTermination in that thread.
// The initialiser of t is immediately overwritten by the for loop.
3245 detectBlackHoles( void )
3247 StgTSO *t = all_threads;
3248 StgUpdateFrame *frame;
3249 StgClosure *blocked_on;
3251 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
// Skip over relocated TSOs; the assignment that follows the relocation link is
// not visible in this listing.
3253 while (t->what_next == ThreadRelocated) {
3255 ASSERT(get_itbl(t)->type == TSO);
// Only threads blocked on a black hole can be self-blocked.
3258 if (t->why_blocked != BlockedOnBlackHole) {
3262 blocked_on = t->block_info.closure;
3264 for (frame = t->su; ; frame = frame->link) {
3265 switch (get_itbl(frame)->type) {
3268 if (frame->updatee == blocked_on) {
3269 /* We are blocking on one of our own computations, so
3270 * send this thread the NonTermination exception.
3273 sched_belch("thread %d is blocked on itself", t->id));
3274 raiseAsync(t, (StgClosure *)NonTermination_closure);
3295 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3296 //@subsection Debugging Routines
3298 /* -----------------------------------------------------------------------------
3299 Debugging: why is a thread blocked
3300 -------------------------------------------------------------------------- */
// printThreadBlockage: write to stderr a short phrase describing tso->why_blocked.
// The phrase covers a read/write fd, a delay target, an MVar, the thread an
// exception is being delivered to, a black hole, "not blocked", or a GUM global
// address with its local FM_BQ. Unknown states barf.
3305 printThreadBlockage(StgTSO *tso)
3307 switch (tso->why_blocked) {
3309 fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3311 case BlockedOnWrite:
3312 fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3314 case BlockedOnDelay:
3315 fprintf(stderr,"is blocked until %d", tso->block_info.target);
3318 fprintf(stderr,"is blocked on an MVar");
3320 case BlockedOnException:
3321 fprintf(stderr,"is blocked on delivering an exception to thread %d",
3322 tso->block_info.tso->id);
3324 case BlockedOnBlackHole:
3325 fprintf(stderr,"is blocked on a black hole");
3328 fprintf(stderr,"is not blocked");
3332 fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3333 tso->block_info.closure, info_type(tso->block_info.closure));
3335 case BlockedOnGA_NoSend:
3336 fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3337 tso->block_info.closure, info_type(tso->block_info.closure));
// NOTE(review): the last argument below is the TSO pointer but is formatted with
// %d; %p would match the argument type.
3341 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3342 tso->why_blocked, tso->id, tso);
// printThreadStatus: print to stderr whether 'tso' has been killed or has
// completed, based on what_next. In the remaining case, presumably the default
// label (not visible here), it defers to printThreadBlockage.
3347 printThreadStatus(StgTSO *tso)
3349 switch (tso->what_next) {
3351 fprintf(stderr,"has been killed");
3353 case ThreadComplete:
3354 fprintf(stderr,"has completed");
3357 printThreadBlockage(tso);
// printAllThreads: debugging dump of every thread on all_threads. It prints a
// header line, then one stderr line per thread with its id and
// printThreadStatus output.
// Three alternative headers follow: two with a formatted time, one plain. The
// preprocessor conditionals choosing between them are not visible in this
// listing.
// NOTE(review): node_str is declared but not used on any visible line.
3362 printAllThreads(void)
3367 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3368 ullong_format_string(TIME_ON_PROC(CurrentProc),
3369 time_string, rtsFalse/*no commas!*/);
3371 sched_belch("all threads at [%s]:", time_string);
3373 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3374 ullong_format_string(CURRENT_TIME,
3375 time_string, rtsFalse/*no commas!*/);
3377 sched_belch("all threads at [%s]:", time_string);
3379 sched_belch("all threads:");
3382 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3383 fprintf(stderr, "\tthread %d ", t->id);
3384 printThreadStatus(t);
3385 fprintf(stderr,"\n");
3390 Print a whole blocking queue attached to node (debugging only).
// print_bq (GUM variant): print a header naming 'node', then delegate to
// print_bqe for the elements of its blocking_queue.
// NOTE(review): node is dereferenced (info_type, get_itbl) before the
// node != NULL sanity ASSERT, so that ASSERT cannot catch a NULL node.
3395 print_bq (StgClosure *node)
3397 StgBlockingQueueElement *bqe;
3401 fprintf(stderr,"## BQ of closure %p (%s): ",
3402 node, info_type(node));
3404 /* should cover all closures that may have a blocking queue */
3405 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3406 get_itbl(node)->type == FETCH_ME_BQ ||
3407 get_itbl(node)->type == RBH ||
3408 get_itbl(node)->type == MVAR);
3410 ASSERT(node!=(StgClosure*)NULL); // sanity check
3412 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3416 Print a whole blocking queue starting with the element bqe.
// print_bqe: print each element from 'bqe' onward to stderr. TSOs, blocked
// fetches and RBH_Save closures each get their own format; any other type
// barfs. Iteration stops after printing a CONSTR element (an RBH_Save) or the
// element whose link is END_BQ_QUEUE. A newline follows at the end.
3419 print_bqe (StgBlockingQueueElement *bqe)
3424 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3426 for (end = (bqe==END_BQ_QUEUE);
3427 !end; // iterate until bqe points to a CONSTR
3428 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
3429 bqe = end ? END_BQ_QUEUE : bqe->link) {
3430 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3431 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3432 /* types of closures that may appear in a blocking queue */
3433 ASSERT(get_itbl(bqe)->type == TSO ||
3434 get_itbl(bqe)->type == BLOCKED_FETCH ||
3435 get_itbl(bqe)->type == CONSTR);
3436 /* only BQs of an RBH end with an RBH_Save closure */
3437 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
// NOTE(review): the TSO line below prints the TSO pointer with %x, and the
// blocked-fetch line prints the node pointer with %p but other fields with %x.
// Pointer arguments should use %p.
3439 switch (get_itbl(bqe)->type) {
3441 fprintf(stderr," TSO %u (%x),",
3442 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3445 fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3446 ((StgBlockedFetch *)bqe)->node,
3447 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3448 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3449 ((StgBlockedFetch *)bqe)->ga.weight);
3452 fprintf(stderr," %s (IP %p),",
3453 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3454 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3455 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3456 "RBH_Save_?"), get_itbl(bqe));
3459 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3460 info_type((StgClosure *)bqe)); // , node, info_type(node));
3464 fputc('\n', stderr);
3466 # elif defined(GRAN)
// print_bq (GranSim variant): print the blocking queue of 'node' with the PE
// of the node and of each TSO (via where_is). Iteration stops after a CONSTR
// (RBH_Save) element or the last element; unknown element types barf.
// NOTE(review): node is dereferenced by get_itbl in the first ASSERT before the
// node != NULL sanity ASSERT runs, so that check cannot catch a NULL node.
3468 print_bq (StgClosure *node)
3470 StgBlockingQueueElement *bqe;
3471 PEs node_loc, tso_loc;
3474 /* should cover all closures that may have a blocking queue */
3475 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3476 get_itbl(node)->type == FETCH_ME_BQ ||
3477 get_itbl(node)->type == RBH);
3479 ASSERT(node!=(StgClosure*)NULL); // sanity check
3480 node_loc = where_is(node);
3482 fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3483 node, info_type(node), node_loc);
3486 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3488 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3489 !end; // iterate until bqe points to a CONSTR
3490 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3491 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3492 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3493 /* types of closures that may appear in a blocking queue */
3494 ASSERT(get_itbl(bqe)->type == TSO ||
3495 get_itbl(bqe)->type == CONSTR);
3496 /* only BQs of an RBH end with an RBH_Save closure */
3497 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3499 tso_loc = where_is((StgClosure *)bqe);
3500 switch (get_itbl(bqe)->type) {
3502 fprintf(stderr," TSO %d (%p) on [PE %d],",
3503 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3506 fprintf(stderr," %s (IP %p),",
3507 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3508 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3509 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3510 "RBH_Save_?"), get_itbl(bqe));
3513 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3514 info_type((StgClosure *)bqe), node, info_type(node));
3518 fputc('\n', stderr);
3522 Nice and easy: only TSOs on the blocking queue
// print_bq (non-parallel variant): print the id and address of every TSO on
// node's blocking_queue up to END_TSO_QUEUE, then a newline. The loop's
// advance expression is on a line not visible in this listing.
3525 print_bq (StgClosure *node)
3529 ASSERT(node!=(StgClosure*)NULL); // sanity check
3530 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3531 tso != END_TSO_QUEUE;
3533 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3534 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3535 fprintf(stderr," TSO %d (%p),", tso->id, tso);
3537 fputc('\n', stderr);
// Fragment of a helper whose header is not visible in this listing. It walks
// the run queue from run_queue_hd with a counter i until END_TSO_QUEUE.
3548 for (i=0, tso=run_queue_hd;
3549 tso != END_TSO_QUEUE;
// sched_belch: printf-style scheduler debug logger. It writes one of three
// prefixes to stderr, then the message formatted with vfprintf from the
// variadic arguments, then a newline. The prefixes are "scheduler (task N): ",
// "== " and "scheduler: ", chosen by preprocessor conditionals not visible
// here.
// NOTE(review): pthread_self() returns an opaque pthread_t; printing it with
// %ld assumes it is a long-sized integer on the target platform.
3558 sched_belch(char *s, ...)
3563 fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3565 fprintf(stderr, "== ");
3567 fprintf(stderr, "scheduler: ");
3569 vfprintf(stderr, s, ap);
3570 fprintf(stderr, "\n");
3576 //@node Index, , Debugging Routines, Main scheduling code
3580 //* MainRegTable:: @cindex\s-+MainRegTable
3581 //* StgMainThread:: @cindex\s-+StgMainThread
3582 //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue
3583 //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd
3584 //* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
3585 //* context_switch:: @cindex\s-+context_switch
3586 //* createThread:: @cindex\s-+createThread
3587 //* free_capabilities:: @cindex\s-+free_capabilities
3588 //* gc_pending_cond:: @cindex\s-+gc_pending_cond
3589 //* initScheduler:: @cindex\s-+initScheduler
3590 //* interrupted:: @cindex\s-+interrupted
3591 //* n_free_capabilities:: @cindex\s-+n_free_capabilities
3592 //* next_thread_id:: @cindex\s-+next_thread_id
3593 //* print_bq:: @cindex\s-+print_bq
3594 //* run_queue_hd:: @cindex\s-+run_queue_hd
3595 //* run_queue_tl:: @cindex\s-+run_queue_tl
3596 //* sched_mutex:: @cindex\s-+sched_mutex
3597 //* schedule:: @cindex\s-+schedule
3598 //* take_off_run_queue:: @cindex\s-+take_off_run_queue
3599 //* task_ids:: @cindex\s-+task_ids
3600 //* term_mutex:: @cindex\s-+term_mutex
3601 //* thread_ready_cond:: @cindex\s-+thread_ready_cond