1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.79 2000/10/10 09:12:19 simonmar Exp $
4 * (c) The GHC Team, 1998-2000
8 * The main scheduling code in GranSim is quite different from that in std
9 * (concurrent) Haskell: while concurrent Haskell just iterates over the
10 * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11 * over the events in the global event queue. -- HWL
12 * --------------------------------------------------------------------------*/
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
17 /* Version with scheduler monitor support for SMPs.
19 This design provides a high-level API to create and schedule threads etc.
20 as documented in the SMP design document.
22 It uses a monitor design controlled by a single mutex to exercise control
23 over accesses to shared data structures, and builds on the Posix threads
26 The majority of state is shared. In order to keep essential per-task state,
27 there is a Capability structure, which contains all the information
28 needed to run a thread: its STG registers, a pointer to its TSO, a
29 nursery etc. During STG execution, a pointer to the capability is
30 kept in a register (BaseReg).
32 In a non-SMP build, there is one global capability, namely MainRegTable.
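
   As a sketch only (the real definition lives in the RTS headers; apart
   from rCurrentTSO and link, which this file uses directly, the layout
   below is an illustrative assumption, not the actual declaration):

       typedef struct Capability_ {
           StgRegTable r;              // the STG registers; BaseReg points here
           StgTSO *rCurrentTSO;        // the thread currently being run
           struct Capability_ *link;   // free-list link; protected by sched_mutex
       } Capability;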
39 //* Variables and Data structures::
40 //* Main scheduling loop::
41 //* Suspend and Resume::
43 //* Garbage Collection Routines::
44 //* Blocking Queue Routines::
45 //* Exception Handling Routines::
46 //* Debugging Routines::
50 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
51 //@subsection Includes
59 #include "StgStartup.h"
63 #include "StgMiscClosures.h"
65 #include "Evaluator.h"
66 #include "Exception.h"
74 #if defined(GRAN) || defined(PAR)
75 # include "GranSimRts.h"
77 # include "ParallelRts.h"
78 # include "Parallel.h"
79 # include "ParallelDebug.h"
87 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
88 //@subsection Variables and Data structures
92 * These are the threads which clients have requested that we run.
94 * In an SMP build, we might have several concurrent clients all
95 * waiting for results, and each one will wait on a condition variable
96 * until the result is available.
98 * In non-SMP, clients are strictly nested: the first client calls
99 * into the RTS, which might call out again to C with a _ccall_GC, and
100 * eventually re-enter the RTS.
102 * Information about main threads is kept in a linked list:
104 //@cindex StgMainThread
105 typedef struct StgMainThread_ {
107 SchedulerStatus stat;
110 pthread_cond_t wakeup;
112 struct StgMainThread_ *link;
115 /* Main thread queue.
116 * Locks required: sched_mutex.
118 static StgMainThread *main_threads;
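
/* A client that has called into the RTS sleeps on its own record until
 * schedule() fills in the result; in the SMP build the wait is the classic
 * condition-variable loop (this is exactly the pattern used in waitThread()
 * below):
 *
 *     do {
 *         pthread_cond_wait(&m->wakeup, &sched_mutex);
 *     } while (m->stat == NoStatus);
 */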
121 * Locks required: sched_mutex.
125 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
126 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
129 In GranSim we have a runnable and a blocked queue for each processor.
130 To minimise code changes, new arrays run_queue_hds/tls
131 were introduced. run_queue_hd is then a shortcut (macro) for
132 run_queue_hds[CurrentProc] (see GranSim.h).
135 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
136 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
137 StgTSO *ccalling_threadss[MAX_PROC];
138 /* We use the same global list of threads (all_threads) in GranSim as in
139 the std RTS (i.e. we are cheating). However, we don't use this list in
140 the GranSim specific code at the moment (so we are only potentially
145 StgTSO *run_queue_hd, *run_queue_tl;
146 StgTSO *blocked_queue_hd, *blocked_queue_tl;
147 StgTSO *sleeping_queue; /* perhaps replace with a hash table? */
151 /* Linked list of all threads.
152 * Used for detecting garbage collected threads.
156 /* Threads suspended in _ccall_GC.
158 static StgTSO *suspended_ccalling_threads;
160 static void GetRoots(void);
161 static StgTSO *threadStackOverflow(StgTSO *tso);
163 /* KH: The following two flags are shared memory locations. There is no need
164 to lock them, since they are only unset at the end of a scheduler
168 /* flag set by signal handler to precipitate a context switch */
169 //@cindex context_switch
172 /* if this flag is set as well, give up execution */
173 //@cindex interrupted
176 /* Next thread ID to allocate.
177 * Locks required: sched_mutex
179 //@cindex next_thread_id
180 StgThreadID next_thread_id = 1;
183 * Pointers to the state of the current thread.
184 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
185 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
188 /* The smallest stack size that makes any sense is:
189 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
190 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
191 * + 1 (the realworld token for an IO thread)
192 * + 1 (the closure to enter)
194 * A thread with this stack will bomb immediately with a stack
195 * overflow, which will increase its stack size.
198 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
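/* The "+ 2" folds together the realworld token and the closure to enter
 * from the itemisation in the preceding comment. */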
200 /* Free capability list.
201 * Locks required: sched_mutex.
204 //@cindex free_capabilities
205 //@cindex n_free_capabilities
206 Capability *free_capabilities; /* Available capabilities for running threads */
207 nat n_free_capabilities; /* total number of available capabilities */
209 //@cindex MainRegTable
210 Capability MainRegTable; /* for non-SMP, we have one global capability */
219 /* All our current task ids, saved in case we need to kill them later.
226 void addToBlockedQueue ( StgTSO *tso );
228 static void schedule ( void );
229 void interruptStgRts ( void );
231 static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri );
233 static StgTSO * createThread_ ( nat size, rtsBool have_lock );
236 static void detectBlackHoles ( void );
239 static void sched_belch(char *s, ...);
243 //@cindex sched_mutex
245 //@cindex thread_ready_cond
246 //@cindex gc_pending_cond
247 pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;
248 pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER;
249 pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER;
250 pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER;
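
/* Sketch of the monitor discipline these implement (it is the pattern used
 * throughout this file, e.g. when waiting for work in schedule()):
 *
 *     ACQUIRE_LOCK(&sched_mutex);
 *     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
 *         pthread_cond_wait(&thread_ready_cond, &sched_mutex);
 *     }
 *     ... take a capability and a thread ...
 *     RELEASE_LOCK(&sched_mutex);
 */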
257 rtsTime TimeOfLastYield;
261 char *whatNext_strs[] = {
269 char *threadReturnCode_strs[] = {
270 "HeapOverflow", /* might also be StackOverflow */
279 * The thread state for the main thread.
280 // ToDo: check whether this is still needed
284 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
285 //@subsection Main scheduling loop
287 /* ---------------------------------------------------------------------------
288 Main scheduling loop.
290 We use round-robin scheduling, each thread returning to the
291 scheduler loop when one of these conditions is detected:
294 * timer expires (thread yields)
299 Locking notes: we acquire the scheduler lock once at the beginning
300 of the scheduler loop, and release it when:
302 * running a thread, or
303 * waiting for work, or
304 * waiting for a GC to complete.
307 In a GranSim setup this loop revolves around the global event
308 queue, which determines what to do next. It is therefore more
309 complicated than either the concurrent or the parallel (GUM)
310 setup.
313 GUM iterates over incoming messages.
314 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
315 and sends out a fish whenever it has nothing to do; in-between
316 doing the actual reductions (shared code below) it processes the
317 incoming messages and deals with delayed operations
318 (see PendingFetches).
319 This is not the ugliest code you could imagine, but it's bloody close.
321 ------------------------------------------------------------------------ */
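
/* For orientation, one iteration of the standard (non-GRAN, non-PAR) loop
   below, in skeleton form (a simplified sketch of the code that follows,
   not a separate code path):

       ACQUIRE_LOCK(&sched_mutex);          // held on entry to the loop
       for (;;) {
           // wake up finished main threads, run signal handlers,
           // detect deadlock, wait for work ...
           t = POP_RUN_QUEUE();
           RELEASE_LOCK(&sched_mutex);
           ret = StgRun(...);               // run t until it stops
           ACQUIRE_LOCK(&sched_mutex);
           switch (ret) {
               // HeapOverflow, StackOverflow, ThreadYielding,
               // ThreadBlocked, ThreadFinished
           }
       }
*/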
328 StgThreadReturnCode ret;
337 rtsBool was_interrupted = rtsFalse;
339 ACQUIRE_LOCK(&sched_mutex);
343 /* set up first event to get things going */
344 /* ToDo: assign costs for system setup and init MainTSO ! */
345 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
347 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
350 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
351 G_TSO(CurrentTSO, 5));
353 if (RtsFlags.GranFlags.Light) {
354 /* Save current time; GranSim Light only */
355 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
358 event = get_next_event();
360 while (event!=(rtsEvent*)NULL) {
361 /* Choose the processor with the next event */
362 CurrentProc = event->proc;
363 CurrentTSO = event->tso;
367 while (!GlobalStopPending) { /* GlobalStopPending set in par_exit */
375 IF_DEBUG(scheduler, printAllThreads());
377 /* If we're interrupted (the user pressed ^C, or some other
378 * termination condition occurred), kill all the currently running
382 IF_DEBUG(scheduler, sched_belch("interrupted"));
384 interrupted = rtsFalse;
385 was_interrupted = rtsTrue;
388 /* Go through the list of main threads and wake up any
389 * clients whose computations have finished. ToDo: this
390 * should be done more efficiently without a linear scan
391 * of the main threads list, somehow...
395 StgMainThread *m, **prev;
396 prev = &main_threads;
397 for (m = main_threads; m != NULL; m = m->link) {
398 switch (m->tso->what_next) {
401 *(m->ret) = (StgClosure *)m->tso->sp[0];
405 pthread_cond_broadcast(&m->wakeup);
409 if (was_interrupted) {
410 m->stat = Interrupted;
414 pthread_cond_broadcast(&m->wakeup);
424 /* in GUM do this only on the Main PE */
427 /* If our main thread has finished or been killed, return.
430 StgMainThread *m = main_threads;
431 if (m->tso->what_next == ThreadComplete
432 || m->tso->what_next == ThreadKilled) {
433 main_threads = main_threads->link;
434 if (m->tso->what_next == ThreadComplete) {
435 /* we finished successfully, fill in the return value */
436 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
440 if (was_interrupted) {
441 m->stat = Interrupted;
451 /* Top up the run queue from our spark pool. We try to make the
452 * number of threads in the run queue equal to the number of
457 nat n = n_free_capabilities;
458 StgTSO *tso = run_queue_hd;
460 /* Count the run queue */
461 while (n > 0 && tso != END_TSO_QUEUE) {
470 break; /* no more sparks in the pool */
472 /* I'd prefer this to be done in activateSpark -- HWL */
473 /* tricky - it needs to hold the scheduler lock and
474 * not try to re-acquire it -- SDM */
476 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
477 pushClosure(tso,spark);
478 PUSH_ON_RUN_QUEUE(tso);
480 advisory_thread_count++;
484 sched_belch("turning spark of closure %p into a thread",
485 (StgClosure *)spark));
488 /* We need to wake up the other tasks if we just created some
491 if (n_free_capabilities - n > 1) {
492 pthread_cond_signal(&thread_ready_cond);
497 /* Check whether any waiting threads need to be woken up. If the
498 * run queue is empty, and there are no other tasks running, we
499 * can wait indefinitely for something to happen.
500 * ToDo: what if another client comes along & requests another
503 if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
505 (run_queue_hd == END_TSO_QUEUE)
507 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
511 /* we can be interrupted while waiting for I/O... */
512 if (interrupted) continue;
514 /* check for signals each time around the scheduler */
515 #ifndef mingw32_TARGET_OS
516 if (signals_pending()) {
517 start_signal_handlers();
522 * Detect deadlock: when we have no threads to run, there are no
523 * threads waiting on I/O or sleeping, and all the other tasks are
524 * waiting for work, we must have a deadlock of some description.
526 * We first try to find threads blocked on themselves (ie. black
527 * holes), and generate NonTermination exceptions where necessary.
529 * If no threads are black holed, we have a deadlock situation, so
530 * inform all the main threads.
533 if (blocked_queue_hd == END_TSO_QUEUE
534 && run_queue_hd == END_TSO_QUEUE
535 && sleeping_queue == END_TSO_QUEUE
536 && (n_free_capabilities == RtsFlags.ParFlags.nNodes))
538 IF_DEBUG(scheduler, sched_belch("deadlocked, checking for black holes..."));
540 if (run_queue_hd == END_TSO_QUEUE) {
542 for (m = main_threads; m != NULL; m = m->link) {
545 pthread_cond_broadcast(&m->wakeup);
551 if (blocked_queue_hd == END_TSO_QUEUE
552 && run_queue_hd == END_TSO_QUEUE
553 && sleeping_queue == END_TSO_QUEUE)
555 IF_DEBUG(scheduler, sched_belch("deadlocked, checking for black holes..."));
557 if (run_queue_hd == END_TSO_QUEUE) {
558 StgMainThread *m = main_threads;
561 main_threads = m->link;
568 /* If there's a GC pending, don't do anything until it has
572 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
573 pthread_cond_wait(&gc_pending_cond, &sched_mutex);
576 /* block until we've got a thread on the run queue and a free
579 while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
580 IF_DEBUG(scheduler, sched_belch("waiting for work"));
581 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
582 IF_DEBUG(scheduler, sched_belch("work now available"));
588 if (RtsFlags.GranFlags.Light)
589 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
591 /* adjust time based on time-stamp */
592 if (event->time > CurrentTime[CurrentProc] &&
593 event->evttype != ContinueThread)
594 CurrentTime[CurrentProc] = event->time;
596 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
597 if (!RtsFlags.GranFlags.Light)
600 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"))
602 /* main event dispatcher in GranSim */
603 switch (event->evttype) {
604 /* Should just be continuing execution */
606 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
607 /* ToDo: check assertion
608 ASSERT(run_queue_hd != (StgTSO*)NULL &&
609 run_queue_hd != END_TSO_QUEUE);
611 /* Ignore ContinueThreads for fetching threads (if synchronous communication) */
612 if (!RtsFlags.GranFlags.DoAsyncFetch &&
613 procStatus[CurrentProc]==Fetching) {
614 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
615 CurrentTSO->id, CurrentTSO, CurrentProc);
618 /* Ignore ContinueThreads for completed threads */
619 if (CurrentTSO->what_next == ThreadComplete) {
620 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
621 CurrentTSO->id, CurrentTSO, CurrentProc);
624 /* Ignore ContinueThreads for threads that are being migrated */
625 if (PROCS(CurrentTSO)==Nowhere) {
626 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
627 CurrentTSO->id, CurrentTSO, CurrentProc);
630 /* The thread should be at the beginning of the run queue */
631 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
632 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
633 CurrentTSO->id, CurrentTSO, CurrentProc);
634 break; // run the thread anyway
637 new_event(proc, proc, CurrentTime[proc],
639 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
641 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
642 break; // now actually run the thread; DaH Qu'vam yImuHbej
645 do_the_fetchnode(event);
646 goto next_thread; /* handle next event in event queue */
649 do_the_globalblock(event);
650 goto next_thread; /* handle next event in event queue */
653 do_the_fetchreply(event);
654 goto next_thread; /* handle next event in event queue */
656 case UnblockThread: /* Move from the blocked queue to the tail of */
657 do_the_unblock(event);
658 goto next_thread; /* handle next event in event queue */
660 case ResumeThread: /* Move from the blocked queue to the tail of */
661 /* the runnable queue ( i.e. Qu' SImqa'lu') */
662 event->tso->gran.blocktime +=
663 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
664 do_the_startthread(event);
665 goto next_thread; /* handle next event in event queue */
668 do_the_startthread(event);
669 goto next_thread; /* handle next event in event queue */
672 do_the_movethread(event);
673 goto next_thread; /* handle next event in event queue */
676 do_the_movespark(event);
677 goto next_thread; /* handle next event in event queue */
680 do_the_findwork(event);
681 goto next_thread; /* handle next event in event queue */
684 barf("Illegal event type %u\n", event->evttype);
687 /* This point was scheduler_loop in the old RTS */
689 IF_DEBUG(gran, belch("GRAN: after main switch"));
691 TimeOfLastEvent = CurrentTime[CurrentProc];
692 TimeOfNextEvent = get_time_of_next_event();
693 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
694 // CurrentTSO = ThreadQueueHd;
696 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
699 if (RtsFlags.GranFlags.Light)
700 GranSimLight_leave_system(event, &ActiveTSO);
702 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
705 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
707 /* in a GranSim setup the TSO stays on the run queue */
709 /* Take a thread from the run queue. */
710 t = POP_RUN_QUEUE(); // take_off_run_queue(t);
713 fprintf(stderr, "GRAN: About to run current thread, which is\n");
716 context_switch = 0; // turned on via GranYield, checking events and time slice
719 DumpGranEvent(GR_SCHEDULE, t));
721 procStatus[CurrentProc] = Busy;
725 if (PendingFetches != END_BF_QUEUE) {
729 /* ToDo: perhaps merge with spark activation above */
730 /* check whether we have local work and send requests if we have none */
731 if (run_queue_hd == END_TSO_QUEUE) { /* no runnable threads */
732 /* :-[ no local threads => look out for local sparks */
733 /* the spark pool for the current PE */
734 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
735 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
736 pool->hd < pool->tl) {
738 * ToDo: add GC code check that we really have enough heap afterwards!!
740 * If we're here (no runnable threads) and we have pending
741 * sparks, we must have a space problem. Get enough space
742 * to turn one of those pending sparks into a
746 spark = findSpark(); /* get a spark */
747 if (spark != (rtsSpark) NULL) {
748 tso = activateSpark(spark); /* turn the spark into a thread */
749 IF_PAR_DEBUG(schedule,
750 belch("==== schedule: Created TSO %d (%p); %d threads active",
751 tso->id, tso, advisory_thread_count));
753 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
754 belch("==^^ failed to activate spark");
756 } /* otherwise fall through & pick-up new tso */
758 IF_PAR_DEBUG(verbose,
759 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
760 spark_queue_len(pool)));
764 /* =8-[ no local sparks => look for work on other PEs */
767 * We really have absolutely no work. Send out a fish
768 * (there may be some out there already), and wait for
769 * something to arrive. We clearly can't run any threads
770 * until a SCHEDULE or RESUME arrives, and so that's what
771 * we're hoping to see. (Of course, we still have to
772 * respond to other types of messages.)
775 outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
776 // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
777 /* fishing set in sendFish, processFish;
778 avoid flooding system with fishes via delay */
780 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
788 } else if (PacketsWaiting()) { /* Look for incoming messages */
792 /* Now we are sure that we have some work available */
793 ASSERT(run_queue_hd != END_TSO_QUEUE);
794 /* Take a thread from the run queue, if we have work */
795 t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
797 /* ToDo: write something to the log-file
798 if (RTSflags.ParFlags.granSimStats && !sameThread)
799 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
803 /* the spark pool for the current PE */
804 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
806 IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; base=%x, lim=%x)",
807 spark_queue_len(pool),
809 pool->hd, pool->tl, pool->base, pool->lim));
811 IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)",
812 run_queue_len(), CURRENT_PROC,
813 run_queue_hd, run_queue_tl));
818 we are running a different TSO, so write a schedule event to log file
819 NB: If we use fair scheduling we also have to write a deschedule
820 event for LastTSO; with unfair scheduling we know that the
821 previous tso has blocked whenever we switch to another tso, so
822 we don't need it in GUM for now
824 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
825 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
829 #else /* !GRAN && !PAR */
831 /* grab a thread from the run queue
833 ASSERT(run_queue_hd != END_TSO_QUEUE);
835 IF_DEBUG(sanity,checkTSO(t));
842 cap = free_capabilities;
843 free_capabilities = cap->link;
844 n_free_capabilities--;
849 cap->rCurrentTSO = t;
851 /* context switches are now initiated by the timer signal, unless
852 * the user specified "context switch as often as possible", with
855 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
856 && (run_queue_hd != END_TSO_QUEUE
857 || blocked_queue_hd != END_TSO_QUEUE
858 || sleeping_queue != END_TSO_QUEUE))
863 RELEASE_LOCK(&sched_mutex);
865 IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...",
866 t->id, t, whatNext_strs[t->what_next]));
868 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
869 /* Run the current thread
871 switch (cap->rCurrentTSO->what_next) {
874 /* Thread already finished, return to scheduler. */
875 ret = ThreadFinished;
878 ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
881 ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
883 case ThreadEnterHugs:
887 IF_DEBUG(scheduler,sched_belch("entering Hugs"));
888 c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
889 cap->rCurrentTSO->sp += 1;
894 barf("Panic: entered a BCO but no bytecode interpreter in this build");
897 barf("schedule: invalid what_next field");
899 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
901 /* Costs for the scheduler are assigned to CCS_SYSTEM */
906 ACQUIRE_LOCK(&sched_mutex);
909 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
910 #elif !defined(GRAN) && !defined(PAR)
911 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
913 t = cap->rCurrentTSO;
916 /* HACK 675: if the last thread didn't yield, make sure to print a
917 SCHEDULE event to the log file when StgRunning the next thread, even
918 if it is the same one as before */
919 LastTSO = t; //(ret == ThreadBlocked) ? END_TSO_QUEUE : t;
920 TimeOfLastYield = CURRENT_TIME;
925 /* make all the running tasks block on a condition variable,
926 * maybe set context_switch and wait till they all pile in,
927 * then have them wait on a GC condition variable.
929 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow",
930 t->id, t, whatNext_strs[t->what_next]));
933 ASSERT(!is_on_queue(t,CurrentProc));
936 ready_to_gc = rtsTrue;
937 context_switch = 1; /* stop other threads ASAP */
938 PUSH_ON_RUN_QUEUE(t);
939 /* actual GC is done at the end of the while loop */
943 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow",
944 t->id, t, whatNext_strs[t->what_next]));
945 /* just adjust the stack for this thread, then pop it back
951 /* enlarge the stack */
952 StgTSO *new_t = threadStackOverflow(t);
954 /* This TSO has moved, so update any pointers to it from the
955 * main thread stack. It better not be on any other queues...
958 for (m = main_threads; m != NULL; m = m->link) {
964 PUSH_ON_RUN_QUEUE(new_t);
971 DumpGranEvent(GR_DESCHEDULE, t));
972 globalGranStats.tot_yields++;
975 DumpGranEvent(GR_DESCHEDULE, t));
977 /* put the thread back on the run queue. Then, if we're ready to
978 * GC, check whether this is the last task to stop. If so, wake
979 * up the GC thread. getThread will block during a GC until the
983 if (t->what_next == ThreadEnterHugs) {
984 /* ToDo: or maybe a timer expired when we were in Hugs?
985 * or maybe someone hit ctrl-C
987 belch("--<< thread %ld (%p; %s) stopped to switch to Hugs",
988 t->id, t, whatNext_strs[t->what_next]);
990 belch("--<< thread %ld (%p; %s) stopped, yielding",
991 t->id, t, whatNext_strs[t->what_next]);
998 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1000 ASSERT(t->link == END_TSO_QUEUE);
1002 ASSERT(!is_on_queue(t,CurrentProc));
1005 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1006 checkThreadQsSanity(rtsTrue));
1008 APPEND_TO_RUN_QUEUE(t);
1010 /* add a ContinueThread event to actually process the thread */
1011 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1013 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1015 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1024 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1025 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1026 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1028 // ??? needed; should emit block before
1030 DumpGranEvent(GR_DESCHEDULE, t));
1031 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1034 ASSERT(procStatus[CurrentProc]==Busy ||
1035 ((procStatus[CurrentProc]==Fetching) &&
1036 (t->block_info.closure!=(StgClosure*)NULL)));
1037 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1038 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1039 procStatus[CurrentProc]==Fetching))
1040 procStatus[CurrentProc] = Idle;
1044 DumpGranEvent(GR_DESCHEDULE, t));
1046 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1050 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1051 t->id, t, whatNext_strs[t->what_next], t->block_info.closure);
1052 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1055 /* don't need to do anything. Either the thread is blocked on
1056 * I/O, in which case we'll have called addToBlockedQueue
1057 * previously, or it's blocked on an MVar or Blackhole, in which
1058 * case it'll be on the relevant queue already.
1061 fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1062 printThreadBlockage(t);
1063 fprintf(stderr, "\n"));
1065 /* Only for dumping event to log file
1066 ToDo: do I need this in GranSim, too?
1073 case ThreadFinished:
1074 /* Need to check whether this was a main thread, and if so, signal
1075 * the task that started it with the return value. If we have no
1076 * more main threads, we probably need to stop all the tasks until
1079 /* We also end up here if the thread kills itself with an
1080 * uncaught exception, see Exception.hc.
1082 IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1084 endThread(t, CurrentProc); // clean-up the thread
1086 advisory_thread_count--;
1087 if (RtsFlags.ParFlags.ParStats.Full)
1088 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1093 barf("schedule: invalid thread return code %d", (int)ret);
1097 cap->link = free_capabilities;
1098 free_capabilities = cap;
1099 n_free_capabilities++;
1103 if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes)
1108 /* everybody back, start the GC.
1109 * Could do it in this thread, or signal a condition var
1110 * to do it in another thread. Either way, we need to
1111 * broadcast on gc_pending_cond afterward.
1114 IF_DEBUG(scheduler,sched_belch("doing GC"));
1116 GarbageCollect(GetRoots,rtsFalse);
1117 ready_to_gc = rtsFalse;
1119 pthread_cond_broadcast(&gc_pending_cond);
1122 /* add a ContinueThread event to continue execution of current thread */
1123 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1125 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1127 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1134 IF_GRAN_DEBUG(unused,
1135 print_eventq(EventHd));
1137 event = get_next_event();
1141 /* ToDo: wait for next message to arrive rather than busy wait */
1146 t = take_off_run_queue(END_TSO_QUEUE);
1149 } /* end of while(1) */
1152 /* ---------------------------------------------------------------------------
1153 * deleteAllThreads(): kill all the live threads.
1155 * This is used when we catch a user interrupt (^C), before performing
1156 * any necessary cleanups and running finalizers.
1157 * ------------------------------------------------------------------------- */
1159 void deleteAllThreads ( void )
1162 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1163 for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1166 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1169 for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1172 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1173 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1174 sleeping_queue = END_TSO_QUEUE;
1177 /* startThread and insertThread are now in GranSim.c -- HWL */
1179 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1180 //@subsection Suspend and Resume
1182 /* ---------------------------------------------------------------------------
1183 * Suspending & resuming Haskell threads.
1185 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1186 * its capability before calling the C function. This allows another
1187 * task to pick up the capability and carry on running Haskell
1188 * threads. It also means that if the C call blocks, it won't lock
1191 * The Haskell thread making the C call is put to sleep for the
1192 * duration of the call, on the suspended_ccalling_threads queue. We
1193 * give out a token to the task, which it can use to resume the thread
1194 * on return from the C function.
1195 * ------------------------------------------------------------------------- */
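
/* Illustrative use from a foreign-call stub (my_blocking_call and the
 * surrounding glue are made up; only suspendThread/resumeThread are real):
 *
 *     StgInt tok = suspendThread(cap);   // give up the capability
 *     r = my_blocking_call(...);         // may block; RTS keeps running
 *     cap = resumeThread(tok);           // get a capability back, same TSO
 */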
1198 suspendThread( Capability *cap )
1202 ACQUIRE_LOCK(&sched_mutex);
1205 sched_belch("thread %d did a _ccall_gc", cap->rCurrentTSO->id));
1207 threadPaused(cap->rCurrentTSO);
1208 cap->rCurrentTSO->link = suspended_ccalling_threads;
1209 suspended_ccalling_threads = cap->rCurrentTSO;
1211 /* Use the thread ID as the token; it should be unique */
1212 tok = cap->rCurrentTSO->id;
1215 cap->link = free_capabilities;
1216 free_capabilities = cap;
1217 n_free_capabilities++;
1220 RELEASE_LOCK(&sched_mutex);
1225 resumeThread( StgInt tok )
1227 StgTSO *tso, **prev;
1230 ACQUIRE_LOCK(&sched_mutex);
1232 prev = &suspended_ccalling_threads;
1233 for (tso = suspended_ccalling_threads;
1234 tso != END_TSO_QUEUE;
1235 prev = &tso->link, tso = tso->link) {
1236 if (tso->id == (StgThreadID)tok) {
1241 if (tso == END_TSO_QUEUE) {
1242 barf("resumeThread: thread not found");
1246 while (free_capabilities == NULL) {
1247 IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1248 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1249 IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1251 cap = free_capabilities;
1252 free_capabilities = cap->link;
1253 n_free_capabilities--;
1255 cap = &MainRegTable;
1258 cap->rCurrentTSO = tso;
1260 RELEASE_LOCK(&sched_mutex);
1265 /* ---------------------------------------------------------------------------
1267 * ------------------------------------------------------------------------ */
1268 static void unblockThread(StgTSO *tso);
1270 /* ---------------------------------------------------------------------------
1271 * Comparing Thread ids.
1273 * This is used from STG land in the implementation of the
1274 * instances of Eq/Ord for ThreadIds.
1275 * ------------------------------------------------------------------------ */
1277 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
1279 StgThreadID id1 = tso1->id;
1280 StgThreadID id2 = tso2->id;
1282 if (id1 < id2) return (-1);
1283 if (id1 > id2) return 1;
1287 /* ---------------------------------------------------------------------------
1288 Create a new thread.
1290 The new thread starts with the given stack size. Before the
1291 scheduler can run, however, this thread needs to have a closure
1292 (and possibly some arguments) pushed on its stack. See
1293 pushClosure() in Schedule.h.
1295 createGenThread() and createIOThread() (in SchedAPI.h) are
1296 convenient packaged versions of this function.
1298 currently pri (priority) is only used in a GRAN setup -- HWL
1299 ------------------------------------------------------------------------ */
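
/* For illustration, the unpackaged sequence that createGenThread() and
 * createIOThread() wrap up (a sketch; the GRAN build takes an extra
 * priority argument, and `closure' is a placeholder):
 *
 *     StgTSO *tso = createThread(RtsFlags.GcFlags.initialStkSize);
 *     pushClosure(tso, closure);         // see Schedule.h
 *     scheduleThread(tso);               // put it on the run queue
 */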
1300 //@cindex createThread
1302 /* currently pri (priority) is only used in a GRAN setup -- HWL */
1304 createThread(nat stack_size, StgInt pri)
1306 return createThread_(stack_size, rtsFalse, pri);
1310 createThread_(nat size, rtsBool have_lock, StgInt pri)
1314 createThread(nat stack_size)
1316 return createThread_(stack_size, rtsFalse);
1320 createThread_(nat size, rtsBool have_lock)
1327 /* First check whether we should create a thread at all */
1329 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1330 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1332 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1333 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1334 return END_TSO_QUEUE;
1340 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1343 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1345 /* catch ridiculously small stack sizes */
1346 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1347 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1350 stack_size = size - TSO_STRUCT_SIZEW;
1352 tso = (StgTSO *)allocate(size);
1353 TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1355 SET_HDR(tso, &TSO_info, CCS_SYSTEM);
1357 SET_GRAN_HDR(tso, ThisPE);
1359 tso->what_next = ThreadEnterGHC;
1361 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1362 * protect the increment operation on next_thread_id.
1363 * In future, we could use an atomic increment instead.
1365 if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1366 tso->id = next_thread_id++;
1367 if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1369 tso->why_blocked = NotBlocked;
1370 tso->blocked_exceptions = NULL;
1372 tso->stack_size = stack_size;
1373 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
1375 tso->sp = (P_)&(tso->stack) + stack_size;
1378 tso->prof.CCCS = CCS_MAIN;
1381 /* put a stop frame on the stack */
1382 tso->sp -= sizeofW(StgStopFrame);
1383 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1384 tso->su = (StgUpdateFrame*)tso->sp;
1388 tso->link = END_TSO_QUEUE;
1389 /* uses more flexible routine in GranSim */
1390 insertThread(tso, CurrentProc);
1392 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1397 #if defined(GRAN) || defined(PAR)
1398 DumpGranEvent(GR_START,tso);
1401 /* Link the new thread on the global thread list.
1403 tso->global_link = all_threads;
1407 tso->gran.pri = pri;
1409 tso->gran.magic = TSO_MAGIC; // debugging only
1411 tso->gran.sparkname = 0;
1412 tso->gran.startedat = CURRENT_TIME;
1413 tso->gran.exported = 0;
1414 tso->gran.basicblocks = 0;
1415 tso->gran.allocs = 0;
1416 tso->gran.exectime = 0;
1417 tso->gran.fetchtime = 0;
1418 tso->gran.fetchcount = 0;
1419 tso->gran.blocktime = 0;
1420 tso->gran.blockcount = 0;
1421 tso->gran.blockedat = 0;
1422 tso->gran.globalsparks = 0;
1423 tso->gran.localsparks = 0;
1424 if (RtsFlags.GranFlags.Light)
1425 tso->gran.clock = Now; /* local clock */
1427 tso->gran.clock = 0;
1429 IF_DEBUG(gran,printTSO(tso));
1432 tso->par.magic = TSO_MAGIC; // debugging only
1434 tso->par.sparkname = 0;
1435 tso->par.startedat = CURRENT_TIME;
1436 tso->par.exported = 0;
1437 tso->par.basicblocks = 0;
1438 tso->par.allocs = 0;
1439 tso->par.exectime = 0;
1440 tso->par.fetchtime = 0;
1441 tso->par.fetchcount = 0;
1442 tso->par.blocktime = 0;
1443 tso->par.blockcount = 0;
1444 tso->par.blockedat = 0;
1445 tso->par.globalsparks = 0;
1446 tso->par.localsparks = 0;
1450 globalGranStats.tot_threads_created++;
1451 globalGranStats.threads_created_on_PE[CurrentProc]++;
1452 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1453 globalGranStats.tot_sq_probes++;
1458 belch("==__ schedule: Created TSO %d (%p) on PE %d;",
1459 tso->id, tso, CurrentProc));
1461 IF_PAR_DEBUG(verbose,
1462 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1463 tso->id, tso, advisory_thread_count));
1465 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1466 tso->id, tso->stack_size));
1472 Turn a spark into a thread.
1473 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1476 //@cindex activateSpark
1478 activateSpark (rtsSpark spark)
1482 ASSERT(spark != (rtsSpark)NULL);
1483 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1484 if (tso!=END_TSO_QUEUE) {
1485 pushClosure(tso,spark);
1486 PUSH_ON_RUN_QUEUE(tso);
1487 advisory_thread_count++;
1489 if (RtsFlags.ParFlags.ParStats.Full) {
1490 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1491 IF_PAR_DEBUG(verbose,
1492 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1493 (StgClosure *)spark, info_type((StgClosure *)spark)));
1496 barf("activateSpark: Cannot create TSO");
1498 // ToDo: fwd info on local/global spark to thread -- HWL
1499 // tso->gran.exported = spark->exported;
1500 // tso->gran.locked = !spark->global;
1501 // tso->gran.sparkname = spark->name;
1507 /* ---------------------------------------------------------------------------
1510 * scheduleThread puts a thread on the head of the runnable queue.
1511 * This will usually be done immediately after a thread is created.
1512 * The caller of scheduleThread must create the thread using e.g.
1513 * createThread and push an appropriate closure
1514 * on this thread's stack before the scheduler is invoked.
1515 * ------------------------------------------------------------------------ */
1518 scheduleThread(StgTSO *tso)
1520 if (tso==END_TSO_QUEUE){
1525 ACQUIRE_LOCK(&sched_mutex);
1527 /* Put the new thread on the head of the runnable queue. The caller
1528 * better push an appropriate closure on this thread's stack
1529 * beforehand. In the SMP case, the thread may start running as
1530 * soon as we release the scheduler lock below.
1532 PUSH_ON_RUN_QUEUE(tso);
1536 IF_DEBUG(scheduler,printTSO(tso));
1538 RELEASE_LOCK(&sched_mutex);
1541 /* ---------------------------------------------------------------------------
1544 * Start up Posix threads to run each of the scheduler tasks.
1545 * I believe the task ids are not needed in the system as defined.
1547 * ------------------------------------------------------------------------ */
1549 #if defined(PAR) || defined(SMP)
1551 taskStart( void *arg STG_UNUSED )
1553 rts_evalNothing(NULL);
1557 /* ---------------------------------------------------------------------------
1560 * Initialise the scheduler. This resets all the queues - if the
1561 * queues contained any threads, they'll be garbage collected at the
1564 * This now calls startTasks(), so should only be called once! KH @ 25/10/99
1565 * ------------------------------------------------------------------------ */
1569 term_handler(int sig STG_UNUSED)
1572 ACQUIRE_LOCK(&term_mutex);
1574 RELEASE_LOCK(&term_mutex);
1579 //@cindex initScheduler
1586 for (i=0; i<MAX_PROC; i++) {
1587 run_queue_hds[i] = END_TSO_QUEUE;
1588 run_queue_tls[i] = END_TSO_QUEUE;
1589 blocked_queue_hds[i] = END_TSO_QUEUE;
1590 blocked_queue_tls[i] = END_TSO_QUEUE;
1591 ccalling_threadss[i] = END_TSO_QUEUE;
1592 sleeping_queue = END_TSO_QUEUE;
1595 run_queue_hd = END_TSO_QUEUE;
1596 run_queue_tl = END_TSO_QUEUE;
1597 blocked_queue_hd = END_TSO_QUEUE;
1598 blocked_queue_tl = END_TSO_QUEUE;
1599 sleeping_queue = END_TSO_QUEUE;
1602 suspended_ccalling_threads = END_TSO_QUEUE;
1604 main_threads = NULL;
1605 all_threads = END_TSO_QUEUE;
1610 RtsFlags.ConcFlags.ctxtSwitchTicks =
1611 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
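  /* e.g. a 20ms context-switch interval with 10ms ticks yields
   * ctxtSwitchTicks == 2 (10ms is only illustrative; TICK_MILLISECS is
   * fixed by the platform configuration). */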
1614 ecafList = END_ECAF_LIST;
1618 /* Install the SIGTERM handler */
1621 struct sigaction action,oact;
1623 action.sa_handler = term_handler;
1624 sigemptyset(&action.sa_mask);
1625 action.sa_flags = 0;
1626 if (sigaction(SIGTERM, &action, &oact) != 0) {
1627 barf("can't install TERM handler");
1633 /* Allocate N Capabilities */
1636 Capability *cap, *prev;
1639 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1640 cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1644 free_capabilities = cap;
1645 n_free_capabilities = RtsFlags.ParFlags.nNodes;
1647 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1648 n_free_capabilities););
1651 #if defined(SMP) || defined(PAR)
1664 /* make some space for saving all the thread ids */
1665 task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1666 "initScheduler:task_ids");
1668 /* and create all the threads */
1669 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1670 r = pthread_create(&tid,NULL,taskStart,NULL);
1672 barf("startTasks: Can't create new Posix thread");
1674 task_ids[i].id = tid;
1675 task_ids[i].mut_time = 0.0;
1676 task_ids[i].mut_etime = 0.0;
1677 task_ids[i].gc_time = 0.0;
1678 task_ids[i].gc_etime = 0.0;
1679 task_ids[i].elapsedtimestart = elapsedtime();
1680 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1686 exitScheduler( void )
1691 /* Don't want to use pthread_cancel, since we'd have to install
1692 * these silly exception handlers (pthread_cleanup_{push,pop}) around
1696 /* Cancel all our tasks */
1697 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1698 pthread_cancel(task_ids[i].id);
1701 /* Wait for all the tasks to terminate */
1702 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1703 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n",
1705 pthread_join(task_ids[i].id, NULL);
1709 /* Send 'em all a SIGTERM. That should shut 'em up.
1711 await_death = RtsFlags.ParFlags.nNodes;
1712 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1713 pthread_kill(task_ids[i].id,SIGTERM);
1715 while (await_death > 0) {
1721 /* -----------------------------------------------------------------------------
1722 Managing the per-task allocation areas.
1724 Each capability comes with an allocation area. These are
1725 fixed-length block lists into which allocation can be done.
1727 ToDo: no support for two-space collection at the moment???
1728 -------------------------------------------------------------------------- */
1730 /* -----------------------------------------------------------------------------
1731 * waitThread is the external interface for running a new computation
1732 * and waiting for the result.
1734 * In the non-SMP case, we create a new main thread, push it on the
1735 * main-thread stack, and invoke the scheduler to run it. The
1736 * scheduler will return when the top main thread on the stack has
1737 * completed or died, and fill in the necessary fields of the
1738 * main_thread structure.
1740 * In the SMP case, we create a main thread as before, but we then
1741 * create a new condition variable and sleep on it. When our new
1742 * main thread has completed, we'll be woken up and the status/result
1743 * will be in the main_thread struct.
1744 * -------------------------------------------------------------------------- */
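
/* Illustrative caller, along the lines of the rts_eval* wrappers (a sketch;
 * the wrapper code itself lives elsewhere, and `p' is a placeholder):
 *
 *     StgClosure *ret;
 *     StgTSO *tso = createIOThread(RtsFlags.GcFlags.initialStkSize, p);
 *     scheduleThread(tso);               // make it runnable
 *     stat = waitThread(tso, &ret);      // block until it completes
 */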
1747 howManyThreadsAvail ( void )
1751 for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1753 for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1755 for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
1761 finishAllThreads ( void )
1764 while (run_queue_hd != END_TSO_QUEUE) {
1765 waitThread ( run_queue_hd, NULL );
1767 while (blocked_queue_hd != END_TSO_QUEUE) {
1768 waitThread ( blocked_queue_hd, NULL );
1770 while (sleeping_queue != END_TSO_QUEUE) {
1771 waitThread ( sleeping_queue, NULL );
1774 (blocked_queue_hd != END_TSO_QUEUE ||
1775 run_queue_hd != END_TSO_QUEUE ||
1776 sleeping_queue != END_TSO_QUEUE);
1780 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1783 SchedulerStatus stat;
1785 ACQUIRE_LOCK(&sched_mutex);
1787 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1793 pthread_cond_init(&m->wakeup, NULL);
1796 m->link = main_threads;
1799 IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n",
1804 pthread_cond_wait(&m->wakeup, &sched_mutex);
1805 } while (m->stat == NoStatus);
1807 /* GranSim specific init */
1808 CurrentTSO = m->tso; // the TSO to run
1809 procStatus[MainProc] = Busy; // status of main PE
1810 CurrentProc = MainProc; // PE to run it on
1815 ASSERT(m->stat != NoStatus);
1821 pthread_cond_destroy(&m->wakeup);
1824 IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n",
1828 RELEASE_LOCK(&sched_mutex);
1833 //@node Run queue code, Garbage Collection Routines, Suspend and Resume, Main scheduling code
1834 //@subsection Run queue code
1838 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1839 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1840 implicit global variable that has to be correct when calling these
1844 /* Put the new thread on the head of the runnable queue.
1845 * The caller of createThread better push an appropriate closure
1846 * on this thread's stack before the scheduler is invoked.
1848 static /* inline */ void
1849 add_to_run_queue(tso)
1852 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1853 tso->link = run_queue_hd;
1855 if (run_queue_tl == END_TSO_QUEUE) {
1860 /* Put the new thread at the end of the runnable queue. */
1861 static /* inline */ void
1862 push_on_run_queue(tso)
1865 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1866 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1867 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1868 if (run_queue_hd == END_TSO_QUEUE) {
1871 run_queue_tl->link = tso;
1877 Should be inlined because it's used very often in schedule. The tso
1878 argument is actually only needed in GranSim, where we want to have the
1879 possibility to schedule *any* TSO on the run queue, irrespective of the
1880 actual ordering. Therefore, if tso is not the nil TSO then we traverse
1881 the run queue and dequeue the tso, adjusting the links in the queue.
1883 //@cindex take_off_run_queue
1884 static /* inline */ StgTSO*
1885 take_off_run_queue(StgTSO *tso) {
1889 qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1891 if tso is specified, unlink that tso from the run_queue (doesn't have
1892 to be at the beginning of the queue); GranSim only
1894 if (tso!=END_TSO_QUEUE) {
1895 /* find tso in queue */
1896 for (t=run_queue_hd, prev=END_TSO_QUEUE;
1897 t!=END_TSO_QUEUE && t!=tso;
1901 /* now actually dequeue the tso */
1902 if (prev!=END_TSO_QUEUE) {
1903 ASSERT(run_queue_hd!=t);
1904 prev->link = t->link;
1906 /* t is at beginning of thread queue */
1907 ASSERT(run_queue_hd==t);
1908 run_queue_hd = t->link;
1910 /* t is at end of thread queue */
1911 if (t->link==END_TSO_QUEUE) {
1912 ASSERT(t==run_queue_tl);
1913 run_queue_tl = prev;
1915 ASSERT(run_queue_tl!=t);
1917 t->link = END_TSO_QUEUE;
1919 /* take tso from the beginning of the queue; std concurrent code */
1921 if (t != END_TSO_QUEUE) {
1922 run_queue_hd = t->link;
1923 t->link = END_TSO_QUEUE;
1924 if (run_queue_hd == END_TSO_QUEUE) {
1925 run_queue_tl = END_TSO_QUEUE;
1934 //@node Garbage Collection Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1935 //@subsection Garbage Collection Routines
1937 /* ---------------------------------------------------------------------------
1938 Where are the roots that we know about?
1940 - all the threads on the runnable queue
1941 - all the threads on the blocked queue
1942 - all the threads on the sleeping queue
1943 - all the threads currently executing a _ccall_GC
1944 - all the "main threads"
1946 ------------------------------------------------------------------------ */
1948 /* This has to be protected either by the scheduler monitor, or by the
1949 garbage collection monitor (probably the latter).
1953 static void GetRoots(void)
1960 for (i=0; i<RtsFlags.GranFlags.proc; i++) {
1961 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1962 run_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1963 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1964 run_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1966 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1967 blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1968 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1969 blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1970 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1971 ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
1978 if (run_queue_hd != END_TSO_QUEUE) {
1979 ASSERT(run_queue_tl != END_TSO_QUEUE);
1980 run_queue_hd = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1981 run_queue_tl = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1984 if (blocked_queue_hd != END_TSO_QUEUE) {
1985 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
1986 blocked_queue_hd = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1987 blocked_queue_tl = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1990 if (sleeping_queue != END_TSO_QUEUE) {
1991 sleeping_queue = (StgTSO *)MarkRoot((StgClosure *)sleeping_queue);
1995 for (m = main_threads; m != NULL; m = m->link) {
1996 m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1998 if (suspended_ccalling_threads != END_TSO_QUEUE)
1999 suspended_ccalling_threads =
2000 (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
2002 #if defined(SMP) || defined(PAR) || defined(GRAN)
2007 /* -----------------------------------------------------------------------------
2010 This is the interface to the garbage collector from Haskell land.
2011 We provide this so that external C code can allocate and garbage
2012 collect when called from Haskell via _ccall_GC.
2014 It might be useful to provide an interface whereby the programmer
2015 can specify more roots (ToDo).
2017 This needs to be protected by the GC condition variable above. KH.
2018 -------------------------------------------------------------------------- */
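
/* Illustrative use from foreign C code (my_root and my_mark_roots are
 * hypothetical names):
 *
 *     static StgClosure *my_root;
 *     static void my_mark_roots(void) {
 *         my_root = MarkRoot(my_root);
 *     }
 *     ...
 *     performGCWithRoots(my_mark_roots);
 */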
2020 void (*extra_roots)(void);
2025 GarbageCollect(GetRoots,rtsFalse);
2029 performMajorGC(void)
2031 GarbageCollect(GetRoots,rtsTrue);
2037 GetRoots(); /* the scheduler's roots */
2038 extra_roots(); /* the user's roots */
2042 performGCWithRoots(void (*get_roots)(void))
2044 extra_roots = get_roots;
2046 GarbageCollect(AllRoots,rtsFalse);
2049 /* -----------------------------------------------------------------------------
2052 If the thread has reached its maximum stack size, then raise the
2053 StackOverflow exception in the offending thread. Otherwise
2054 relocate the TSO into a larger chunk of memory and adjust its stack
2056 -------------------------------------------------------------------------- */
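
/* Worked example (numbers illustrative): a thread with a 1024-word stack
 * and an 8192-word max_stack_size first doubles to 2048 words; the whole
 * TSO is then rounded up to whole blocks, so the stack actually granted
 * may be somewhat larger than the plain doubling suggests.
 */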
2059 threadStackOverflow(StgTSO *tso)
2061 nat new_stack_size, new_tso_size, diff, stack_words;
2065 IF_DEBUG(sanity,checkTSO(tso));
2066 if (tso->stack_size >= tso->max_stack_size) {
2069 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2070 tso->id, tso, tso->stack_size, tso->max_stack_size);
2071 /* If we're debugging, just print out the top of the stack */
2072 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2076 fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
2079 /* Send this thread the StackOverflow exception */
2080 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2085 /* Try to double the current stack size. If that takes us over the
2086 * maximum stack size for this thread, then use the maximum instead.
2087 * Finally round up so the TSO ends up as a whole number of blocks.
2089 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2090 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2091 TSO_STRUCT_SIZE)/sizeof(W_);
2092 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2093 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2095 IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2097 dest = (StgTSO *)allocate(new_tso_size);
2098 TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2100 /* copy the TSO block and the old stack into the new area */
2101 memcpy(dest,tso,TSO_STRUCT_SIZE);
2102 stack_words = tso->stack + tso->stack_size - tso->sp;
2103 new_sp = (P_)dest + new_tso_size - stack_words;
2104 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2106 /* relocate the stack pointers... */
2107 diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2108 dest->su = (StgUpdateFrame *) ((P_)dest->su + diff);
2110 dest->stack_size = new_stack_size;
2112 /* and relocate the update frame list */
2113 relocate_TSO(tso, dest);
2115 /* Mark the old TSO as relocated. We have to check for relocated
2116 * TSOs in the garbage collector and any primops that deal with TSOs.
2118 * It's important to set the sp and su values to just beyond the end
2119 * of the stack, so we don't attempt to scavenge any part of the
2122 tso->what_next = ThreadRelocated;
2124 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2125 tso->su = (StgUpdateFrame *)tso->sp;
2126 tso->why_blocked = NotBlocked;
2127 dest->mut_link = NULL;
2129 IF_PAR_DEBUG(verbose,
2130 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2131 tso->id, tso, tso->stack_size);
2132 /* If we're debugging, just print out the top of the stack */
2133 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2136 IF_DEBUG(sanity,checkTSO(tso));
2138 IF_DEBUG(scheduler,printTSO(dest));
2144 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collection Routines, Main scheduling code
2145 //@subsection Blocking Queue Routines
2147 /* ---------------------------------------------------------------------------
2148 Wake up a queue that was blocked on some resource.
2149 ------------------------------------------------------------------------ */
2153 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2158 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2160 /* write RESUME events to log file and
2161 update blocked and fetch time (depending on type of the orig closure) */
2162 if (RtsFlags.ParFlags.ParStats.Full) {
2163 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2164 GR_RESUME, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2165 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2167 switch (get_itbl(node)->type) {
2169 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2174 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2177 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2184 static StgBlockingQueueElement *
2185 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2188 PEs node_loc, tso_loc;
2190 node_loc = where_is(node); // should be lifted out of loop
2191 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2192 tso_loc = where_is((StgClosure *)tso);
2193 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2194 /* !fake_fetch => TSO being on CurrentProc is the same as IS_LOCAL_TO */
2195 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2196 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2197 // insertThread(tso, node_loc);
2198 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2200 tso, node, (rtsSpark*)NULL);
2201 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2204 } else { // TSO is remote (actually should be FMBQ)
2205 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2206 RtsFlags.GranFlags.Costs.gunblocktime +
2207 RtsFlags.GranFlags.Costs.latency;
2208 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2210 tso, node, (rtsSpark*)NULL);
2211 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2214 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2216 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2217 (node_loc==tso_loc ? "Local" : "Global"),
2218 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2219 tso->block_info.closure = NULL;
2220 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
static StgBlockingQueueElement *
unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
{
    StgBlockingQueueElement *next;

    switch (get_itbl(bqe)->type) {
    case TSO:
      ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
      /* if it's a TSO just push it onto the run_queue */
      next = bqe->link;
      // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
      PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
      unblockCount(bqe, node);
      /* reset blocking status after dumping event */
      ((StgTSO *)bqe)->why_blocked = NotBlocked;
      break;

    case BLOCKED_FETCH:
      /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
      next = bqe->link;
      bqe->link = PendingFetches;
      PendingFetches = bqe;
      break;

# if defined(DEBUG)
      /* can ignore this case in a non-debugging setup;
	 see comments on RBHSave closures above */
    case CONSTR:
      /* check that the closure is an RBHSave closure */
      ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
	     get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
	     get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
      break;

    default:
      barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
	   get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
	   (StgClosure *)bqe);
# endif
    }
  // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
  return next;
}
#else   /* !GRAN && !PAR */
static StgTSO *
unblockOneLocked(StgTSO *tso)
{
  StgTSO *next;

  ASSERT(get_itbl(tso)->type == TSO);
  ASSERT(tso->why_blocked != NotBlocked);
  tso->why_blocked = NotBlocked;
  next = tso->link;
  PUSH_ON_RUN_QUEUE(tso);
  THREAD_RUNNABLE();
  IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
  return next;
}
#endif
#if defined(GRAN) || defined(PAR)
inline StgBlockingQueueElement *
unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
{
  ACQUIRE_LOCK(&sched_mutex);
  bqe = unblockOneLocked(bqe, node);
  RELEASE_LOCK(&sched_mutex);
  return bqe;
}
#else
inline StgTSO *
unblockOne(StgTSO *tso)
{
  ACQUIRE_LOCK(&sched_mutex);
  tso = unblockOneLocked(tso);
  RELEASE_LOCK(&sched_mutex);
  return tso;
}
#endif
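/* Locking convention (sketch): unblockOne takes sched_mutex itself, so
   it must be called *without* the lock held.  Code that is already
   inside the scheduler monitor calls the *Locked variant directly: */
#if 0
  /* sched_mutex already held here: */
  while (tso != END_TSO_QUEUE) {
      tso = unblockOneLocked(tso);   /* NOT unblockOne(tso) */
  }
#endif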
#if defined(GRAN)
void
awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  PEs node_loc;
  nat len = 0;

  IF_GRAN_DEBUG(bq,
		belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ",
		      node, CurrentProc, CurrentTime[CurrentProc],
		      CurrentTSO->id, CurrentTSO));

  node_loc = where_is(node);

  ASSERT(get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
	 get_itbl(q)->type == CONSTR); // closure (type constructor)
  ASSERT(is_unique(node));

  /* FAKE FETCH: magically copy the node to the tso's proc;
     no Fetch necessary because in reality the node should not have been
     moved to the other PE in the first place
  */
  if (CurrentProc!=node_loc) {
    IF_GRAN_DEBUG(bq,
		  belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
			node, node_loc, CurrentProc, CurrentTSO->id,
			// CurrentTSO, where_is(CurrentTSO),
			node->header.gran.procs));
    node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
    IF_GRAN_DEBUG(bq,
		  belch("## new bitmask of node %p is %#x",
			node, node->header.gran.procs));
    if (RtsFlags.GranFlags.GranSimStats.Global) {
      globalGranStats.tot_fake_fetches++;
    }
  }

  bqe = q;
  // ToDo: check: ASSERT(CurrentProc==node_loc);
  while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
    /*
       bqe points to the current element in the queue
       next points to the next element in the queue
    */
    //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
    //tso_loc = where_is(tso);
    len++;
    bqe = unblockOneLocked(bqe, node);
  }

  /* if this is the BQ of an RBH, we have to put back the info ripped out of
     the closure to make room for the anchor of the BQ */
  if (bqe!=END_BQ_QUEUE) {
    ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
    /*
    ASSERT((info_ptr==&RBH_Save_0_info) ||
	   (info_ptr==&RBH_Save_1_info) ||
	   (info_ptr==&RBH_Save_2_info));
    */
    /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
    ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
    ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];

    IF_GRAN_DEBUG(bq,
		  belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
			node, info_type(node)));
  }

  /* statistics gathering */
  if (RtsFlags.GranFlags.GranSimStats.Global) {
    // globalGranStats.tot_bq_processing_time += bq_processing_time;
    globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
    // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
    globalGranStats.tot_awbq++;             // total no. of bqs awakened
  }
  IF_GRAN_DEBUG(bq,
		fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
			node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
}
#elif defined(PAR)
void
awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
{
  StgBlockingQueueElement *bqe, *next;

  ACQUIRE_LOCK(&sched_mutex);

  IF_PAR_DEBUG(verbose,
	       belch("## AwBQ for node %p on [%x]: ",
		     node, mytid));

  ASSERT(get_itbl(q)->type == TSO ||
	 get_itbl(q)->type == BLOCKED_FETCH ||
	 get_itbl(q)->type == CONSTR);

  bqe = q;
  while (get_itbl(bqe)->type==TSO ||
	 get_itbl(bqe)->type==BLOCKED_FETCH) {
    bqe = unblockOneLocked(bqe, node);
  }
  RELEASE_LOCK(&sched_mutex);
}
#else   /* !GRAN && !PAR */
void
awakenBlockedQueue(StgTSO *tso)
{
  ACQUIRE_LOCK(&sched_mutex);
  while (tso != END_TSO_QUEUE) {
    tso = unblockOneLocked(tso);
  }
  RELEASE_LOCK(&sched_mutex);
}
#endif
//@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
//@subsection Exception Handling Routines
/* ---------------------------------------------------------------------------
   Interrupt execution
   - usually called inside a signal handler so it mustn't do anything fancy.
   ------------------------------------------------------------------------ */

void
interruptStgRts(void)
{
    interrupted    = 1;
    context_switch = 1;
}
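/* Example (sketch; the handler below is hypothetical, the RTS's real
   signal code lives elsewhere): a Ctrl-C handler need only call
   interruptStgRts() and return, since the two flag assignments above
   are async-signal-safe.  The scheduler notices the flags at its next
   context-switch check. */
#if 0
static void
sigint_handler_example (int sig)
{
    (void)sig;          /* unused */
    interruptStgRts();
}
#endif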
/* -----------------------------------------------------------------------------
   Unblock a thread

   This is for use when we raise an exception in another thread, which
   may be blocked.

   This has nothing to do with the UnblockThread event in GranSim. -- HWL
   -------------------------------------------------------------------------- */
#if defined(GRAN) || defined(PAR)
/*
  NB: only the type of the blocking queue is different in GranSim and GUM
      the operations on the queue-elements are the same
      long live polymorphism!
*/
static void
unblockThread(StgTSO *tso)
{
  StgBlockingQueueElement *t, **last;

  ACQUIRE_LOCK(&sched_mutex);
  switch (tso->why_blocked) {

  case NotBlocked:
    RELEASE_LOCK(&sched_mutex);
    return;  /* not blocked */

  case BlockedOnMVar:
    ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
    {
      StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
      StgMVar *mvar = (StgMVar *)(tso->block_info.closure);

      last = (StgBlockingQueueElement **)&mvar->head;
      for (t = (StgBlockingQueueElement *)mvar->head;
	   t != END_BQ_QUEUE;
	   last = &t->link, last_tso = t, t = t->link) {
	if (t == (StgBlockingQueueElement *)tso) {
	  *last = (StgBlockingQueueElement *)tso->link;
	  if (mvar->tail == tso) {
	    mvar->tail = (StgTSO *)last_tso;
	  }
	  goto done;
	}
      }
      barf("unblockThread (MVAR): TSO not found");
    }

  case BlockedOnBlackHole:
    ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
    {
      StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);

      last = &bq->blocking_queue;
      for (t = bq->blocking_queue;
	   t != END_BQ_QUEUE;
	   last = &t->link, t = t->link) {
	if (t == (StgBlockingQueueElement *)tso) {
	  *last = (StgBlockingQueueElement *)tso->link;
	  goto done;
	}
      }
      barf("unblockThread (BLACKHOLE): TSO not found");
    }

  case BlockedOnException:
    {
      StgTSO *target  = tso->block_info.tso;

      ASSERT(get_itbl(target)->type == TSO);
      ASSERT(target->blocked_exceptions != NULL);

      last = (StgBlockingQueueElement **)&target->blocked_exceptions;
      for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
	   t != END_BQ_QUEUE;
	   last = &t->link, t = t->link) {
	ASSERT(get_itbl(t)->type == TSO);
	if (t == (StgBlockingQueueElement *)tso) {
	  *last = (StgBlockingQueueElement *)tso->link;
	  goto done;
	}
      }
      barf("unblockThread (Exception): TSO not found");
    }

  case BlockedOnRead:
  case BlockedOnWrite:
    {
      StgBlockingQueueElement *prev = NULL;
      for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
	   prev = t, t = t->link) {
	if (t == (StgBlockingQueueElement *)tso) {
	  if (prev == NULL) {
	    blocked_queue_hd = (StgTSO *)t->link;
	    if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
	      blocked_queue_tl = END_TSO_QUEUE;
	    }
	  } else {
	    prev->link = t->link;
	    if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
	      blocked_queue_tl = (StgTSO *)prev;
	    }
	  }
	  goto done;
	}
      }
      barf("unblockThread (I/O): TSO not found");
    }

  case BlockedOnDelay:
    {
      StgBlockingQueueElement *prev = NULL;
      for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
	   prev = t, t = t->link) {
	if (t == (StgBlockingQueueElement *)tso) {
	  if (prev == NULL) {
	    sleeping_queue = (StgTSO *)t->link;
	  } else {
	    prev->link = t->link;
	  }
	  goto done;
	}
      }
      barf("unblockThread (I/O): TSO not found");
    }

  default:
    barf("unblockThread");
  }

 done:
  tso->link = END_TSO_QUEUE;
  tso->why_blocked = NotBlocked;
  tso->block_info.closure = NULL;
  PUSH_ON_RUN_QUEUE(tso);
  RELEASE_LOCK(&sched_mutex);
}
#else
static void
unblockThread(StgTSO *tso)
{
  StgTSO *t, **last;

  ACQUIRE_LOCK(&sched_mutex);
  switch (tso->why_blocked) {

  case NotBlocked:
    RELEASE_LOCK(&sched_mutex);
    return;  /* not blocked */

  case BlockedOnMVar:
    ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
    {
      StgTSO *last_tso = END_TSO_QUEUE;
      StgMVar *mvar = (StgMVar *)(tso->block_info.closure);

      last = &mvar->head;
      for (t = mvar->head; t != END_TSO_QUEUE;
	   last = &t->link, last_tso = t, t = t->link) {
	if (t == tso) {
	  *last = tso->link;
	  if (mvar->tail == tso) {
	    mvar->tail = last_tso;
	  }
	  goto done;
	}
      }
      barf("unblockThread (MVAR): TSO not found");
    }

  case BlockedOnBlackHole:
    ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
    {
      StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);

      last = &bq->blocking_queue;
      for (t = bq->blocking_queue; t != END_TSO_QUEUE;
	   last = &t->link, t = t->link) {
	if (t == tso) {
	  *last = tso->link;
	  goto done;
	}
      }
      barf("unblockThread (BLACKHOLE): TSO not found");
    }

  case BlockedOnException:
    {
      StgTSO *target  = tso->block_info.tso;

      ASSERT(get_itbl(target)->type == TSO);
      ASSERT(target->blocked_exceptions != NULL);

      last = &target->blocked_exceptions;
      for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
	   last = &t->link, t = t->link) {
	ASSERT(get_itbl(t)->type == TSO);
	if (t == tso) {
	  *last = tso->link;
	  goto done;
	}
      }
      barf("unblockThread (Exception): TSO not found");
    }

  case BlockedOnRead:
  case BlockedOnWrite:
    {
      StgTSO *prev = NULL;
      for (t = blocked_queue_hd; t != END_TSO_QUEUE;
	   prev = t, t = t->link) {
	if (t == tso) {
	  if (prev == NULL) {
	    blocked_queue_hd = t->link;
	    if (blocked_queue_tl == t) {
	      blocked_queue_tl = END_TSO_QUEUE;
	    }
	  } else {
	    prev->link = t->link;
	    if (blocked_queue_tl == t) {
	      blocked_queue_tl = prev;
	    }
	  }
	  goto done;
	}
      }
      barf("unblockThread (I/O): TSO not found");
    }

  case BlockedOnDelay:
    {
      StgTSO *prev = NULL;
      for (t = sleeping_queue; t != END_TSO_QUEUE;
	   prev = t, t = t->link) {
	if (t == tso) {
	  if (prev == NULL) {
	    sleeping_queue = t->link;
	  } else {
	    prev->link = t->link;
	  }
	  goto done;
	}
      }
      barf("unblockThread (I/O): TSO not found");
    }

  default:
    barf("unblockThread");
  }

 done:
  tso->link = END_TSO_QUEUE;
  tso->why_blocked = NotBlocked;
  tso->block_info.closure = NULL;
  PUSH_ON_RUN_QUEUE(tso);
  RELEASE_LOCK(&sched_mutex);
}
#endif
/* -----------------------------------------------------------------------------
 * raiseAsync()
 *
 * The following function implements the magic for raising an
 * asynchronous exception in an existing thread.
 *
 * We first remove the thread from any queue on which it might be
 * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
 *
 * We strip the stack down to the innermost CATCH_FRAME, building
 * thunks in the heap for all the active computations, so they can
 * be restarted if necessary.  When we reach a CATCH_FRAME, we build
 * an application of the handler to the exception, and push it on
 * the top of the stack.
 *
 * How exactly do we save all the active computations?  We create an
 * AP_UPD for every UpdateFrame on the stack.  Entering one of these
 * AP_UPDs pushes everything from the corresponding update frame
 * upwards onto the stack.  (Actually, it pushes everything up to the
 * next update frame plus a pointer to the next AP_UPD object.
 * Entering the next AP_UPD object pushes more onto the stack until we
 * reach the last AP_UPD object - at which point the stack should look
 * exactly as it did when we killed the TSO and we can continue
 * execution by entering the closure on top of the stack.)
 *
 * We can also kill a thread entirely - this happens if either (a) the
 * exception passed to raiseAsync is NULL, or (b) there's no
 * CATCH_FRAME on the stack.  In either case, we strip the entire
 * stack and replace the thread with a zombie.
 *
 * -------------------------------------------------------------------------- */
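/* Caller's-eye view (sketch; the wrapper name below is hypothetical):
   a killThread-style primop delivers an asynchronous exception simply
   by calling raiseAsync on the target; passing a NULL exception kills
   the thread outright, which is exactly what deleteThread below does. */
#if 0
static void
kill_thread_example (StgTSO *target, StgClosure *exn)
{
    /* raiseAsync calls unblockThread, which takes sched_mutex itself,
       so the caller must not already hold the lock here. */
    raiseAsync(target, exn);    /* exn == NULL just deletes the thread */
}
#endif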
void
deleteThread(StgTSO *tso)
{
  raiseAsync(tso,NULL);
}

void
raiseAsync(StgTSO *tso, StgClosure *exception)
{
  StgUpdateFrame* su = tso->su;
  StgPtr          sp = tso->sp;

  /* Thread already dead? */
  if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
    return;
  }

  IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));

  /* Remove it from any blocking queues */
  unblockThread(tso);

  /* The stack freezing code assumes there's a closure pointer on
   * the top of the stack. This isn't always the case with compiled
   * code, so we have to push a dummy closure on the top which just
   * returns to the next return address on the stack.
   */
  if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
    *(--sp) = (W_)&dummy_ret_closure;
  }

  while (1) {
    int words = ((P_)su - (P_)sp) - 1;
    nat i;
    StgAP_UPD * ap;

    /* If we find a CATCH_FRAME, and we've got an exception to raise,
     * then build PAP(handler,exception,realworld#), and leave it on
     * top of the stack ready to enter.
     */
    if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
      StgCatchFrame *cf = (StgCatchFrame *)su;
      /* we've got an exception to raise, so let's pass it to the
       * handler in this frame.
       */
      ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
      TICK_ALLOC_UPD_PAP(3,0);
      SET_HDR(ap,&PAP_info,cf->header.prof.ccs);

      ap->n_args = 2;
      ap->fun = cf->handler;	/* :: Exception -> IO a */
      ap->payload[0] = exception;
      ap->payload[1] = ARG_TAG(0); /* realworld token */

      /* throw away the stack from Sp up to and including the
       * CATCH_FRAME.
       */
      sp = (P_)su + sizeofW(StgCatchFrame) - 1;
      tso->su = cf->link;

      /* Restore the blocked/unblocked state for asynchronous exceptions
       * at the CATCH_FRAME.
       *
       * If exceptions were unblocked at the catch, arrange that they
       * are unblocked again after executing the handler by pushing an
       * unblockAsyncExceptions_ret stack frame.
       */
      if (!cf->exceptions_blocked) {
	*(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
      }

      /* Ensure that async exceptions are blocked when running the handler.
       */
      if (tso->blocked_exceptions == NULL) {
	tso->blocked_exceptions = END_TSO_QUEUE;
      }

      /* Put the newly-built PAP on top of the stack, ready to execute
       * when the thread restarts.
       */
      sp[0] = (W_)ap;
      tso->sp = sp;
      tso->what_next = ThreadEnterGHC;
      IF_DEBUG(sanity, checkTSO(tso));
      return;
    }

    /* First build an AP_UPD consisting of the stack chunk above the
     * current update frame, with the top word on the stack as the
     * fun field.
     */
    ap = (StgAP_UPD *)allocate(AP_sizeW(words));

    ASSERT(words >= 0);

    ap->n_args = words;
    ap->fun    = (StgClosure *)sp[0];
    sp++;
    for(i=0; i < (nat)words; ++i) {
      ap->payload[i] = (StgClosure *)*sp++;
    }

    switch (get_itbl(su)->type) {

    case UPDATE_FRAME:
      {
	SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */);
	TICK_ALLOC_UP_THK(words+1,0);

	IF_DEBUG(scheduler,
		 fprintf(stderr,  "scheduler: Updating ");
		 printPtr((P_)su->updatee);
		 fprintf(stderr,  " with ");
		 printObj((StgClosure *)ap);
		 );

	/* Replace the updatee with an indirection - happily
	 * this will also wake up any threads currently
	 * waiting on the result.
	 */
	UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
	su = su->link;
	sp += sizeofW(StgUpdateFrame) -1;
	sp[0] = (W_)ap;		/* push onto stack */
	break;
      }

    case CATCH_FRAME:
      {
	StgCatchFrame *cf = (StgCatchFrame *)su;
	StgClosure* o;

	/* We want a PAP, not an AP_UPD. Fortunately, the
	 * layout's the same.
	 */
	SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
	TICK_ALLOC_UPD_PAP(words+1,0);

	/* now build o = FUN(catch,ap,handler) */
	o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
	TICK_ALLOC_FUN(2,0);
	SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
	o->payload[0] = (StgClosure *)ap;
	o->payload[1] = cf->handler;

	IF_DEBUG(scheduler,
		 fprintf(stderr,  "scheduler: Built ");
		 printObj((StgClosure *)o);
		 );

	/* pop the old handler and put o on the stack */
	su = cf->link;
	sp += sizeofW(StgCatchFrame) - 1;
	sp[0] = (W_)o;
	break;
      }

    case SEQ_FRAME:
      {
	StgSeqFrame *sf = (StgSeqFrame *)su;
	StgClosure* o;

	SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
	TICK_ALLOC_UPD_PAP(words+1,0);

	/* now build o = FUN(seq,ap) */
	o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
	TICK_ALLOC_SE_THK(1,0);
	SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
	o->payload[0] = (StgClosure *)ap;

	IF_DEBUG(scheduler,
		 fprintf(stderr,  "scheduler: Built ");
		 printObj((StgClosure *)o);
		 );

	/* pop the old handler and put o on the stack */
	su = sf->link;
	sp += sizeofW(StgSeqFrame) - 1;
	sp[0] = (W_)o;
	break;
      }

    case STOP_FRAME:
      /* We've stripped the entire stack, the thread is now dead. */
      sp += sizeofW(StgStopFrame) - 1;
      sp[0] = (W_)exception;	/* save the exception */
      tso->what_next = ThreadKilled;
      tso->su = (StgUpdateFrame *)(sp+1);
      tso->sp = sp;
      return;

    default:
      barf("raiseAsync");
    }
  }
  barf("raiseAsync");	/* not reached */
}
/* -----------------------------------------------------------------------------
   resurrectThreads is called after garbage collection on the list of
   threads found to be garbage.  Each of these threads will be woken
   up and sent a signal: BlockedOnDeadMVar if the thread was blocked
   on an MVar, or NonTermination if the thread was blocked on a Black
   Hole.
   -------------------------------------------------------------------------- */
void
resurrectThreads( StgTSO *threads )
{
  StgTSO *tso, *next;

  for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
    next = tso->global_link;
    tso->global_link = all_threads;
    all_threads = tso;
    IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));

    switch (tso->why_blocked) {
    case BlockedOnMVar:
    case BlockedOnException:
      raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
      break;
    case BlockedOnBlackHole:
      raiseAsync(tso,(StgClosure *)NonTermination_closure);
      break;
    case NotBlocked:
      /* This might happen if the thread was blocked on a black hole
       * belonging to a thread that we've just woken up (raiseAsync
       * can wake up threads, remember...).
       */
      continue;
    default:
      barf("resurrectThreads: thread blocked in a strange way");
    }
  }
}
/* -----------------------------------------------------------------------------
 * Blackhole detection: if we reach a deadlock, test whether any
 * threads are blocked on themselves.  Any threads which are found to
 * be self-blocked get sent a NonTermination exception.
 *
 * This is only done in a deadlock situation in order to avoid
 * performance overhead in the normal case.
 * -------------------------------------------------------------------------- */
static void
detectBlackHoles( void )
{
    StgTSO *t = all_threads;
    StgUpdateFrame *frame;
    StgClosure *blocked_on;

    for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {

	if (t->why_blocked != BlockedOnBlackHole) {
	    continue;
	}

	blocked_on = t->block_info.closure;

	for (frame = t->su; ; frame = frame->link) {
	    switch (get_itbl(frame)->type) {

	    case UPDATE_FRAME:
		if (frame->updatee == blocked_on) {
		    /* We are blocking on one of our own computations, so
		     * send this thread the NonTermination exception.
		     */
		    IF_DEBUG(scheduler,
			     sched_belch("thread %d is blocked on itself", t->id));
		    raiseAsync(t, (StgClosure *)NonTermination_closure);
		    goto done;
		}
		continue;

	    case CATCH_FRAME:
	    case SEQ_FRAME:
		continue;

	    case STOP_FRAME:
		goto done;

	    default:
		barf("detectBlackHoles: strange activation record found on stack");
	    }
	}
    done: ;
    }
}
//@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
//@subsection Debugging Routines

/* -----------------------------------------------------------------------------
   Debugging: why is a thread blocked
   -------------------------------------------------------------------------- */
static void
printThreadBlockage(StgTSO *tso)
{
  switch (tso->why_blocked) {
  case BlockedOnRead:
    fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
    break;
  case BlockedOnWrite:
    fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
    break;
  case BlockedOnDelay:
    fprintf(stderr,"is blocked until %d", tso->block_info.target);
    break;
  case BlockedOnMVar:
    fprintf(stderr,"is blocked on an MVar");
    break;
  case BlockedOnException:
    fprintf(stderr,"is blocked on delivering an exception to thread %d",
	    tso->block_info.tso->id);
    break;
  case BlockedOnBlackHole:
    fprintf(stderr,"is blocked on a black hole");
    break;
  case NotBlocked:
    fprintf(stderr,"is not blocked");
    break;
#if defined(PAR)
  case BlockedOnGA:
    fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
	    tso->block_info.closure, info_type(tso->block_info.closure));
    break;
  case BlockedOnGA_NoSend:
    fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
	    tso->block_info.closure, info_type(tso->block_info.closure));
    break;
#endif
  default:
    barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
	 tso->why_blocked, tso->id, tso);
  }
}
static void
printThreadStatus(StgTSO *tso)
{
  switch (tso->what_next) {
  case ThreadKilled:
    fprintf(stderr,"has been killed");
    break;
  case ThreadComplete:
    fprintf(stderr,"has completed");
    break;
  default:
    printThreadBlockage(tso);
  }
}

void
printAllThreads(void)
{
  StgTSO *t;

  sched_belch("all threads:");

  for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
    fprintf(stderr, "\tthread %d ", t->id);
    printThreadStatus(t);
    fprintf(stderr,"\n");
  }
}
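/* Usage note (sketch): on a DEBUG-built RTS, printAllThreads can be
   invoked directly from a debugger to snapshot the thread table, e.g.

       (gdb) call printAllThreads()

   Each output line comes from printThreadStatus/printThreadBlockage
   above, so it shows exactly why every thread is stuck. */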
/*
   Print a whole blocking queue attached to node (debugging only).
*/
#if defined(PAR)
void
print_bq (StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  rtsBool end;

  fprintf(stderr,"## BQ of closure %p (%s): ",
	  node, info_type(node));

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
	 get_itbl(node)->type == FETCH_ME_BQ ||
	 get_itbl(node)->type == RBH);

  ASSERT(node!=(StgClosure*)NULL);  // sanity check
  /*
    NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
  */
  for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);   // sanity check
    ASSERT(bqe != (StgTSO*)NULL);  // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||
	   get_itbl(bqe)->type == BLOCKED_FETCH ||
	   get_itbl(bqe)->type == CONSTR);
    /* only BQs of an RBH end with an RBH_Save closure */
    ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
    switch (get_itbl(bqe)->type) {
    case TSO:
      fprintf(stderr," TSO %d (%x),",
	      ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
      break;
    case BLOCKED_FETCH:
      fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
	      ((StgBlockedFetch *)bqe)->node,
	      ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
	      ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
	      ((StgBlockedFetch *)bqe)->ga.weight);
      break;
    case CONSTR:
      fprintf(stderr," %s (IP %p),",
	      (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
	       get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
	       get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
	       "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue of %p (%s)",
	   info_type(bqe), node, info_type(node));
    }
  }
  fputc('\n', stderr);
}
# elif defined(GRAN)
void
print_bq (StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  PEs node_loc, tso_loc;
  rtsBool end;

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
	 get_itbl(node)->type == FETCH_ME_BQ ||
	 get_itbl(node)->type == RBH);

  ASSERT(node!=(StgClosure*)NULL);  // sanity check
  node_loc = where_is(node);

  fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
	  node, info_type(node), node_loc);

  /*
    NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
  */
  for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);                     // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||
	   get_itbl(bqe)->type == CONSTR);
    /* only BQs of an RBH end with an RBH_Save closure */
    ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

    tso_loc = where_is((StgClosure *)bqe);
    switch (get_itbl(bqe)->type) {
    case TSO:
      fprintf(stderr," TSO %d (%p) on [PE %d],",
	      ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
      break;
    case CONSTR:
      fprintf(stderr," %s (IP %p),",
	      (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
	       get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
	       get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
	       "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue of %p (%s)",
	   info_type((StgClosure *)bqe), node, info_type(node));
    }
  }
  fputc('\n', stderr);
}
#else   /* !GRAN && !PAR */
/*
  Nice and easy: only TSOs on the blocking queue
*/
void
print_bq (StgClosure *node)
{
  StgTSO *tso;

  ASSERT(node!=(StgClosure*)NULL);  // sanity check
  for (tso = ((StgBlockingQueue*)node)->blocking_queue;
       tso != END_TSO_QUEUE;
       tso=tso->link) {
    ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);  // sanity check
    ASSERT(get_itbl(tso)->type == TSO);       // guess what, sanity check
    fprintf(stderr," TSO %d (%p),", tso->id, tso);
  }
  fputc('\n', stderr);
}
#endif
/* count the threads on the run queue (debugging only; the function
   header is a reconstruction, the original name may differ) */
static nat
run_queue_len(void)
{
  nat i;
  StgTSO *tso;

  for (i=0, tso=run_queue_hd;
       tso != END_TSO_QUEUE;
       i++, tso=tso->link)
    /* nothing */;
  return i;
}

void
sched_belch(char *s, ...)
{
  va_list ap;
  va_start(ap,s);
#ifdef SMP
  fprintf(stderr, "scheduler (task %ld): ", (long)pthread_self());
#else
  fprintf(stderr, "scheduler: ");
#endif
  vfprintf(stderr, s, ap);
  fprintf(stderr, "\n");
  va_end(ap);
}
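/* sched_belch takes printf-style varargs, e.g. (illustrative):

       sched_belch("thread %d stack overflow", (int)tso->id);

   Output goes to stderr, prefixed with the scheduler/task banner
   selected above. */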
//@node Index, , Debugging Routines, Main scheduling code
//@subsection Index

//@index
//* MainRegTable::  @cindex\s-+MainRegTable
//* StgMainThread::  @cindex\s-+StgMainThread
//* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
//* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
//* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
//* context_switch::  @cindex\s-+context_switch
//* createThread::  @cindex\s-+createThread
//* free_capabilities::  @cindex\s-+free_capabilities
//* gc_pending_cond::  @cindex\s-+gc_pending_cond
//* initScheduler::  @cindex\s-+initScheduler
//* interrupted::  @cindex\s-+interrupted
//* n_free_capabilities::  @cindex\s-+n_free_capabilities
//* next_thread_id::  @cindex\s-+next_thread_id
//* print_bq::  @cindex\s-+print_bq
//* run_queue_hd::  @cindex\s-+run_queue_hd
//* run_queue_tl::  @cindex\s-+run_queue_tl
//* sched_mutex::  @cindex\s-+sched_mutex
//* schedule::  @cindex\s-+schedule
//* take_off_run_queue::  @cindex\s-+take_off_run_queue
//* task_ids::  @cindex\s-+task_ids
//* term_mutex::  @cindex\s-+term_mutex
//* thread_ready_cond::  @cindex\s-+thread_ready_cond