1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.62 2000/04/03 15:52:53 simonmar Exp $
4 * (c) The GHC Team, 1998-2000
8 * The main scheduling code in GranSim is quite different from that in std
9 * (concurrent) Haskell: while concurrent Haskell just iterates over the
10 * threads in the runnable queue, GranSim is event-driven, i.e. it iterates
11 * over the events in the global event queue. -- HWL
12 * --------------------------------------------------------------------------*/
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
17 /* Version with scheduler monitor support for SMPs.
19 This design provides a high-level API to create and schedule threads etc.
20 as documented in the SMP design document.
22 It uses a monitor design controlled by a single mutex to control access
23 to shared data structures, and builds on the Posix threads library.
26 The majority of state is shared. In order to keep essential per-task state,
27 there is a Capability structure, which contains all the information
28 needed to run a thread: its STG registers, a pointer to its TSO, a
29 nursery etc. During STG execution, a pointer to the capability is
30 kept in a register (BaseReg).
32 In a non-SMP build, there is one global capability, namely MainRegTable.
39 //* Variables and Data structures::
40 //* Main scheduling loop::
41 //* Suspend and Resume::
43 //* Garbage Collection Routines::
44 //* Blocking Queue Routines::
45 //* Exception Handling Routines::
46 //* Debugging Routines::
50 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
51 //@subsection Includes
59 #include "StgStartup.h"
63 #include "StgMiscClosures.h"
65 #include "Evaluator.h"
66 #include "Exception.h"
74 #if defined(GRAN) || defined(PAR)
75 # include "GranSimRts.h"
77 # include "ParallelRts.h"
78 # include "Parallel.h"
79 # include "ParallelDebug.h"
87 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
88 //@subsection Variables and Data structures
92 * These are the threads which clients have requested that we run.
94 * In an SMP build, we might have several concurrent clients all
95 * waiting for results, and each one will wait on a condition variable
96 * until the result is available.
98 * In non-SMP, clients are strictly nested: the first client calls
99 * into the RTS, which might call out again to C with a _ccall_GC, and
100 * eventually re-enter the RTS.
102 * Information about main threads is kept in a linked list:
104 //@cindex StgMainThread
105 typedef struct StgMainThread_ {
107 SchedulerStatus stat;
110 pthread_cond_t wakeup;
112 struct StgMainThread_ *link;
115 /* Main thread queue.
116 * Locks required: sched_mutex.
118 static StgMainThread *main_threads;
121 * Locks required: sched_mutex.
125 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
126 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
129 In GranSim we have a runnable and a blocked queue for each processor.
130 In order to minimise code changes, new arrays run_queue_hds/tls
131 are created. run_queue_hd is then a shortcut (macro) for
132 run_queue_hds[CurrentProc] (see GranSim.h).
135 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
136 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
137 StgTSO *ccalling_threadss[MAX_PROC];
138 /* We use the same global list of threads (all_threads) in GranSim as in
139 the std RTS (i.e. we are cheating). However, we don't use this list in
140 the GranSim specific code at the moment (so we are only potentially cheating).
145 StgTSO *run_queue_hd, *run_queue_tl;
146 StgTSO *blocked_queue_hd, *blocked_queue_tl;
150 /* Linked list of all threads.
151 * Used for detecting garbage collected threads.
155 /* Threads suspended in _ccall_GC.
157 static StgTSO *suspended_ccalling_threads;
159 static void GetRoots(void);
160 static StgTSO *threadStackOverflow(StgTSO *tso);
162 /* KH: The following two flags are shared memory locations. There is no need
163 to lock them, since they are only unset at the end of a scheduler
167 /* flag set by signal handler to precipitate a context switch */
168 //@cindex context_switch
171 /* if this flag is set as well, give up execution */
172 //@cindex interrupted
175 /* Next thread ID to allocate.
176 * Locks required: sched_mutex
178 //@cindex next_thread_id
179 StgThreadID next_thread_id = 1;
182 * Pointers to the state of the current thread.
183 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
184 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
187 /* The smallest stack size that makes any sense is:
188 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
189 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
190 * + 1 (the realworld token for an IO thread)
191 * + 1 (the closure to enter)
193 * A thread with this stack will bomb immediately with a stack
194 * overflow, which will increase its stack size.
197 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
199 /* Free capability list.
200 * Locks required: sched_mutex.
203 //@cindex free_capabilities
204 //@cindex n_free_capabilities
205 Capability *free_capabilities; /* Available capabilities for running threads */
206 nat n_free_capabilities; /* total number of available capabilities */
208 //@cindex MainRegTable
209 Capability MainRegTable; /* for non-SMP, we have one global capability */
218 /* All our current task ids, saved in case we need to kill them later.
225 void addToBlockedQueue ( StgTSO *tso );
227 static void schedule ( void );
228 void interruptStgRts ( void );
230 static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri );
232 static StgTSO * createThread_ ( nat size, rtsBool have_lock );
236 static void sched_belch(char *s, ...);
240 //@cindex sched_mutex
242 //@cindex thread_ready_cond
243 //@cindex gc_pending_cond
244 pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;
245 pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER;
246 pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER;
247 pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER;
254 rtsTime TimeOfLastYield;
258 char *whatNext_strs[] = {
266 char *threadReturnCode_strs[] = {
267 "HeapOverflow", /* might also be StackOverflow */
276 * The thread state for the main thread.
277 // ToDo: check whether this is still needed
281 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
282 //@subsection Main scheduling loop
284 /* ---------------------------------------------------------------------------
285 Main scheduling loop.
287 We use round-robin scheduling, each thread returning to the
288 scheduler loop when one of these conditions is detected:
291 * timer expires (thread yields)
296 Locking notes: we acquire the scheduler lock once at the beginning
297 of the scheduler loop, and release it when
299 * running a thread, or
300 * waiting for work, or
301 * waiting for a GC to complete.
304 In a GranSim setup this loop iterates over the global event queue,
305 which determines what to do next; it is therefore more complicated
306 than either the concurrent or the parallel (GUM) setup.
310 GUM iterates over incoming messages.
311 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
312 and sends out a fish whenever it has nothing to do; in-between
313 doing the actual reductions (shared code below) it processes the
314 incoming messages and deals with delayed operations
315 (see PendingFetches).
316 This is not the ugliest code you could imagine, but it's bloody close.
318 ------------------------------------------------------------------------ */
325 StgThreadReturnCode ret;
334 rtsBool was_interrupted = rtsFalse;
336 ACQUIRE_LOCK(&sched_mutex);
340 /* set up first event to get things going */
341 /* ToDo: assign costs for system setup and init MainTSO ! */
342 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
344 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
347 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
348 G_TSO(CurrentTSO, 5));
350 if (RtsFlags.GranFlags.Light) {
351 /* Save current time; GranSim Light only */
352 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
355 event = get_next_event();
357 while (event!=(rtsEvent*)NULL) {
358 /* Choose the processor with the next event */
359 CurrentProc = event->proc;
360 CurrentTSO = event->tso;
364 while (!GlobalStopPending) { /* GlobalStopPending set in par_exit */
372 IF_DEBUG(scheduler, printAllThreads());
374 /* If we're interrupted (the user pressed ^C, or some other
375 * termination condition occurred), kill all the currently running
379 IF_DEBUG(scheduler, sched_belch("interrupted"));
380 for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
383 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
386 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
387 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
388 interrupted = rtsFalse;
389 was_interrupted = rtsTrue;
392 /* Go through the list of main threads and wake up any
393 * clients whose computations have finished. ToDo: this
394 * should be done more efficiently without a linear scan
395 * of the main threads list, somehow...
399 StgMainThread *m, **prev;
400 prev = &main_threads;
401 for (m = main_threads; m != NULL; m = m->link) {
402 switch (m->tso->what_next) {
405 *(m->ret) = (StgClosure *)m->tso->sp[0];
409 pthread_cond_broadcast(&m->wakeup);
413 if (was_interrupted) {
414 m->stat = Interrupted;
418 pthread_cond_broadcast(&m->wakeup);
428 /* in GUM do this only on the Main PE */
431 /* If our main thread has finished or been killed, return.
434 StgMainThread *m = main_threads;
435 if (m->tso->what_next == ThreadComplete
436 || m->tso->what_next == ThreadKilled) {
437 main_threads = main_threads->link;
438 if (m->tso->what_next == ThreadComplete) {
439 /* we finished successfully, fill in the return value */
440 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
444 if (was_interrupted) {
445 m->stat = Interrupted;
455 /* Top up the run queue from our spark pool. We try to make the
456 * number of threads in the run queue equal to the number of
461 nat n = n_free_capabilities;
462 StgTSO *tso = run_queue_hd;
464 /* Count the run queue */
465 while (n > 0 && tso != END_TSO_QUEUE) {
474 break; /* no more sparks in the pool */
476 /* I'd prefer this to be done in activateSpark -- HWL */
477 /* tricky - it needs to hold the scheduler lock and
478 * not try to re-acquire it -- SDM */
480 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
481 pushClosure(tso,spark);
482 PUSH_ON_RUN_QUEUE(tso);
484 advisory_thread_count++;
488 sched_belch("turning spark of closure %p into a thread",
489 (StgClosure *)spark));
492 /* We need to wake up the other tasks if we just created some
495 if (n_free_capabilities - n > 1) {
496 pthread_cond_signal(&thread_ready_cond);
501 /* Check whether any waiting threads need to be woken up. If the
502 * run queue is empty, and there are no other tasks running, we
503 * can wait indefinitely for something to happen.
504 * ToDo: what if another client comes along & requests another
507 if (blocked_queue_hd != END_TSO_QUEUE) {
509 (run_queue_hd == END_TSO_QUEUE)
511 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
516 /* check for signals each time around the scheduler */
518 if (signals_pending()) {
519 start_signal_handlers();
523 /* Detect deadlock: when we have no threads to run, there are
524 * no threads waiting on I/O or sleeping, and all the other
525 * tasks are waiting for work, we must have a deadlock. Inform
526 * all the main threads.
529 if (blocked_queue_hd == END_TSO_QUEUE
530 && run_queue_hd == END_TSO_QUEUE
531 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
534 for (m = main_threads; m != NULL; m = m->link) {
537 pthread_cond_broadcast(&m->wakeup);
543 In GUM all non-main PEs come in here with no work;
544 we ignore multiple main threads for now
546 if (blocked_queue_hd == END_TSO_QUEUE
547 && run_queue_hd == END_TSO_QUEUE) {
548 StgMainThread *m = main_threads;
551 main_threads = m->link;
558 /* If there's a GC pending, don't do anything until it has
562 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
563 pthread_cond_wait(&gc_pending_cond, &sched_mutex);
566 /* block until we've got a thread on the run queue and a free
569 while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
570 IF_DEBUG(scheduler, sched_belch("waiting for work"));
571 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
572 IF_DEBUG(scheduler, sched_belch("work now available"));
578 if (RtsFlags.GranFlags.Light)
579 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
581 /* adjust time based on time-stamp */
582 if (event->time > CurrentTime[CurrentProc] &&
583 event->evttype != ContinueThread)
584 CurrentTime[CurrentProc] = event->time;
586 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
587 if (!RtsFlags.GranFlags.Light)
590 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"))
592 /* main event dispatcher in GranSim */
593 switch (event->evttype) {
594 /* Should just be continuing execution */
596 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
597 /* ToDo: check assertion
598 ASSERT(run_queue_hd != (StgTSO*)NULL &&
599 run_queue_hd != END_TSO_QUEUE);
601 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
602 if (!RtsFlags.GranFlags.DoAsyncFetch &&
603 procStatus[CurrentProc]==Fetching) {
604 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
605 CurrentTSO->id, CurrentTSO, CurrentProc);
608 /* Ignore ContinueThreads for completed threads */
609 if (CurrentTSO->what_next == ThreadComplete) {
610 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
611 CurrentTSO->id, CurrentTSO, CurrentProc);
614 /* Ignore ContinueThreads for threads that are being migrated */
615 if (PROCS(CurrentTSO)==Nowhere) {
616 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
617 CurrentTSO->id, CurrentTSO, CurrentProc);
620 /* The thread should be at the beginning of the run queue */
621 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
622 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
623 CurrentTSO->id, CurrentTSO, CurrentProc);
624 break; // run the thread anyway
627 new_event(proc, proc, CurrentTime[proc],
629 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
631 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
632 break; // now actually run the thread; DaH Qu'vam yImuHbej
635 do_the_fetchnode(event);
636 goto next_thread; /* handle next event in event queue */
639 do_the_globalblock(event);
640 goto next_thread; /* handle next event in event queue */
643 do_the_fetchreply(event);
644 goto next_thread; /* handle next event in event queue */
646 case UnblockThread: /* Move from the blocked queue to the tail of */
647 do_the_unblock(event);
648 goto next_thread; /* handle next event in event queue */
650 case ResumeThread: /* Move from the blocked queue to the tail of */
651 /* the runnable queue ( i.e. Qu' SImqa'lu') */
652 event->tso->gran.blocktime +=
653 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
654 do_the_startthread(event);
655 goto next_thread; /* handle next event in event queue */
658 do_the_startthread(event);
659 goto next_thread; /* handle next event in event queue */
662 do_the_movethread(event);
663 goto next_thread; /* handle next event in event queue */
666 do_the_movespark(event);
667 goto next_thread; /* handle next event in event queue */
670 do_the_findwork(event);
671 goto next_thread; /* handle next event in event queue */
674 barf("Illegal event type %u\n", event->evttype);
677 /* This point was scheduler_loop in the old RTS */
679 IF_DEBUG(gran, belch("GRAN: after main switch"));
681 TimeOfLastEvent = CurrentTime[CurrentProc];
682 TimeOfNextEvent = get_time_of_next_event();
683 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
684 // CurrentTSO = ThreadQueueHd;
686 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
689 if (RtsFlags.GranFlags.Light)
690 GranSimLight_leave_system(event, &ActiveTSO);
692 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
695 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
697 /* in a GranSim setup the TSO stays on the run queue */
699 /* Take a thread from the run queue. */
700 t = POP_RUN_QUEUE(); // take_off_run_queue(t);
703 fprintf(stderr, "GRAN: About to run current thread, which is\n");
706 context_switch = 0; // turned on via GranYield, checking events and time slice
709 DumpGranEvent(GR_SCHEDULE, t));
711 procStatus[CurrentProc] = Busy;
715 if (PendingFetches != END_BF_QUEUE) {
719 /* ToDo: perhaps merge with spark activation above */
720 /* check whether we have local work and send requests if we have none */
721 if (run_queue_hd == END_TSO_QUEUE) { /* no runnable threads */
722 /* :-[ no local threads => look out for local sparks */
723 /* the spark pool for the current PE */
724 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
725 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
726 pool->hd < pool->tl) {
728 * ToDo: add GC code check that we really have enough heap afterwards!!
730 * If we're here (no runnable threads) and we have pending
731 * sparks, we must have a space problem. Get enough space
732 * to turn one of those pending sparks into a
736 spark = findSpark(); /* get a spark */
737 if (spark != (rtsSpark) NULL) {
738 tso = activateSpark(spark); /* turn the spark into a thread */
739 IF_PAR_DEBUG(schedule,
740 belch("==== schedule: Created TSO %d (%p); %d threads active",
741 tso->id, tso, advisory_thread_count));
743 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
744 belch("==^^ failed to activate spark");
746 } /* otherwise fall through & pick-up new tso */
748 IF_PAR_DEBUG(verbose,
749 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
750 spark_queue_len(pool)));
754 /* =8-[ no local sparks => look for work on other PEs */
757 * We really have absolutely no work. Send out a fish
758 * (there may be some out there already), and wait for
759 * something to arrive. We clearly can't run any threads
760 * until a SCHEDULE or RESUME arrives, and so that's what
761 * we're hoping to see. (Of course, we still have to
762 * respond to other types of messages.)
765 outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
766 // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
767 /* fishing set in sendFish, processFish;
768 avoid flooding system with fishes via delay */
770 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
778 } else if (PacketsWaiting()) { /* Look for incoming messages */
782 /* Now we are sure that we have some work available */
783 ASSERT(run_queue_hd != END_TSO_QUEUE);
784 /* Take a thread from the run queue, if we have work */
785 t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
787 /* ToDo: write something to the log-file
788 if (RTSflags.ParFlags.granSimStats && !sameThread)
789 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
793 /* the spark pool for the current PE */
794 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
796 IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; base=%x, lim=%x)",
797 spark_queue_len(pool),
799 pool->hd, pool->tl, pool->base, pool->lim));
801 IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)",
802 run_queue_len(), CURRENT_PROC,
803 run_queue_hd, run_queue_tl));
808 we are running a different TSO, so write a schedule event to log file
809 NB: If we use fair scheduling we also have to write a deschedule
810 event for LastTSO; with unfair scheduling we know that the
811 previous tso has blocked whenever we switch to another tso, so
812 we don't need it in GUM for now
814 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
815 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
819 #else /* !GRAN && !PAR */
821 /* grab a thread from the run queue
824 IF_DEBUG(sanity,checkTSO(t));
831 cap = free_capabilities;
832 free_capabilities = cap->link;
833 n_free_capabilities--;
838 cap->rCurrentTSO = t;
840 /* set the context_switch flag
842 if (run_queue_hd == END_TSO_QUEUE)
847 RELEASE_LOCK(&sched_mutex);
849 #if defined(GRAN) || defined(PAR)
850 IF_DEBUG(scheduler, belch("-->> Running TSO %ld (%p) %s ...",
851 t->id, t, whatNext_strs[t->what_next]));
853 IF_DEBUG(scheduler,sched_belch("running thread %d", t->id));
856 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
857 /* Run the current thread
859 switch (cap->rCurrentTSO->what_next) {
862 /* Thread already finished, return to scheduler. */
863 ret = ThreadFinished;
866 ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
869 ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
871 case ThreadEnterHugs:
875 IF_DEBUG(scheduler,sched_belch("entering Hugs"));
876 c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
877 cap->rCurrentTSO->sp += 1;
882 barf("Panic: entered a BCO but no bytecode interpreter in this build");
885 barf("schedule: invalid what_next field");
887 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
889 /* Costs for the scheduler are assigned to CCS_SYSTEM */
894 ACQUIRE_LOCK(&sched_mutex);
897 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
898 #elif !defined(GRAN) && !defined(PAR)
899 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
901 t = cap->rCurrentTSO;
904 /* HACK 675: if the last thread didn't yield, make sure to print a
905 SCHEDULE event to the log file when StgRunning the next thread, even
906 if it is the same one as before */
907 LastTSO = t; //(ret == ThreadBlocked) ? END_TSO_QUEUE : t;
908 TimeOfLastYield = CURRENT_TIME;
913 /* make all the running tasks block on a condition variable,
914 * maybe set context_switch and wait till they all pile in,
915 * then have them wait on a GC condition variable.
917 #if defined(GRAN) || defined(PAR)
918 IF_DEBUG(scheduler,belch("--<< TSO %ld (%p; %s) stopped: HeapOverflow",
919 t->id, t, whatNext_strs[t->what_next]));
923 ASSERT(!is_on_queue(t,CurrentProc));
926 ready_to_gc = rtsTrue;
927 context_switch = 1; /* stop other threads ASAP */
928 PUSH_ON_RUN_QUEUE(t);
929 /* actual GC is done at the end of the while loop */
933 #if defined(GRAN) || defined(PAR)
934 IF_DEBUG(scheduler,belch("--<< TSO %ld (%p; %s) stopped, StackOverflow",
935 t->id, t, whatNext_strs[t->what_next]));
937 /* just adjust the stack for this thread, then pop it back
943 /* enlarge the stack */
944 StgTSO *new_t = threadStackOverflow(t);
946 /* This TSO has moved, so update any pointers to it from the
947 * main thread stack. It better not be on any other queues...
950 for (m = main_threads; m != NULL; m = m->link) {
956 PUSH_ON_RUN_QUEUE(new_t);
963 DumpGranEvent(GR_DESCHEDULE, t));
964 globalGranStats.tot_yields++;
967 DumpGranEvent(GR_DESCHEDULE, t));
969 /* put the thread back on the run queue. Then, if we're ready to
970 * GC, check whether this is the last task to stop. If so, wake
971 * up the GC thread. getThread will block during a GC until the
974 #if defined(GRAN) || defined(PAR)
976 if (t->what_next == ThreadEnterHugs) {
977 /* ToDo: or maybe a timer expired when we were in Hugs?
978 * or maybe someone hit ctrl-C
980 belch("--<< TSO %ld (%p; %s) stopped to switch to Hugs",
981 t->id, t, whatNext_strs[t->what_next]);
983 belch("--<< TSO %ld (%p; %s) stopped, yielding",
984 t->id, t, whatNext_strs[t->what_next]);
989 if (t->what_next == ThreadEnterHugs) {
990 /* ToDo: or maybe a timer expired when we were in Hugs?
991 * or maybe someone hit ctrl-C
993 belch("thread %ld stopped to switch to Hugs", t->id);
995 belch("thread %ld stopped, yielding", t->id);
1001 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1003 ASSERT(t->link == END_TSO_QUEUE);
1005 ASSERT(!is_on_queue(t,CurrentProc));
1008 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1009 checkThreadQsSanity(rtsTrue));
1011 APPEND_TO_RUN_QUEUE(t);
1013 /* add a ContinueThread event to actually process the thread */
1014 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1016 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1018 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1027 belch("--<< TSO %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1028 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1029 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1031 // ??? needed; should emit block before
1033 DumpGranEvent(GR_DESCHEDULE, t));
1034 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1037 ASSERT(procStatus[CurrentProc]==Busy ||
1038 ((procStatus[CurrentProc]==Fetching) &&
1039 (t->block_info.closure!=(StgClosure*)NULL)));
1040 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1041 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1042 procStatus[CurrentProc]==Fetching))
1043 procStatus[CurrentProc] = Idle;
1047 DumpGranEvent(GR_DESCHEDULE, t));
1049 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1053 belch("--<< TSO %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1054 t->id, t, whatNext_strs[t->what_next], t->block_info.closure);
1055 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1058 /* don't need to do anything. Either the thread is blocked on
1059 * I/O, in which case we'll have called addToBlockedQueue
1060 * previously, or it's blocked on an MVar or Blackhole, in which
1061 * case it'll be on the relevant queue already.
1064 fprintf(stderr, "--<< TSO %d (%p) stopped ", t->id, t);
1065 printThreadBlockage(t);
1066 fprintf(stderr, "\n"));
1068 /* Only for dumping event to log file
1069 ToDo: do I need this in GranSim, too?
1076 case ThreadFinished:
1077 /* Need to check whether this was a main thread, and if so, signal
1078 * the task that started it with the return value. If we have no
1079 * more main threads, we probably need to stop all the tasks until
1082 IF_DEBUG(scheduler,belch("--++ TSO %d (%p) finished", t->id, t));
1083 t->what_next = ThreadComplete;
1085 endThread(t, CurrentProc); // clean-up the thread
1087 advisory_thread_count--;
1088 if (RtsFlags.ParFlags.ParStats.Full)
1089 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1094 barf("doneThread: invalid thread return code");
1098 cap->link = free_capabilities;
1099 free_capabilities = cap;
1100 n_free_capabilities++;
1104 if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes)
1109 /* everybody back, start the GC.
1110 * Could do it in this thread, or signal a condition var
1111 * to do it in another thread. Either way, we need to
1112 * broadcast on gc_pending_cond afterward.
1115 IF_DEBUG(scheduler,sched_belch("doing GC"));
1117 GarbageCollect(GetRoots);
1118 ready_to_gc = rtsFalse;
1120 pthread_cond_broadcast(&gc_pending_cond);
1123 /* add a ContinueThread event to continue execution of current thread */
1124 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1126 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1128 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1135 IF_GRAN_DEBUG(unused,
1136 print_eventq(EventHd));
1138 event = get_next_event();
1142 /* ToDo: wait for next message to arrive rather than busy wait */
1147 t = take_off_run_queue(END_TSO_QUEUE);
1150 } /* end of while(1) */
1153 /* A hack for Hugs concurrency support. Needs sanitisation (?) */
1154 void deleteAllThreads ( void )
1157 IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
1158 for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1161 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1164 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1165 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1168 /* startThread and insertThread are now in GranSim.c -- HWL */
1170 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1171 //@subsection Suspend and Resume
1173 /* ---------------------------------------------------------------------------
1174 * Suspending & resuming Haskell threads.
1176 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1177 * its capability before calling the C function. This allows another
1178 * task to pick up the capability and carry on running Haskell
1179 * threads. It also means that if the C call blocks, it won't lock the whole system.
1182 * The Haskell thread making the C call is put to sleep for the
1183 * duration of the call, on the suspended_ccalling_threads queue. We
1184 * give out a token to the task, which it can use to resume the thread
1185 * on return from the C function.
1186 * ------------------------------------------------------------------------- */
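/* The following is only an illustrative sketch (not part of the RTS) of the
 * pattern a "safe" C call is expected to follow. Here `cap' stands for the
 * Capability the task is currently running on, and some_blocking_c_fn is a
 * placeholder for the foreign function being called:
 *
 *   StgInt tok = suspendThread(cap);  // hand back the capability
 *   some_blocking_c_fn(...);          // may block without stalling the RTS
 *   cap = resumeThread(tok);          // reacquire a capability, resume the TSO
 */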
1189 suspendThread( Capability *cap )
1193 ACQUIRE_LOCK(&sched_mutex);
1196 sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
1198 threadPaused(cap->rCurrentTSO);
1199 cap->rCurrentTSO->link = suspended_ccalling_threads;
1200 suspended_ccalling_threads = cap->rCurrentTSO;
1202 /* Use the thread ID as the token; it should be unique */
1203 tok = cap->rCurrentTSO->id;
1206 cap->link = free_capabilities;
1207 free_capabilities = cap;
1208 n_free_capabilities++;
1211 RELEASE_LOCK(&sched_mutex);
1216 resumeThread( StgInt tok )
1218 StgTSO *tso, **prev;
1221 ACQUIRE_LOCK(&sched_mutex);
1223 prev = &suspended_ccalling_threads;
1224 for (tso = suspended_ccalling_threads;
1225 tso != END_TSO_QUEUE;
1226 prev = &tso->link, tso = tso->link) {
1227 if (tso->id == (StgThreadID)tok) {
1232 if (tso == END_TSO_QUEUE) {
1233 barf("resumeThread: thread not found");
1237 while (free_capabilities == NULL) {
1238 IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1239 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1240 IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1242 cap = free_capabilities;
1243 free_capabilities = cap->link;
1244 n_free_capabilities--;
1246 cap = &MainRegTable;
1249 cap->rCurrentTSO = tso;
1251 RELEASE_LOCK(&sched_mutex);
1256 /* ---------------------------------------------------------------------------
1258 * ------------------------------------------------------------------------ */
1259 static void unblockThread(StgTSO *tso);
1261 /* ---------------------------------------------------------------------------
1262 * Comparing Thread ids.
1264 * This is used from STG land in the implementation of the
1265 * instances of Eq/Ord for ThreadIds.
1266 * ------------------------------------------------------------------------ */
1268 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
1270 StgThreadID id1 = tso1->id;
1271 StgThreadID id2 = tso2->id;
1273 if (id1 < id2) return (-1);
1274 if (id1 > id2) return 1;
1278 /* ---------------------------------------------------------------------------
1279 Create a new thread.
1281 The new thread starts with the given stack size. Before the
1282 scheduler can run, however, this thread needs to have a closure
1283 (and possibly some arguments) pushed on its stack. See
1284 pushClosure() in Schedule.h.
1286 createGenThread() and createIOThread() (in SchedAPI.h) are
1287 convenient packaged versions of this function.
1289 currently pri (priority) is only used in a GRAN setup -- HWL
1290 ------------------------------------------------------------------------ */
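/* Purely as an illustration (mirroring what activateSpark below does),
 * creating and starting a thread by hand might look like this in a
 * non-GranSim setup; `closure' is a placeholder for whatever is to be
 * evaluated:
 *
 *   StgTSO *tso = createThread(RtsFlags.GcFlags.initialStkSize);
 *   pushClosure(tso, closure);  // see pushClosure() in Schedule.h
 *   scheduleThread(tso);        // put it on the run queue
 */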
1291 //@cindex createThread
1293 /* currently pri (priority) is only used in a GRAN setup -- HWL */
1295 createThread(nat stack_size, StgInt pri)
1297 return createThread_(stack_size, rtsFalse, pri);
1301 createThread_(nat size, rtsBool have_lock, StgInt pri)
1305 createThread(nat stack_size)
1307 return createThread_(stack_size, rtsFalse);
1311 createThread_(nat size, rtsBool have_lock)
1318 /* First check whether we should create a thread at all */
1320 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1321 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1323 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1324 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1325 return END_TSO_QUEUE;
1331 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1334 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1336 /* catch ridiculously small stack sizes */
1337 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1338 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1341 stack_size = size - TSO_STRUCT_SIZEW;
1343 tso = (StgTSO *)allocate(size);
1344 TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1346 SET_HDR(tso, &TSO_info, CCS_MAIN);
1348 SET_GRAN_HDR(tso, ThisPE);
1350 tso->what_next = ThreadEnterGHC;
1352 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1353 * protect the increment operation on next_thread_id.
1354 * In future, we could use an atomic increment instead.
1356 if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1357 tso->id = next_thread_id++;
1358 if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1360 tso->why_blocked = NotBlocked;
1361 tso->blocked_exceptions = NULL;
1363 tso->splim = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
1364 tso->stack_size = stack_size;
1365 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
1367 tso->sp = (P_)&(tso->stack) + stack_size;
1370 tso->prof.CCCS = CCS_MAIN;
1373 /* put a stop frame on the stack */
1374 tso->sp -= sizeofW(StgStopFrame);
1375 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN);
1376 tso->su = (StgUpdateFrame*)tso->sp;
1378 IF_DEBUG(scheduler,belch("---- Initialised TSO %ld (%p), stack size = %lx words",
1379 tso->id, tso, tso->stack_size));
1383 tso->link = END_TSO_QUEUE;
1384 /* uses more flexible routine in GranSim */
1385 insertThread(tso, CurrentProc);
1387 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1392 #if defined(GRAN) || defined(PAR)
1393 DumpGranEvent(GR_START,tso);
1396 /* Link the new thread on the global thread list.
1398 tso->global_link = all_threads;
1402 tso->gran.pri = pri;
1404 tso->gran.magic = TSO_MAGIC; // debugging only
1406 tso->gran.sparkname = 0;
1407 tso->gran.startedat = CURRENT_TIME;
1408 tso->gran.exported = 0;
1409 tso->gran.basicblocks = 0;
1410 tso->gran.allocs = 0;
1411 tso->gran.exectime = 0;
1412 tso->gran.fetchtime = 0;
1413 tso->gran.fetchcount = 0;
1414 tso->gran.blocktime = 0;
1415 tso->gran.blockcount = 0;
1416 tso->gran.blockedat = 0;
1417 tso->gran.globalsparks = 0;
1418 tso->gran.localsparks = 0;
1419 if (RtsFlags.GranFlags.Light)
1420 tso->gran.clock = Now; /* local clock */
1422 tso->gran.clock = 0;
1424 IF_DEBUG(gran,printTSO(tso));
1427 tso->par.magic = TSO_MAGIC; // debugging only
1429 tso->par.sparkname = 0;
1430 tso->par.startedat = CURRENT_TIME;
1431 tso->par.exported = 0;
1432 tso->par.basicblocks = 0;
1433 tso->par.allocs = 0;
1434 tso->par.exectime = 0;
1435 tso->par.fetchtime = 0;
1436 tso->par.fetchcount = 0;
1437 tso->par.blocktime = 0;
1438 tso->par.blockcount = 0;
1439 tso->par.blockedat = 0;
1440 tso->par.globalsparks = 0;
1441 tso->par.localsparks = 0;
1445 globalGranStats.tot_threads_created++;
1446 globalGranStats.threads_created_on_PE[CurrentProc]++;
1447 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1448 globalGranStats.tot_sq_probes++;
1453 belch("==__ schedule: Created TSO %d (%p);",
1454 CurrentProc, tso, tso->id));
1456 IF_PAR_DEBUG(verbose,
1457 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1458 tso->id, tso, advisory_thread_count));
1460 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1461 tso->id, tso->stack_size));
1467 Turn a spark into a thread.
1468 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1471 //@cindex activateSpark
1473 activateSpark (rtsSpark spark)
1477 ASSERT(spark != (rtsSpark)NULL);
1478 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1479 if (tso!=END_TSO_QUEUE) {
1480 pushClosure(tso,spark);
1481 PUSH_ON_RUN_QUEUE(tso);
1482 advisory_thread_count++;
1484 if (RtsFlags.ParFlags.ParStats.Full) {
1485 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1486 IF_PAR_DEBUG(verbose,
1487 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1488 (StgClosure *)spark, info_type((StgClosure *)spark)));
1491 barf("activateSpark: Cannot create TSO");
1493 // ToDo: fwd info on local/global spark to thread -- HWL
1494 // tso->gran.exported = spark->exported;
1495 // tso->gran.locked = !spark->global;
1496 // tso->gran.sparkname = spark->name;
1502 /* ---------------------------------------------------------------------------
1505 * scheduleThread puts a thread on the head of the runnable queue.
1506 * This will usually be done immediately after a thread is created.
1507 * The caller of scheduleThread must create the thread using e.g.
1508 * createThread and push an appropriate closure
1509 * on this thread's stack before the scheduler is invoked.
1510 * ------------------------------------------------------------------------ */
1513 scheduleThread(StgTSO *tso)
1515 if (tso==END_TSO_QUEUE){
1520 ACQUIRE_LOCK(&sched_mutex);
1522 /* Put the new thread on the head of the runnable queue. The caller
1523 * better push an appropriate closure on this thread's stack
1524 * beforehand. In the SMP case, the thread may start running as
1525 * soon as we release the scheduler lock below.
1527 PUSH_ON_RUN_QUEUE(tso);
1530 IF_DEBUG(scheduler,printTSO(tso));
1531 RELEASE_LOCK(&sched_mutex);
1534 /* ---------------------------------------------------------------------------
1537 * Start up Posix threads to run each of the scheduler tasks.
1538 * I believe the task ids are not needed in the system as defined.
1540 * ------------------------------------------------------------------------ */
1542 #if defined(PAR) || defined(SMP)
1544 taskStart( void *arg STG_UNUSED )
1546 rts_evalNothing(NULL);
1550 /* ---------------------------------------------------------------------------
1553 * Initialise the scheduler. This resets all the queues - if the
1554 * queues contained any threads, they'll be garbage collected at the
1557 * This now calls startTasks(), so should only be called once! KH @ 25/10/99
1558 * ------------------------------------------------------------------------ */
1562 term_handler(int sig STG_UNUSED)
1565 ACQUIRE_LOCK(&term_mutex);
1567 RELEASE_LOCK(&term_mutex);
1572 //@cindex initScheduler
1579 for (i=0; i<=MAX_PROC; i++) {
1580 run_queue_hds[i] = END_TSO_QUEUE;
1581 run_queue_tls[i] = END_TSO_QUEUE;
1582 blocked_queue_hds[i] = END_TSO_QUEUE;
1583 blocked_queue_tls[i] = END_TSO_QUEUE;
1584 ccalling_threadss[i] = END_TSO_QUEUE;
1587 run_queue_hd = END_TSO_QUEUE;
1588 run_queue_tl = END_TSO_QUEUE;
1589 blocked_queue_hd = END_TSO_QUEUE;
1590 blocked_queue_tl = END_TSO_QUEUE;
1593 suspended_ccalling_threads = END_TSO_QUEUE;
1595 main_threads = NULL;
1596 all_threads = END_TSO_QUEUE;
1601 enteredCAFs = END_CAF_LIST;
1603 /* Install the SIGTERM handler */
1606 struct sigaction action,oact;
1608 action.sa_handler = term_handler;
1609 sigemptyset(&action.sa_mask);
1610 action.sa_flags = 0;
1611 if (sigaction(SIGTERM, &action, &oact) != 0) {
1612 barf("can't install TERM handler");
1618 /* Allocate N Capabilities */
1621 Capability *cap, *prev;
1624 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1625 cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1629 free_capabilities = cap;
1630 n_free_capabilities = RtsFlags.ParFlags.nNodes;
1632 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1633 n_free_capabilities););
1636 #if defined(SMP) || defined(PAR)
1649 /* make some space for saving all the thread ids */
1650 task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1651 "initScheduler:task_ids");
1653 /* and create all the threads */
1654 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1655 r = pthread_create(&tid,NULL,taskStart,NULL);
1657 barf("startTasks: Can't create new Posix thread");
1659 task_ids[i].id = tid;
1660 task_ids[i].mut_time = 0.0;
1661 task_ids[i].mut_etime = 0.0;
1662 task_ids[i].gc_time = 0.0;
1663 task_ids[i].gc_etime = 0.0;
1664 task_ids[i].elapsedtimestart = elapsedtime();
1665 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1671 exitScheduler( void )
1676 /* Don't want to use pthread_cancel, since we'd have to install
1677 * these silly exception handlers (pthread_cleanup_{push,pop}) around
1681 /* Cancel all our tasks */
1682 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1683 pthread_cancel(task_ids[i].id);
1686 /* Wait for all the tasks to terminate */
1687 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1688 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n",
1690 pthread_join(task_ids[i].id, NULL);
1694 /* Send 'em all a SIGTERM. That should shut 'em up.
1696 await_death = RtsFlags.ParFlags.nNodes;
1697 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1698 pthread_kill(task_ids[i].id,SIGTERM);
1700 while (await_death > 0) {
1706 /* -----------------------------------------------------------------------------
1707 Managing the per-task allocation areas.
1709 Each capability comes with an allocation area. These are
1710 fixed-length block lists into which allocation can be done.
1712 ToDo: no support for two-space collection at the moment???
1713 -------------------------------------------------------------------------- */
1715 /* -----------------------------------------------------------------------------
1716 * waitThread is the external interface for running a new computation
1717 * and waiting for the result.
1719 * In the non-SMP case, we create a new main thread, push it on the
1720 * main-thread stack, and invoke the scheduler to run it. The
1721 * scheduler will return when the top main thread on the stack has
1722 * completed or died, and fill in the necessary fields of the
1723 * main_thread structure.
1725 * In the SMP case, we create a main thread as before, but we then
1726 * create a new condition variable and sleep on it. When our new
1727 * main thread has completed, we'll be woken up and the status/result
1728 * will be in the main_thread struct.
1729 * -------------------------------------------------------------------------- */
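/* A hypothetical client-side sketch (not part of this file) of the sequence
 * described above; `closure' is a placeholder and error handling is omitted:
 *
 *   StgClosure *result;
 *   StgTSO *tso = createThread(RtsFlags.GcFlags.initialStkSize);
 *   pushClosure(tso, closure);
 *   scheduleThread(tso);
 *   if (waitThread(tso, &result) == Success) {
 *     // use result
 *   }
 */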
1732 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1735 SchedulerStatus stat;
1737 ACQUIRE_LOCK(&sched_mutex);
1739 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1745 pthread_cond_init(&m->wakeup, NULL);
1748 m->link = main_threads;
1751 IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n",
1756 pthread_cond_wait(&m->wakeup, &sched_mutex);
1757 } while (m->stat == NoStatus);
1759 /* GranSim specific init */
1760 CurrentTSO = m->tso; // the TSO to run
1761 procStatus[MainProc] = Busy; // status of main PE
1762 CurrentProc = MainProc; // PE to run it on
1767 ASSERT(m->stat != NoStatus);
1773 pthread_cond_destroy(&m->wakeup);
1776 IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n",
1780 RELEASE_LOCK(&sched_mutex);
1785 //@node Run queue code, Garbage Collection Routines, Suspend and Resume, Main scheduling code
1786 //@subsection Run queue code
1790 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1791 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1792 implicit global variable that has to be correct when calling these
1796 /* Put the new thread on the head of the runnable queue.
1797 * The caller of createThread better push an appropriate closure
1798 * on this thread's stack before the scheduler is invoked.
1800 static /* inline */ void
1801 add_to_run_queue(tso)
1804 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1805 tso->link = run_queue_hd;
1807 if (run_queue_tl == END_TSO_QUEUE) {
1812 /* Put the new thread at the end of the runnable queue. */
1813 static /* inline */ void
1814 push_on_run_queue(tso)
1817 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1818 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1819 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1820 if (run_queue_hd == END_TSO_QUEUE) {
1823 run_queue_tl->link = tso;
1829 Should be inlined because it's used very often in schedule. The tso
1830 argument is actually only needed in GranSim, where we want to have the
1831 possibility to schedule *any* TSO on the run queue, irrespective of the
1832 actual ordering. Therefore, if tso is not the nil TSO then we traverse
1833 the run queue and dequeue the tso, adjusting the links in the queue.
1835 //@cindex take_off_run_queue
1836 static /* inline */ StgTSO*
1837 take_off_run_queue(StgTSO *tso) {
1841 qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1843 if tso is specified, unlink that tso from the run_queue (doesn't have
1844 to be at the beginning of the queue); GranSim only
1846 if (tso!=END_TSO_QUEUE) {
1847 /* find tso in queue */
1848 for (t=run_queue_hd, prev=END_TSO_QUEUE;
1849 t!=END_TSO_QUEUE && t!=tso;
1853 /* now actually dequeue the tso */
1854 if (prev!=END_TSO_QUEUE) {
1855 ASSERT(run_queue_hd!=t);
1856 prev->link = t->link;
1858 /* t is at beginning of thread queue */
1859 ASSERT(run_queue_hd==t);
1860 run_queue_hd = t->link;
1862 /* t is at end of thread queue */
1863 if (t->link==END_TSO_QUEUE) {
1864 ASSERT(t==run_queue_tl);
1865 run_queue_tl = prev;
1867 ASSERT(run_queue_tl!=t);
1869 t->link = END_TSO_QUEUE;
1871 /* take tso from the beginning of the queue; std concurrent code */
1873 if (t != END_TSO_QUEUE) {
1874 run_queue_hd = t->link;
1875 t->link = END_TSO_QUEUE;
1876 if (run_queue_hd == END_TSO_QUEUE) {
1877 run_queue_tl = END_TSO_QUEUE;
1886 //@node Garbage Collection Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1887 //@subsection Garbage Collection Routines
1889 /* ---------------------------------------------------------------------------
1890 Where are the roots that we know about?
1892 - all the threads on the runnable queue
1893 - all the threads on the blocked queue
1894 - all the threads currently executing a _ccall_GC
1895 - all the "main threads"
1897 ------------------------------------------------------------------------ */
1899 /* This has to be protected either by the scheduler monitor, or by the
1900 garbage collection monitor (probably the latter).
1904 static void GetRoots(void)
1911 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1912 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1913 run_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1914 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1915 run_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1917 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1918 blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1919 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1920 blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1921 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1922 ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
1929 if (run_queue_hd != END_TSO_QUEUE) {
1930 ASSERT(run_queue_tl != END_TSO_QUEUE);
1931 run_queue_hd = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1932 run_queue_tl = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1935 if (blocked_queue_hd != END_TSO_QUEUE) {
1936 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
1937 blocked_queue_hd = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1938 blocked_queue_tl = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1942 for (m = main_threads; m != NULL; m = m->link) {
1943 m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1945 if (suspended_ccalling_threads != END_TSO_QUEUE)
1946 suspended_ccalling_threads =
1947 (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
1949 #if defined(SMP) || defined(PAR) || defined(GRAN)
1954 /* -----------------------------------------------------------------------------
1957 This is the interface to the garbage collector from Haskell land.
1958 We provide this so that external C code can allocate and garbage
1959 collect when called from Haskell via _ccall_GC.
1961 It might be useful to provide an interface whereby the programmer
1962 can specify more roots (ToDo).
1964 This needs to be protected by the GC condition variable above. KH.
1965 -------------------------------------------------------------------------- */
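/* An illustrative sketch (not part of this file) of how foreign code might
 * supply extra roots; my_roots and my_closure are placeholder names:
 *
 *   static StgClosure *my_closure;
 *
 *   static void my_roots(void) {
 *     my_closure = (StgClosure *)MarkRoot(my_closure);
 *   }
 *
 *   ...
 *   performGCWithRoots(my_roots);  // marks the scheduler's roots, then ours
 */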
1967 void (*extra_roots)(void);
1972 GarbageCollect(GetRoots);
1978 GetRoots(); /* the scheduler's roots */
1979 extra_roots(); /* the user's roots */
1983 performGCWithRoots(void (*get_roots)(void))
1985 extra_roots = get_roots;
1987 GarbageCollect(AllRoots);
1990 /* -----------------------------------------------------------------------------
1993 If the thread has reached its maximum stack size, then raise the
1994 StackOverflow exception in the offending thread. Otherwise
1995 relocate the TSO into a larger chunk of memory and adjust its stack
1997 -------------------------------------------------------------------------- */
2000 threadStackOverflow(StgTSO *tso)
2002 nat new_stack_size, new_tso_size, diff, stack_words;
2006 IF_DEBUG(sanity,checkTSO(tso));
2007 if (tso->stack_size >= tso->max_stack_size) {
2010 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2011 tso->id, tso, tso->stack_size, tso->max_stack_size);
2012 /* If we're debugging, just print out the top of the stack */
2013 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2017 fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
2020 /* Send this thread the StackOverflow exception */
2021 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2026 /* Try to double the current stack size. If that takes us over the
2027 * maximum stack size for this thread, then use the maximum instead.
2028 * Finally round up so the TSO ends up as a whole number of blocks.
2030 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2031 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2032 TSO_STRUCT_SIZE)/sizeof(W_);
2033 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2034 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2036 IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2038 dest = (StgTSO *)allocate(new_tso_size);
2039 TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2041 /* copy the TSO block and the old stack into the new area */
2042 memcpy(dest,tso,TSO_STRUCT_SIZE);
2043 stack_words = tso->stack + tso->stack_size - tso->sp;
2044 new_sp = (P_)dest + new_tso_size - stack_words;
2045 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2047 /* relocate the stack pointers... */
2048 diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2049 dest->su = (StgUpdateFrame *) ((P_)dest->su + diff);
2051 dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso);
2052 dest->stack_size = new_stack_size;
2054 /* and relocate the update frame list */
2055 relocate_TSO(tso, dest);
2057 /* Mark the old TSO as relocated. We have to check for relocated
2058 * TSOs in the garbage collector and any primops that deal with TSOs.
2060 * It's important to set the sp and su values to just beyond the end
2061 * of the stack, so we don't attempt to scavenge any part of the
2064 tso->what_next = ThreadRelocated;
2066 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2067 tso->su = (StgUpdateFrame *)tso->sp;
2068 tso->why_blocked = NotBlocked;
2069 dest->mut_link = NULL;
2071 IF_PAR_DEBUG(verbose,
2072 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2073 tso->id, tso, tso->stack_size);
2074 /* If we're debugging, just print out the top of the stack */
2075 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2078 IF_DEBUG(sanity,checkTSO(tso));
2080 IF_DEBUG(scheduler,printTSO(dest));
2086 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collection Routines, Main scheduling code
2087 //@subsection Blocking Queue Routines
2089 /* ---------------------------------------------------------------------------
2090 Wake up a queue that was blocked on some resource.
2091 ------------------------------------------------------------------------ */
2093 /* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */
2097 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2102 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2104 /* write RESUME events to log file and
2105 update blocked and fetch time (depending on type of the orig closure) */
2106 if (RtsFlags.ParFlags.ParStats.Full) {
2107 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2108 GR_RESUME, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2109 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2111 switch (get_itbl(node)->type) {
2113 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2118 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2121 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2128 static StgBlockingQueueElement *
2129 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2132 PEs node_loc, tso_loc;
2134 node_loc = where_is(node); // should be lifted out of loop
2135 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2136 tso_loc = where_is((StgClosure *)tso);
2137 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2138 /* !fake_fetch => TSO being on CurrentProc is the same as IS_LOCAL_TO */
2139 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2140 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2141 // insertThread(tso, node_loc);
2142 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2144 tso, node, (rtsSpark*)NULL);
2145 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2148 } else { // TSO is remote (actually should be FMBQ)
2149 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2150 RtsFlags.GranFlags.Costs.gunblocktime +
2151 RtsFlags.GranFlags.Costs.latency;
2152 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2154 tso, node, (rtsSpark*)NULL);
2155 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2158 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2160 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2161 (node_loc==tso_loc ? "Local" : "Global"),
2162 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2163 tso->block_info.closure = NULL;
2164 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
2168 static StgBlockingQueueElement *
2169 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2171 StgBlockingQueueElement *next;
2173 switch (get_itbl(bqe)->type) {
2175 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2176 /* if it's a TSO just push it onto the run_queue */
2178 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2179 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
2181 unblockCount(bqe, node);
2182 /* reset blocking status after dumping event */
2183 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2187 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2189 bqe->link = PendingFetches;
2190 PendingFetches = bqe;
2194 /* can ignore this case in a non-debugging setup;
2195 see comments on RBHSave closures above */
2197 /* check that the closure is an RBHSave closure */
2198 ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
2199 get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
2200 get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
2204 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2205 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2209 // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2213 #else /* !GRAN && !PAR */
2215 unblockOneLocked(StgTSO *tso)
2219 ASSERT(get_itbl(tso)->type == TSO);
2220 ASSERT(tso->why_blocked != NotBlocked);
2221 tso->why_blocked = NotBlocked;
2223 PUSH_ON_RUN_QUEUE(tso);
2225 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2230 #if defined(GRAN) || defined(PAR)
2231 inline StgBlockingQueueElement *
2232 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2234 ACQUIRE_LOCK(&sched_mutex);
2235 bqe = unblockOneLocked(bqe, node);
2236 RELEASE_LOCK(&sched_mutex);
2241 unblockOne(StgTSO *tso)
2243 ACQUIRE_LOCK(&sched_mutex);
2244 tso = unblockOneLocked(tso);
2245 RELEASE_LOCK(&sched_mutex);
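/* unblockOne is the public face of unblockOneLocked and follows the monitor
 * convention used throughout this file: the *Locked variant assumes the
 * caller already holds sched_mutex, while the wrapper takes and releases the
 * lock itself.  A minimal, self-contained sketch of that idiom follows; the
 * names are toy ones, and the block is fenced off with #if 0 so it is never
 * built into the RTS.
 */
#if 0
#include <pthread.h>

static pthread_mutex_t toy_sched_mutex = PTHREAD_MUTEX_INITIALIZER;
static int toy_run_queue_len = 0;            /* toy shared scheduler state */

/* caller must hold toy_sched_mutex */
static void toyWakeOneLocked(void) {
    toy_run_queue_len++;                     /* touch shared state safely */
}

/* public entry point: bracket the real work with the monitor lock */
static void toyWakeOne(void) {
    pthread_mutex_lock(&toy_sched_mutex);
    toyWakeOneLocked();
    pthread_mutex_unlock(&toy_sched_mutex);
}
#endif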
2252 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2254 StgBlockingQueueElement *bqe;
2259 belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2260 node, CurrentProc, CurrentTime[CurrentProc],
2261 CurrentTSO->id, CurrentTSO));
2263 node_loc = where_is(node);
2265 ASSERT(get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2266 get_itbl(q)->type == CONSTR); // closure (type constructor)
2267 ASSERT(is_unique(node));
2269 /* FAKE FETCH: magically copy the node to the tso's proc;
2270 no Fetch necessary because in reality the node should not have been
2271 moved to the other PE in the first place
2273 if (CurrentProc!=node_loc) {
2275 belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2276 node, node_loc, CurrentProc, CurrentTSO->id,
2277 // CurrentTSO, where_is(CurrentTSO),
2278 node->header.gran.procs));
2279 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2281 belch("## new bitmask of node %p is %#x",
2282 node, node->header.gran.procs));
2283 if (RtsFlags.GranFlags.GranSimStats.Global) {
2284 globalGranStats.tot_fake_fetches++;
2289 // ToDo: check: ASSERT(CurrentProc==node_loc);
2290 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2293 bqe points to the current element in the queue
2294 next points to the next element in the queue
2296 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2297 //tso_loc = where_is(tso);
2299 bqe = unblockOneLocked(bqe, node);
2302 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2303 the closure to make room for the anchor of the BQ */
2304 if (bqe!=END_BQ_QUEUE) {
2305 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2307 ASSERT((info_ptr==&RBH_Save_0_info) ||
2308 (info_ptr==&RBH_Save_1_info) ||
2309 (info_ptr==&RBH_Save_2_info));
2311 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2312 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2313 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2316 belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2317 node, info_type(node)));
2320 /* statistics gathering */
2321 if (RtsFlags.GranFlags.GranSimStats.Global) {
2322 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2323 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2324 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2325 globalGranStats.tot_awbq++; // total no. of bqs awakened
2328 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2329 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2333 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2335 StgBlockingQueueElement *bqe, *next;
2337 ACQUIRE_LOCK(&sched_mutex);
2339 IF_PAR_DEBUG(verbose,
2340 belch("## AwBQ for node %p on [%x]: ",
2343 ASSERT(get_itbl(q)->type == TSO ||
2344 get_itbl(q)->type == BLOCKED_FETCH ||
2345 get_itbl(q)->type == CONSTR);
2348 while (get_itbl(bqe)->type==TSO ||
2349 get_itbl(bqe)->type==BLOCKED_FETCH) {
2350 bqe = unblockOneLocked(bqe, node);
2352 RELEASE_LOCK(&sched_mutex);
2355 #else /* !GRAN && !PAR */
2357 awakenBlockedQueue(StgTSO *tso)
2359 ACQUIRE_LOCK(&sched_mutex);
2360 while (tso != END_TSO_QUEUE) {
2361 tso = unblockOneLocked(tso);
2363 RELEASE_LOCK(&sched_mutex);
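/* awakenBlockedQueue just walks the chain of blocked TSOs: unblockOneLocked
 * unlinks the head, puts it on the run queue and hands back its successor,
 * until the END_TSO_QUEUE sentinel is reached.  A self-contained sketch of
 * that drain-a-linked-queue pattern, with toy types (not RTS code), kept out
 * of the build with #if 0:
 */
#if 0
#include <stddef.h>

struct toy_tso { struct toy_tso *link; };
#define TOY_END ((struct toy_tso *)NULL)       /* toy end-of-queue sentinel */

static struct toy_tso *toy_run_hd = TOY_END;   /* toy run queue (LIFO) */

/* unlink one element, push it on the run queue, return its successor */
static struct toy_tso *toy_unblock_one(struct toy_tso *t) {
    struct toy_tso *next = t->link;
    t->link = toy_run_hd;
    toy_run_hd = t;
    return next;
}

static void toy_awaken_queue(struct toy_tso *q) {
    while (q != TOY_END)
        q = toy_unblock_one(q);
}
#endif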
2367 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2368 //@subsection Exception Handling Routines
2370 /* ---------------------------------------------------------------------------
2372 - usually called inside a signal handler so it mustn't do anything fancy.
2373 ------------------------------------------------------------------------ */
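/* Because it may run inside a signal handler, the interrupt entry point has
 * to restrict itself to async-signal-safe work: setting flags that the
 * scheduler loop polls at its next opportunity.  A self-contained sketch of
 * that pattern with toy flag names (not the RTS variables), excluded from
 * the build with #if 0:
 */
#if 0
#include <signal.h>

static volatile sig_atomic_t toy_interrupted    = 0;
static volatile sig_atomic_t toy_context_switch = 0;

/* safe to call from a signal handler: only assigns sig_atomic_t flags */
static void toy_request_interrupt(void) {
    toy_interrupted    = 1;
    toy_context_switch = 1;
}

/* polled by the scheduler loop, outside any handler */
static int toy_interrupt_pending(void) {
    return toy_interrupted != 0;
}
#endif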
2376 interruptStgRts(void)
2382 /* -----------------------------------------------------------------------------
2385 This is for use when we raise an exception in another thread, which
2387 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2388 -------------------------------------------------------------------------- */
2390 #if defined(GRAN) || defined(PAR)
2392 NB: only the type of the blocking queue is different in GranSim and GUM
2393 the operations on the queue-elements are the same
2394 long live polymorphism!
2397 unblockThread(StgTSO *tso)
2399 StgBlockingQueueElement *t, **last;
2401 ACQUIRE_LOCK(&sched_mutex);
2402 switch (tso->why_blocked) {
2405 return; /* not blocked */
2408 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2410 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2411 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2413 last = (StgBlockingQueueElement **)&mvar->head;
2414 for (t = (StgBlockingQueueElement *)mvar->head;
2416 last = &t->link, last_tso = t, t = t->link) {
2417 if (t == (StgBlockingQueueElement *)tso) {
2418 *last = (StgBlockingQueueElement *)tso->link;
2419 if (mvar->tail == tso) {
2420 mvar->tail = (StgTSO *)last_tso;
2425 barf("unblockThread (MVAR): TSO not found");
2428 case BlockedOnBlackHole:
2429 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2431 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2433 last = &bq->blocking_queue;
2434 for (t = bq->blocking_queue;
2436 last = &t->link, t = t->link) {
2437 if (t == (StgBlockingQueueElement *)tso) {
2438 *last = (StgBlockingQueueElement *)tso->link;
2442 barf("unblockThread (BLACKHOLE): TSO not found");
2445 case BlockedOnException:
2447 StgTSO *target = tso->block_info.tso;
2449 ASSERT(get_itbl(target)->type == TSO);
2450 ASSERT(target->blocked_exceptions != NULL);
2452 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2453 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2455 last = &t->link, t = t->link) {
2456 ASSERT(get_itbl(t)->type == TSO);
2457 if (t == (StgBlockingQueueElement *)tso) {
2458 *last = (StgBlockingQueueElement *)tso->link;
2462 barf("unblockThread (Exception): TSO not found");
2465 case BlockedOnDelay:
2467 case BlockedOnWrite:
2469 StgBlockingQueueElement *prev = NULL;
2470 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2471 prev = t, t = t->link) {
2472 if (t == (StgBlockingQueueElement *)tso) {
2474 blocked_queue_hd = (StgTSO *)t->link;
2475 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2476 blocked_queue_tl = END_TSO_QUEUE;
2479 prev->link = t->link;
2480 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2481 blocked_queue_tl = (StgTSO *)prev;
2487 barf("unblockThread (I/O): TSO not found");
2491 barf("unblockThread");
2495 tso->link = END_TSO_QUEUE;
2496 tso->why_blocked = NotBlocked;
2497 tso->block_info.closure = NULL;
2498 PUSH_ON_RUN_QUEUE(tso);
2499 RELEASE_LOCK(&sched_mutex);
2503 unblockThread(StgTSO *tso)
2507 ACQUIRE_LOCK(&sched_mutex);
2508 switch (tso->why_blocked) {
2511 return; /* not blocked */
2514 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2516 StgTSO *last_tso = END_TSO_QUEUE;
2517 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2520 for (t = mvar->head; t != END_TSO_QUEUE;
2521 last = &t->link, last_tso = t, t = t->link) {
2524 if (mvar->tail == tso) {
2525 mvar->tail = last_tso;
2530 barf("unblockThread (MVAR): TSO not found");
2533 case BlockedOnBlackHole:
2534 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2536 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2538 last = &bq->blocking_queue;
2539 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2540 last = &t->link, t = t->link) {
2546 barf("unblockThread (BLACKHOLE): TSO not found");
2549 case BlockedOnException:
2551 StgTSO *target = tso->block_info.tso;
2553 ASSERT(get_itbl(target)->type == TSO);
2554 ASSERT(target->blocked_exceptions != NULL);
2556 last = &target->blocked_exceptions;
2557 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2558 last = &t->link, t = t->link) {
2559 ASSERT(get_itbl(t)->type == TSO);
2565 barf("unblockThread (Exception): TSO not found");
2568 case BlockedOnDelay:
2570 case BlockedOnWrite:
2572 StgTSO *prev = NULL;
2573 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2574 prev = t, t = t->link) {
2577 blocked_queue_hd = t->link;
2578 if (blocked_queue_tl == t) {
2579 blocked_queue_tl = END_TSO_QUEUE;
2582 prev->link = t->link;
2583 if (blocked_queue_tl == t) {
2584 blocked_queue_tl = prev;
2590 barf("unblockThread (I/O): TSO not found");
2594 barf("unblockThread");
2598 tso->link = END_TSO_QUEUE;
2599 tso->why_blocked = NotBlocked;
2600 tso->block_info.closure = NULL;
2601 PUSH_ON_RUN_QUEUE(tso);
2602 RELEASE_LOCK(&sched_mutex);
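/* The queue surgery above removes a TSO from a singly linked list.  The
 * MVar, black-hole and exception cases do it with a pointer-to-pointer
 * (`last'), so head and interior elements are spliced out by the same
 * assignment.  A self-contained sketch of that unlink idiom with toy types
 * (not RTS code), excluded from the build with #if 0:
 */
#if 0
#include <stddef.h>

struct toy_tso { int id; struct toy_tso *link; };

/* remove the first element whose id matches; returns 1 if found, 0 if not */
static int toy_unlink(struct toy_tso **head, int id) {
    struct toy_tso **last;
    struct toy_tso *t;
    for (last = head, t = *head; t != NULL; last = &t->link, t = t->link) {
        if (t->id == id) {
            *last = t->link;      /* one assignment covers head and middle */
            t->link = NULL;
            return 1;
        }
    }
    return 0;
}
#endif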
2606 /* -----------------------------------------------------------------------------
2609 * The following function implements the magic for raising an
2610 * asynchronous exception in an existing thread.
2612 * We first remove the thread from any queue on which it might be
2613 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
2615 * We strip the stack down to the innermost CATCH_FRAME, building
2616 * thunks in the heap for all the active computations, so they can
2617 * be restarted if necessary. When we reach a CATCH_FRAME, we build
2618 * an application of the handler to the exception, and push it on
2619 * the top of the stack.
2621 * How exactly do we save all the active computations? We create an
2622 * AP_UPD for every UpdateFrame on the stack. Entering one of these
2623 * AP_UPDs pushes everything from the corresponding update frame
2624 * upwards onto the stack. (Actually, it pushes everything up to the
2625 * next update frame plus a pointer to the next AP_UPD object.)
2626 * Entering the next AP_UPD object pushes more onto the stack until we
2627 * reach the last AP_UPD object - at which point the stack should look
2628 * exactly as it did when we killed the TSO and we can continue
2629 * execution by entering the closure on top of the stack.
2631 * We can also kill a thread entirely - this happens if either (a) the
2632 * exception passed to raiseAsync is NULL, or (b) there's no
2633 * CATCH_FRAME on the stack. In either case, we strip the entire
2634 * stack and replace the thread with a zombie.
2636 * -------------------------------------------------------------------------- */
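/* The loop below repeatedly inspects the innermost frame `su': every update
 * frame gets the stack chunk above it frozen into an AP_UPD and its updatee
 * overwritten with an indirection; the first catch frame stops the walk and
 * receives an application of its handler to the exception; reaching the stop
 * frame with no handler kills the thread.  A self-contained sketch of just
 * that control skeleton, with toy frame types (not RTS code), excluded from
 * the build with #if 0:
 */
#if 0
#include <stdio.h>

enum toy_frame_kind { TOY_UPDATE_FRAME, TOY_CATCH_FRAME, TOY_STOP_FRAME };

struct toy_frame {
    enum toy_frame_kind  kind;
    struct toy_frame    *next;     /* next frame further down the stack */
    void                *handler;  /* only meaningful for TOY_CATCH_FRAME */
};

/* returns 1 if a handler was found, 0 if the "thread" is killed */
static int toy_raise_async(struct toy_frame *su, void *exception) {
    for (;;) {
        switch (su->kind) {
        case TOY_UPDATE_FRAME:
            /* freeze the chunk above this frame into a thunk and update the
               frame's updatee with an indirection to it (elided here) */
            su = su->next;
            break;
        case TOY_CATCH_FRAME:
            /* build handler applied to exception, leave it to be entered */
            printf("entering handler %p with exception %p\n",
                   su->handler, exception);
            return 1;
        case TOY_STOP_FRAME:
            /* no handler anywhere: the whole stack is stripped, zombie */
            return 0;
        }
    }
}
#endif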
2639 deleteThread(StgTSO *tso)
2641 raiseAsync(tso,NULL);
2645 raiseAsync(StgTSO *tso, StgClosure *exception)
2647 StgUpdateFrame* su = tso->su;
2648 StgPtr sp = tso->sp;
2650 /* Thread already dead? */
2651 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2655 IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2657 /* Remove it from any blocking queues */
2660 /* The stack freezing code assumes there's a closure pointer on
2661 * the top of the stack. This isn't always the case with compiled
2662 * code, so we have to push a dummy closure on the top which just
2663 * returns to the next return address on the stack.
2665 if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2666 *(--sp) = (W_)&dummy_ret_closure;
2670 int words = ((P_)su - (P_)sp) - 1;
2674 /* If we find a CATCH_FRAME, and we've got an exception to raise,
2675 * then build PAP(handler,exception,realworld#), and leave it on
2676 * top of the stack ready to enter.
2678 if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2679 StgCatchFrame *cf = (StgCatchFrame *)su;
2680 /* we've got an exception to raise, so let's pass it to the
2681 * handler in this frame.
2683 ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2684 TICK_ALLOC_UPD_PAP(3,0);
2685 SET_HDR(ap,&PAP_info,cf->header.prof.ccs);
2688 ap->fun = cf->handler; /* :: Exception -> IO a */
2689 ap->payload[0] = (P_)exception;
2690 ap->payload[1] = ARG_TAG(0); /* realworld token */
2692 /* throw away the stack from Sp up to and including the
2695 sp = (P_)su + sizeofW(StgCatchFrame) - 1;
2698 /* Restore the blocked/unblocked state for asynchronous exceptions
2699 * at the CATCH_FRAME.
2701 * If exceptions were unblocked at the catch, arrange that they
2702 * are unblocked again after executing the handler by pushing an
2703 * unblockAsyncExceptions_ret stack frame.
2705 if (!cf->exceptions_blocked) {
2706 *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
2709 /* Ensure that async exceptions are blocked when running the handler.
2711 if (tso->blocked_exceptions == NULL) {
2712 tso->blocked_exceptions = END_TSO_QUEUE;
2715 /* Put the newly-built PAP on top of the stack, ready to execute
2716 * when the thread restarts.
2720 tso->what_next = ThreadEnterGHC;
2724 /* First build an AP_UPD consisting of the stack chunk above the
2725 * current update frame, with the top word on the stack as the
2728 ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2733 ap->fun = (StgClosure *)sp[0];
2735 for(i=0; i < (nat)words; ++i) {
2736 ap->payload[i] = (P_)*sp++;
2739 switch (get_itbl(su)->type) {
2743 SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */);
2744 TICK_ALLOC_UP_THK(words+1,0);
2747 fprintf(stderr, "scheduler: Updating ");
2748 printPtr((P_)su->updatee);
2749 fprintf(stderr, " with ");
2750 printObj((StgClosure *)ap);
2753 /* Replace the updatee with an indirection - happily
2754 * this will also wake up any threads currently
2755 * waiting on the result.
2757 UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */
2759 sp += sizeofW(StgUpdateFrame) -1;
2760 sp[0] = (W_)ap; /* push onto stack */
2766 StgCatchFrame *cf = (StgCatchFrame *)su;
2769 /* We want a PAP, not an AP_UPD. Fortunately, the
2770 * layout's the same.
2772 SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2773 TICK_ALLOC_UPD_PAP(words+1,0);
2775 /* now build o = FUN(catch,ap,handler) */
2776 o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
2777 TICK_ALLOC_FUN(2,0);
2778 SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
2779 o->payload[0] = (StgClosure *)ap;
2780 o->payload[1] = cf->handler;
2783 fprintf(stderr, "scheduler: Built ");
2784 printObj((StgClosure *)o);
2787 /* pop the old handler and put o on the stack */
2789 sp += sizeofW(StgCatchFrame) - 1;
2796 StgSeqFrame *sf = (StgSeqFrame *)su;
2799 SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2800 TICK_ALLOC_UPD_PAP(words+1,0);
2802 /* now build o = FUN(seq,ap) */
2803 o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
2804 TICK_ALLOC_SE_THK(1,0);
2805 SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
2806 o->payload[0] = (StgClosure *)ap;
2809 fprintf(stderr, "scheduler: Built ");
2810 printObj((StgClosure *)o);
2813 /* pop the old handler and put o on the stack */
2815 sp += sizeofW(StgSeqFrame) - 1;
2821 /* We've stripped the entire stack, the thread is now dead. */
2822 sp += sizeofW(StgStopFrame) - 1;
2823 sp[0] = (W_)exception; /* save the exception */
2824 tso->what_next = ThreadKilled;
2825 tso->su = (StgUpdateFrame *)(sp+1);
2836 /* -----------------------------------------------------------------------------
2837 resurrectThreads is called after garbage collection on the list of
2838 threads found to be garbage. Each of these threads will be woken
2839 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2840 on an MVar, or NonTermination if the thread was blocked on a Black Hole.
2842 -------------------------------------------------------------------------- */
2845 resurrectThreads( StgTSO *threads )
2849 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2850 next = tso->global_link;
2851 tso->global_link = all_threads;
2853 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
2855 switch (tso->why_blocked) {
2857 case BlockedOnException:
2858 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
2860 case BlockedOnBlackHole:
2861 raiseAsync(tso,(StgClosure *)NonTermination_closure);
2864 /* This might happen if the thread was blocked on a black hole
2865 * belonging to a thread that we've just woken up (raiseAsync
2866 * can wake up threads, remember...).
2870 barf("resurrectThreads: thread blocked in a strange way");
2875 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
2876 //@subsection Debugging Routines
2878 /* -----------------------------------------------------------------------------
2879 Debugging: why is a thread blocked
2880 -------------------------------------------------------------------------- */
2885 printThreadBlockage(StgTSO *tso)
2887 switch (tso->why_blocked) {
2889 fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
2891 case BlockedOnWrite:
2892 fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
2894 case BlockedOnDelay:
2895 #if defined(HAVE_SETITIMER) || defined(mingw32_TARGET_OS)
2896 fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
2898 fprintf(stderr,"blocked on delay of %d ms",
2899 tso->block_info.target - getourtimeofday());
2903 fprintf(stderr,"blocked on an MVar");
2905 case BlockedOnException:
2906 fprintf(stderr,"blocked on delivering an exception to thread %d",
2907 tso->block_info.tso->id);
2909 case BlockedOnBlackHole:
2910 fprintf(stderr,"blocked on a black hole");
2913 fprintf(stderr,"not blocked");
2917 fprintf(stderr,"blocked on global address; local FM_BQ is %p (%s)",
2918 tso->block_info.closure, info_type(tso->block_info.closure));
2920 case BlockedOnGA_NoSend:
2921 fprintf(stderr,"blocked on global address (no send); local FM_BQ is %p (%s)",
2922 tso->block_info.closure, info_type(tso->block_info.closure));
2926 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
2927 tso->why_blocked, tso->id, tso);
2932 printThreadStatus(StgTSO *tso)
2934 switch (tso->what_next) {
2936 fprintf(stderr,"has been killed");
2938 case ThreadComplete:
2939 fprintf(stderr,"has completed");
2942 printThreadBlockage(tso);
2947 printAllThreads(void)
2951 sched_belch("all threads:");
2952 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
2953 fprintf(stderr, "\tthread %d is ", t->id);
2954 printThreadStatus(t);
2955 fprintf(stderr,"\n");
2960 Print a whole blocking queue attached to node (debugging only).
2965 print_bq (StgClosure *node)
2967 StgBlockingQueueElement *bqe;
2971 fprintf(stderr,"## BQ of closure %p (%s): ",
2972 node, info_type(node));
2974 /* should cover all closures that may have a blocking queue */
2975 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2976 get_itbl(node)->type == FETCH_ME_BQ ||
2977 get_itbl(node)->type == RBH);
2979 ASSERT(node!=(StgClosure*)NULL); // sanity check
2981 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2983 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2984 !end; // iterate until bqe points to a CONSTR
2985 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2986 ASSERT(bqe != END_BQ_QUEUE); // sanity check
2987 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
2988 /* types of closures that may appear in a blocking queue */
2989 ASSERT(get_itbl(bqe)->type == TSO ||
2990 get_itbl(bqe)->type == BLOCKED_FETCH ||
2991 get_itbl(bqe)->type == CONSTR);
2992 /* only BQs of an RBH end with an RBH_Save closure */
2993 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
2995 switch (get_itbl(bqe)->type) {
2997 fprintf(stderr," TSO %d (%p),",
2998 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3001 fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3002 ((StgBlockedFetch *)bqe)->node,
3003 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3004 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3005 ((StgBlockedFetch *)bqe)->ga.weight);
3008 fprintf(stderr," %s (IP %p),",
3009 (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
3010 get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
3011 get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
3012 "RBH_Save_?"), get_itbl(bqe));
3015 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3016 info_type(bqe), node, info_type(node));
3020 fputc('\n', stderr);
3022 # elif defined(GRAN)
3024 print_bq (StgClosure *node)
3026 StgBlockingQueueElement *bqe;
3027 PEs node_loc, tso_loc;
3030 /* should cover all closures that may have a blocking queue */
3031 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3032 get_itbl(node)->type == FETCH_ME_BQ ||
3033 get_itbl(node)->type == RBH);
3035 ASSERT(node!=(StgClosure*)NULL); // sanity check
3036 node_loc = where_is(node);
3038 fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3039 node, info_type(node), node_loc);
3042 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3044 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3045 !end; // iterate until bqe points to a CONSTR
3046 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3047 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3048 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3049 /* types of closures that may appear in a blocking queue */
3050 ASSERT(get_itbl(bqe)->type == TSO ||
3051 get_itbl(bqe)->type == CONSTR);
3052 /* only BQs of an RBH end with an RBH_Save closure */
3053 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3055 tso_loc = where_is((StgClosure *)bqe);
3056 switch (get_itbl(bqe)->type) {
3058 fprintf(stderr," TSO %d (%p) on [PE %d],",
3059 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3062 fprintf(stderr," %s (IP %p),",
3063 (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
3064 get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
3065 get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
3066 "RBH_Save_?"), get_itbl(bqe));
3069 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3070 info_type((StgClosure *)bqe), node, info_type(node));
3074 fputc('\n', stderr);
3078 Nice and easy: only TSOs on the blocking queue
3081 print_bq (StgClosure *node)
3085 ASSERT(node!=(StgClosure*)NULL); // sanity check
3086 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3087 tso != END_TSO_QUEUE;
3089 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3090 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3091 fprintf(stderr," TSO %d (%p),", tso->id, tso);
3093 fputc('\n', stderr);
3104 for (i=0, tso=run_queue_hd;
3105 tso != END_TSO_QUEUE;
3114 sched_belch(char *s, ...)
3119 fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3121 fprintf(stderr, "scheduler: ");
3123 vfprintf(stderr, s, ap);
3124 fprintf(stderr, "\n");
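/* sched_belch is a printf-style helper: it prefixes the message with the
 * current task and forwards the variable arguments to vfprintf.  A complete,
 * self-contained sketch of the same varargs bracketing with a toy name (not
 * the RTS helper), excluded from the build with #if 0:
 */
#if 0
#include <stdarg.h>
#include <stdio.h>

static void toy_belch(const char *fmt, ...) {
    va_list ap;
    va_start(ap, fmt);              /* must bracket the vfprintf call */
    fprintf(stderr, "scheduler: ");
    vfprintf(stderr, fmt, ap);
    fprintf(stderr, "\n");
    va_end(ap);
}
#endif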
3130 //@node Index, , Debugging Routines, Main scheduling code
3134 //* MainRegTable:: @cindex\s-+MainRegTable
3135 //* StgMainThread:: @cindex\s-+StgMainThread
3136 //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue
3137 //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd
3138 //* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
3139 //* context_switch:: @cindex\s-+context_switch
3140 //* createThread:: @cindex\s-+createThread
3141 //* free_capabilities:: @cindex\s-+free_capabilities
3142 //* gc_pending_cond:: @cindex\s-+gc_pending_cond
3143 //* initScheduler:: @cindex\s-+initScheduler
3144 //* interrupted:: @cindex\s-+interrupted
3145 //* n_free_capabilities:: @cindex\s-+n_free_capabilities
3146 //* next_thread_id:: @cindex\s-+next_thread_id
3147 //* print_bq:: @cindex\s-+print_bq
3148 //* run_queue_hd:: @cindex\s-+run_queue_hd
3149 //* run_queue_tl:: @cindex\s-+run_queue_tl
3150 //* sched_mutex:: @cindex\s-+sched_mutex
3151 //* schedule:: @cindex\s-+schedule
3152 //* take_off_run_queue:: @cindex\s-+take_off_run_queue
3153 //* task_ids:: @cindex\s-+task_ids
3154 //* term_mutex:: @cindex\s-+term_mutex
3155 //* thread_ready_cond:: @cindex\s-+thread_ready_cond