1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.60 2000/03/31 03:09:36 hwloidl Exp $
4 * (c) The GHC Team, 1998-2000
8 * The main scheduling code in GranSim is quite different from that in std
9 * (concurrent) Haskell: while concurrent Haskell just iterates over the
10 * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11 * over the events in the global event queue. -- HWL
12 * --------------------------------------------------------------------------*/
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
17 /* Version with scheduler monitor support for SMPs.
19 This design provides a high-level API to create and schedule threads etc.
20 as documented in the SMP design document.
22 It uses a monitor design controlled by a single mutex to exercise control
23 over accesses to shared data structures, and builds on the Posix threads library.
26 The majority of state is shared. In order to keep essential per-task state,
27 there is a Capability structure, which contains all the information
28 needed to run a thread: its STG registers, a pointer to its TSO, a
29 nursery etc. During STG execution, a pointer to the capability is
30 kept in a register (BaseReg).
32 In a non-SMP build, there is one global capability, namely MainRegTable.
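   Schematically (a sketch only; the real definition lives in the RTS
   headers), a Capability carries roughly the fields this file touches:

      typedef struct _Capability {
        StgTSO *rCurrentTSO;          -- the TSO this capability is running
        ... STG register table, nursery, rSparks (spark pool), ...
        struct _Capability *link;     -- link on the free_capabilities list
      } Capability;

   The field names above are taken from their uses below (cap->rCurrentTSO,
   cap->link, MainRegTable.rSparks); anything beyond that is an assumption.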
39 //* Variables and Data structures::
40 //* Main scheduling loop::
41 //* Suspend and Resume::
43 //* Garbage Collection Routines::
44 //* Blocking Queue Routines::
45 //* Exception Handling Routines::
46 //* Debugging Routines::
50 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
51 //@subsection Includes
59 #include "StgStartup.h"
63 #include "StgMiscClosures.h"
65 #include "Evaluator.h"
66 #include "Exception.h"
70 #include "Profiling.h"
75 #if defined(GRAN) || defined(PAR)
76 # include "GranSimRts.h"
78 # include "ParallelRts.h"
79 # include "Parallel.h"
80 # include "ParallelDebug.h"
88 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
89 //@subsection Variables and Data structures
93 * These are the threads which clients have requested that we run.
95 * In an SMP build, we might have several concurrent clients all
96 * waiting for results, and each one will wait on a condition variable
97 * until the result is available.
99 * In non-SMP, clients are strictly nested: the first client calls
100 * into the RTS, which might call out again to C with a _ccall_GC, and
101 * eventually re-enter the RTS.
103 * Information about main threads is kept in a linked list:
105 //@cindex StgMainThread
106 typedef struct StgMainThread_ {
108 SchedulerStatus stat;
111 pthread_cond_t wakeup;
113 struct StgMainThread_ *link;
116 /* Main thread queue.
117 * Locks required: sched_mutex.
119 static StgMainThread *main_threads;
122 * Locks required: sched_mutex.
126 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
127 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
130 In GranSim we have a runnable and a blocked queue for each processor.
131 In order to minimise code changes, new arrays run_queue_hds/tls
132 are created. run_queue_hd is then a short cut (macro) for
133 run_queue_hds[CurrentProc] (see GranSim.h).
136 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
137 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
138 StgTSO *ccalling_threadss[MAX_PROC];
139 /* We use the same global list of threads (all_threads) in GranSim as in
140 the std RTS (i.e. we are cheating). However, we don't use this list in
141 the GranSim specific code at the moment (so we are only potentially cheating).
146 StgTSO *run_queue_hd, *run_queue_tl;
147 StgTSO *blocked_queue_hd, *blocked_queue_tl;
151 /* Linked list of all threads.
152 * Used for detecting garbage collected threads.
156 /* Threads suspended in _ccall_GC.
158 static StgTSO *suspended_ccalling_threads;
160 static void GetRoots(void);
161 static StgTSO *threadStackOverflow(StgTSO *tso);
163 /* KH: The following two flags are shared memory locations. There is no need
164 to lock them, since they are only unset at the end of a scheduler operation.
168 /* flag set by signal handler to precipitate a context switch */
169 //@cindex context_switch
172 /* if this flag is set as well, give up execution */
173 //@cindex interrupted
176 /* Next thread ID to allocate.
177 * Locks required: sched_mutex
179 //@cindex next_thread_id
180 StgThreadID next_thread_id = 1;
183 * Pointers to the state of the current thread.
184 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
185 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
188 /* The smallest stack size that makes any sense is:
189 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
190 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
191 * + 1 (the realworld token for an IO thread)
192 * + 1 (the closure to enter)
194 * A thread with this stack will bomb immediately with a stack
195 * overflow, which will increase its stack size.
198 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
200 /* Free capability list.
201 * Locks required: sched_mutex.
204 //@cindex free_capabilities
205 //@cindex n_free_capabilities
206 Capability *free_capabilities; /* Available capabilities for running threads */
207 nat n_free_capabilities; /* total number of available capabilities */
209 //@cindex MainRegTable
210 Capability MainRegTable; /* for non-SMP, we have one global capability */
219 /* All our current task ids, saved in case we need to kill them later.
226 void addToBlockedQueue ( StgTSO *tso );
228 static void schedule ( void );
229 void interruptStgRts ( void );
231 static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri );
233 static StgTSO * createThread_ ( nat size, rtsBool have_lock );
237 static void sched_belch(char *s, ...);
241 //@cindex sched_mutex
243 //@cindex thread_ready_cond
244 //@cindex gc_pending_cond
245 pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;
246 pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER;
247 pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER;
248 pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER;
255 rtsTime TimeOfLastYield;
259 char *whatNext_strs[] = {
267 char *threadReturnCode_strs[] = {
268 "HeapOverflow", /* might also be StackOverflow */
277 * The thread state for the main thread.
278 // ToDo: check whether this is still needed
282 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
283 //@subsection Main scheduling loop
285 /* ---------------------------------------------------------------------------
286 Main scheduling loop.
288 We use round-robin scheduling, each thread returning to the
289 scheduler loop when one of these conditions is detected:
292 * timer expires (thread yields)
297 Locking notes: we acquire the scheduler lock once at the beginning
298 of the scheduler loop, and release it when
300 * running a thread, or
301 * waiting for work, or
302 * waiting for a GC to complete.
305 In a GranSim setup this loop iterates over the global event queue,
306 which determines what to do next. Therefore, it's more complicated
307 than either the concurrent or the parallel (GUM) setup.
311 GUM iterates over incoming messages.
312 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
313 and sends out a fish whenever it has nothing to do; in-between
314 doing the actual reductions (shared code below) it processes the
315 incoming messages and deals with delayed operations
316 (see PendingFetches).
317 This is not the ugliest code you could imagine, but it's bloody close.
319 ------------------------------------------------------------------------ */
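/* Schematically, the SMP flavour of the loop below obeys the locking
 * discipline described above (this is a sketch of the code that follows,
 * not a separate API):
 *
 *   ACQUIRE_LOCK(&sched_mutex);
 *   while (1) {
 *     ... wake up finished main threads, top up the run queue from sparks ...
 *     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL)
 *       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
 *     t = POP_RUN_QUEUE();  ... grab a free Capability ...
 *     RELEASE_LOCK(&sched_mutex);
 *     ret = StgRun(..., cap);            -- run Haskell with no lock held
 *     ACQUIRE_LOCK(&sched_mutex);
 *     ... dispatch on ret (HeapOverflow, StackOverflow, ThreadYielding,
 *         ThreadBlocked, ThreadFinished), release the Capability, maybe GC ...
 *   }
 */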
326 StgThreadReturnCode ret;
335 rtsBool was_interrupted = rtsFalse;
337 ACQUIRE_LOCK(&sched_mutex);
341 /* set up first event to get things going */
342 /* ToDo: assign costs for system setup and init MainTSO ! */
343 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
345 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
348 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
349 G_TSO(CurrentTSO, 5));
351 if (RtsFlags.GranFlags.Light) {
352 /* Save current time; GranSim Light only */
353 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
356 event = get_next_event();
358 while (event!=(rtsEvent*)NULL) {
359 /* Choose the processor with the next event */
360 CurrentProc = event->proc;
361 CurrentTSO = event->tso;
365 while (!GlobalStopPending) { /* GlobalStopPending set in par_exit */
373 IF_DEBUG(scheduler, printAllThreads());
375 /* If we're interrupted (the user pressed ^C, or some other
376 * termination condition occurred), kill all the currently running threads.
380 IF_DEBUG(scheduler, sched_belch("interrupted"));
381 for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
384 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
387 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
388 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
389 interrupted = rtsFalse;
390 was_interrupted = rtsTrue;
393 /* Go through the list of main threads and wake up any
394 * clients whose computations have finished. ToDo: this
395 * should be done more efficiently without a linear scan
396 * of the main threads list, somehow...
400 StgMainThread *m, **prev;
401 prev = &main_threads;
402 for (m = main_threads; m != NULL; m = m->link) {
403 switch (m->tso->what_next) {
406 *(m->ret) = (StgClosure *)m->tso->sp[0];
410 pthread_cond_broadcast(&m->wakeup);
414 if (was_interrupted) {
415 m->stat = Interrupted;
419 pthread_cond_broadcast(&m->wakeup);
429 /* in GUM do this only on the Main PE */
432 /* If our main thread has finished or been killed, return.
435 StgMainThread *m = main_threads;
436 if (m->tso->what_next == ThreadComplete
437 || m->tso->what_next == ThreadKilled) {
438 main_threads = main_threads->link;
439 if (m->tso->what_next == ThreadComplete) {
440 /* we finished successfully, fill in the return value */
441 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
445 if (was_interrupted) {
446 m->stat = Interrupted;
456 /* Top up the run queue from our spark pool. We try to make the
457 * number of threads in the run queue equal to the number of free capabilities.
462 nat n = n_free_capabilities;
463 StgTSO *tso = run_queue_hd;
465 /* Count the run queue */
466 while (n > 0 && tso != END_TSO_QUEUE) {
475 break; /* no more sparks in the pool */
477 /* I'd prefer this to be done in activateSpark -- HWL */
478 /* tricky - it needs to hold the scheduler lock and
479 * not try to re-acquire it -- SDM */
481 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
482 pushClosure(tso,spark);
483 PUSH_ON_RUN_QUEUE(tso);
485 advisory_thread_count++;
489 sched_belch("turning spark of closure %p into a thread",
490 (StgClosure *)spark));
493 /* We need to wake up the other tasks if we just created some work for them.
496 if (n_free_capabilities - n > 1) {
497 pthread_cond_signal(&thread_ready_cond);
502 /* Check whether any waiting threads need to be woken up. If the
503 * run queue is empty, and there are no other tasks running, we
504 * can wait indefinitely for something to happen.
505 * ToDo: what if another client comes along & requests another main thread?
508 if (blocked_queue_hd != END_TSO_QUEUE) {
510 (run_queue_hd == END_TSO_QUEUE)
512 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
517 /* check for signals each time around the scheduler */
519 if (signals_pending()) {
520 start_signal_handlers();
524 /* Detect deadlock: when we have no threads to run, there are
525 * no threads waiting on I/O or sleeping, and all the other
526 * tasks are waiting for work, we must have a deadlock. Inform
527 * all the main threads.
530 if (blocked_queue_hd == END_TSO_QUEUE
531 && run_queue_hd == END_TSO_QUEUE
532 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
535 for (m = main_threads; m != NULL; m = m->link) {
538 pthread_cond_broadcast(&m->wakeup);
544 In GUM all non-main PEs come in here with no work;
545 we ignore multiple main threads for now
547 if (blocked_queue_hd == END_TSO_QUEUE
548 && run_queue_hd == END_TSO_QUEUE) {
549 StgMainThread *m = main_threads;
552 main_threads = m->link;
559 /* If there's a GC pending, don't do anything until it has completed.
563 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
564 pthread_cond_wait(&gc_pending_cond, &sched_mutex);
567 /* block until we've got a thread on the run queue and a free capability.
570 while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
571 IF_DEBUG(scheduler, sched_belch("waiting for work"));
572 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
573 IF_DEBUG(scheduler, sched_belch("work now available"));
579 if (RtsFlags.GranFlags.Light)
580 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
582 /* adjust time based on time-stamp */
583 if (event->time > CurrentTime[CurrentProc] &&
584 event->evttype != ContinueThread)
585 CurrentTime[CurrentProc] = event->time;
587 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
588 if (!RtsFlags.GranFlags.Light)
591 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"))
593 /* main event dispatcher in GranSim */
594 switch (event->evttype) {
595 /* Should just be continuing execution */
597 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
598 /* ToDo: check assertion
599 ASSERT(run_queue_hd != (StgTSO*)NULL &&
600 run_queue_hd != END_TSO_QUEUE);
602 /* Ignore ContinueThreads for fetching threads (if synchronous communication is used) */
603 if (!RtsFlags.GranFlags.DoAsyncFetch &&
604 procStatus[CurrentProc]==Fetching) {
605 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
606 CurrentTSO->id, CurrentTSO, CurrentProc);
609 /* Ignore ContinueThreads for completed threads */
610 if (CurrentTSO->what_next == ThreadComplete) {
611 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
612 CurrentTSO->id, CurrentTSO, CurrentProc);
615 /* Ignore ContinueThreads for threads that are being migrated */
616 if (PROCS(CurrentTSO)==Nowhere) {
617 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
618 CurrentTSO->id, CurrentTSO, CurrentProc);
621 /* The thread should be at the beginning of the run queue */
622 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
623 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
624 CurrentTSO->id, CurrentTSO, CurrentProc);
625 break; // run the thread anyway
628 new_event(proc, proc, CurrentTime[proc],
630 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
632 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
633 break; // now actually run the thread; DaH Qu'vam yImuHbej
636 do_the_fetchnode(event);
637 goto next_thread; /* handle next event in event queue */
640 do_the_globalblock(event);
641 goto next_thread; /* handle next event in event queue */
644 do_the_fetchreply(event);
645 goto next_thread; /* handle next event in event queue */
647 case UnblockThread: /* Move from the blocked queue to the tail of */
648 do_the_unblock(event);
649 goto next_thread; /* handle next event in event queue */
651 case ResumeThread: /* Move from the blocked queue to the tail of */
652 /* the runnable queue ( i.e. Qu' SImqa'lu') */
653 event->tso->gran.blocktime +=
654 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
655 do_the_startthread(event);
656 goto next_thread; /* handle next event in event queue */
659 do_the_startthread(event);
660 goto next_thread; /* handle next event in event queue */
663 do_the_movethread(event);
664 goto next_thread; /* handle next event in event queue */
667 do_the_movespark(event);
668 goto next_thread; /* handle next event in event queue */
671 do_the_findwork(event);
672 goto next_thread; /* handle next event in event queue */
675 barf("Illegal event type %u\n", event->evttype);
678 /* This point was scheduler_loop in the old RTS */
680 IF_DEBUG(gran, belch("GRAN: after main switch"));
682 TimeOfLastEvent = CurrentTime[CurrentProc];
683 TimeOfNextEvent = get_time_of_next_event();
684 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
685 // CurrentTSO = ThreadQueueHd;
687 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
690 if (RtsFlags.GranFlags.Light)
691 GranSimLight_leave_system(event, &ActiveTSO);
693 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
696 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
698 /* in a GranSim setup the TSO stays on the run queue */
700 /* Take a thread from the run queue. */
701 t = POP_RUN_QUEUE(); // take_off_run_queue(t);
704 fprintf(stderr, "GRAN: About to run current thread, which is\n");
707 context_switch = 0; // turned on via GranYield, checking events and time slice
710 DumpGranEvent(GR_SCHEDULE, t));
712 procStatus[CurrentProc] = Busy;
716 if (PendingFetches != END_BF_QUEUE) {
720 /* ToDo: perhaps merge with spark activation above */
721 /* check whether we have local work and send requests if we have none */
722 if (run_queue_hd == END_TSO_QUEUE) { /* no runnable threads */
723 /* :-[ no local threads => look out for local sparks */
724 /* the spark pool for the current PE */
725 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
726 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
727 pool->hd < pool->tl) {
729 * ToDo: add GC code check that we really have enough heap afterwards!!
731 * If we're here (no runnable threads) and we have pending
732 * sparks, we must have a space problem. Get enough space
733 * to turn one of those pending sparks into a thread.
737 spark = findSpark(); /* get a spark */
738 if (spark != (rtsSpark) NULL) {
739 tso = activateSpark(spark); /* turn the spark into a thread */
740 IF_PAR_DEBUG(schedule,
741 belch("==== schedule: Created TSO %d (%p); %d threads active",
742 tso->id, tso, advisory_thread_count));
744 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
745 belch("==^^ failed to activate spark");
747 } /* otherwise fall through & pick-up new tso */
749 IF_PAR_DEBUG(verbose,
750 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
751 spark_queue_len(pool)));
755 /* =8-[ no local sparks => look for work on other PEs */
758 * We really have absolutely no work. Send out a fish
759 * (there may be some out there already), and wait for
760 * something to arrive. We clearly can't run any threads
761 * until a SCHEDULE or RESUME arrives, and so that's what
762 * we're hoping to see. (Of course, we still have to
763 * respond to other types of messages.)
766 outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
767 // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
768 /* fishing set in sendFish, processFish;
769 avoid flooding system with fishes via delay */
771 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
779 } else if (PacketsWaiting()) { /* Look for incoming messages */
783 /* Now we are sure that we have some work available */
784 ASSERT(run_queue_hd != END_TSO_QUEUE);
785 /* Take a thread from the run queue, if we have work */
786 t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
788 /* ToDo: write something to the log-file
789 if (RTSflags.ParFlags.granSimStats && !sameThread)
790 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
794 /* the spark pool for the current PE */
795 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
797 IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; base=%x, lim=%x)",
798 spark_queue_len(pool),
800 pool->hd, pool->tl, pool->base, pool->lim));
802 IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)",
803 run_queue_len(), CURRENT_PROC,
804 run_queue_hd, run_queue_tl));
809 we are running a different TSO, so write a schedule event to log file
810 NB: If we use fair scheduling we also have to write a deschedule
811 event for LastTSO; with unfair scheduling we know that the
812 previous tso has blocked whenever we switch to another tso, so
813 we don't need it in GUM for now
815 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
816 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
820 #else /* !GRAN && !PAR */
822 /* grab a thread from the run queue
825 IF_DEBUG(sanity,checkTSO(t));
832 cap = free_capabilities;
833 free_capabilities = cap->link;
834 n_free_capabilities--;
839 cap->rCurrentTSO = t;
841 /* set the context_switch flag
843 if (run_queue_hd == END_TSO_QUEUE)
848 RELEASE_LOCK(&sched_mutex);
850 #if defined(GRAN) || defined(PAR)
851 IF_DEBUG(scheduler, belch("-->> Running TSO %ld (%p) %s ...",
852 t->id, t, whatNext_strs[t->what_next]));
854 IF_DEBUG(scheduler,sched_belch("running thread %d", t->id));
857 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
858 /* Run the current thread
860 switch (cap->rCurrentTSO->what_next) {
863 /* Thread already finished, return to scheduler. */
864 ret = ThreadFinished;
867 ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
870 ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
872 case ThreadEnterHugs:
876 IF_DEBUG(scheduler,sched_belch("entering Hugs"));
877 c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
878 cap->rCurrentTSO->sp += 1;
883 barf("Panic: entered a BCO but no bytecode interpreter in this build");
886 barf("schedule: invalid what_next field");
888 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
890 /* Costs for the scheduler are assigned to CCS_SYSTEM */
895 ACQUIRE_LOCK(&sched_mutex);
898 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
899 #elif !defined(GRAN) && !defined(PAR)
900 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
902 t = cap->rCurrentTSO;
905 /* HACK 675: if the last thread didn't yield, make sure to print a
906 SCHEDULE event to the log file when StgRunning the next thread, even
907 if it is the same one as before */
908 LastTSO = t; //(ret == ThreadBlocked) ? END_TSO_QUEUE : t;
909 TimeOfLastYield = CURRENT_TIME;
914 /* make all the running tasks block on a condition variable,
915 * maybe set context_switch and wait till they all pile in,
916 * then have them wait on a GC condition variable.
918 #if defined(GRAN) || defined(PAR)
919 IF_DEBUG(scheduler,belch("--<< TSO %ld (%p; %s) stopped: HeapOverflow",
920 t->id, t, whatNext_strs[t->what_next]));
924 ASSERT(!is_on_queue(t,CurrentProc));
927 ready_to_gc = rtsTrue;
928 context_switch = 1; /* stop other threads ASAP */
929 PUSH_ON_RUN_QUEUE(t);
930 /* actual GC is done at the end of the while loop */
934 #if defined(GRAN) || defined(PAR)
935 IF_DEBUG(scheduler,belch("--<< TSO %ld (%p; %s) stopped, StackOverflow",
936 t->id, t, whatNext_strs[t->what_next]));
938 /* just adjust the stack for this thread, then pop it back on the run queue.
944 /* enlarge the stack */
945 StgTSO *new_t = threadStackOverflow(t);
947 /* This TSO has moved, so update any pointers to it from the
948 * main thread stack. It better not be on any other queues...
951 for (m = main_threads; m != NULL; m = m->link) {
957 PUSH_ON_RUN_QUEUE(new_t);
964 DumpGranEvent(GR_DESCHEDULE, t));
965 globalGranStats.tot_yields++;
968 DumpGranEvent(GR_DESCHEDULE, t));
970 /* put the thread back on the run queue. Then, if we're ready to
971 * GC, check whether this is the last task to stop. If so, wake
972 * up the GC thread. getThread will block during a GC until the GC is finished.
975 #if defined(GRAN) || defined(PAR)
977 if (t->what_next == ThreadEnterHugs) {
978 /* ToDo: or maybe a timer expired when we were in Hugs?
979 * or maybe someone hit ctrl-C
981 belch("--<< TSO %ld (%p; %s) stopped to switch to Hugs",
982 t->id, t, whatNext_strs[t->what_next]);
984 belch("--<< TSO %ld (%p; %s) stopped, yielding",
985 t->id, t, whatNext_strs[t->what_next]);
990 if (t->what_next == ThreadEnterHugs) {
991 /* ToDo: or maybe a timer expired when we were in Hugs?
992 * or maybe someone hit ctrl-C
994 belch("thread %ld stopped to switch to Hugs", t->id);
996 belch("thread %ld stopped, yielding", t->id);
1002 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1004 ASSERT(t->link == END_TSO_QUEUE);
1006 ASSERT(!is_on_queue(t,CurrentProc));
1009 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1010 checkThreadQsSanity(rtsTrue));
1012 APPEND_TO_RUN_QUEUE(t);
1014 /* add a ContinueThread event to actually process the thread */
1015 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1017 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1019 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1028 belch("--<< TSO %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1029 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1030 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1032 // ??? needed; should emit block before
1034 DumpGranEvent(GR_DESCHEDULE, t));
1035 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1038 ASSERT(procStatus[CurrentProc]==Busy ||
1039 ((procStatus[CurrentProc]==Fetching) &&
1040 (t->block_info.closure!=(StgClosure*)NULL)));
1041 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1042 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1043 procStatus[CurrentProc]==Fetching))
1044 procStatus[CurrentProc] = Idle;
1048 DumpGranEvent(GR_DESCHEDULE, t));
1050 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1054 belch("--<< TSO %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1055 t->id, t, whatNext_strs[t->what_next], t->block_info.closure);
1056 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1059 /* don't need to do anything. Either the thread is blocked on
1060 * I/O, in which case we'll have called addToBlockedQueue
1061 * previously, or it's blocked on an MVar or Blackhole, in which
1062 * case it'll be on the relevant queue already.
1065 fprintf(stderr, "--<< TSO %d (%p) stopped ", t->id, t);
1066 printThreadBlockage(t);
1067 fprintf(stderr, "\n"));
1069 /* Only for dumping event to log file
1070 ToDo: do I need this in GranSim, too?
1077 case ThreadFinished:
1078 /* Need to check whether this was a main thread, and if so, signal
1079 * the task that started it with the return value. If we have no
1080 * more main threads, we probably need to stop all the tasks until we get more work.
1083 IF_DEBUG(scheduler,belch("--++ TSO %d (%p) finished", t->id, t));
1084 t->what_next = ThreadComplete;
1086 endThread(t, CurrentProc); // clean-up the thread
1088 advisory_thread_count--;
1089 if (RtsFlags.ParFlags.ParStats.Full)
1090 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1095 barf("doneThread: invalid thread return code");
1099 cap->link = free_capabilities;
1100 free_capabilities = cap;
1101 n_free_capabilities++;
1105 if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes)
1110 /* everybody back, start the GC.
1111 * Could do it in this thread, or signal a condition var
1112 * to do it in another thread. Either way, we need to
1113 * broadcast on gc_pending_cond afterward.
1116 IF_DEBUG(scheduler,sched_belch("doing GC"));
1118 GarbageCollect(GetRoots);
1119 ready_to_gc = rtsFalse;
1121 pthread_cond_broadcast(&gc_pending_cond);
1124 /* add a ContinueThread event to continue execution of current thread */
1125 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1127 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1129 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1136 IF_GRAN_DEBUG(unused,
1137 print_eventq(EventHd));
1139 event = get_next_event();
1143 /* ToDo: wait for next message to arrive rather than busy wait */
1148 t = take_off_run_queue(END_TSO_QUEUE);
1151 } /* end of while(1) */
1154 /* A hack for Hugs concurrency support. Needs sanitisation (?) */
1155 void deleteAllThreads ( void )
1158 IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
1159 for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1162 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1165 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1166 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1169 /* startThread and insertThread are now in GranSim.c -- HWL */
1171 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1172 //@subsection Suspend and Resume
1174 /* ---------------------------------------------------------------------------
1175 * Suspending & resuming Haskell threads.
1177 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1178 * its capability before calling the C function. This allows another
1179 * task to pick up the capability and carry on running Haskell
1180 * threads. It also means that if the C call blocks, it won't lock the whole system.
1183 * The Haskell thread making the C call is put to sleep for the
1184 * duration of the call, on the suspended_ccalling_threads queue. We
1185 * give out a token to the task, which it can use to resume the thread
1186 * on return from the C function.
1187 * ------------------------------------------------------------------------- */
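/* A sketch of the intended calling sequence around a "safe" C call; the
 * actual call sites live in compiler-generated stubs, so the surrounding
 * names here are illustrative only:
 *
 *   StgInt tok;
 *   tok = suspendThread(cap);        -- give the Capability back to the RTS
 *   r   = blocking_c_function(...);  -- other tasks may run Haskell meanwhile
 *   cap = resumeThread(tok);         -- reclaim a Capability; thread resumes
 */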
1190 suspendThread( Capability *cap )
1194 ACQUIRE_LOCK(&sched_mutex);
1197 sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
1199 threadPaused(cap->rCurrentTSO);
1200 cap->rCurrentTSO->link = suspended_ccalling_threads;
1201 suspended_ccalling_threads = cap->rCurrentTSO;
1203 /* Use the thread ID as the token; it should be unique */
1204 tok = cap->rCurrentTSO->id;
1207 cap->link = free_capabilities;
1208 free_capabilities = cap;
1209 n_free_capabilities++;
1212 RELEASE_LOCK(&sched_mutex);
1217 resumeThread( StgInt tok )
1219 StgTSO *tso, **prev;
1222 ACQUIRE_LOCK(&sched_mutex);
1224 prev = &suspended_ccalling_threads;
1225 for (tso = suspended_ccalling_threads;
1226 tso != END_TSO_QUEUE;
1227 prev = &tso->link, tso = tso->link) {
1228 if (tso->id == (StgThreadID)tok) {
1233 if (tso == END_TSO_QUEUE) {
1234 barf("resumeThread: thread not found");
1238 while (free_capabilities == NULL) {
1239 IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1240 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1241 IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1243 cap = free_capabilities;
1244 free_capabilities = cap->link;
1245 n_free_capabilities--;
1247 cap = &MainRegTable;
1250 cap->rCurrentTSO = tso;
1252 RELEASE_LOCK(&sched_mutex);
1257 /* ---------------------------------------------------------------------------
1259 * ------------------------------------------------------------------------ */
1260 static void unblockThread(StgTSO *tso);
1262 /* ---------------------------------------------------------------------------
1263 * Comparing Thread ids.
1265 * This is used from STG land in the implementation of the
1266 * instances of Eq/Ord for ThreadIds.
1267 * ------------------------------------------------------------------------ */
1269 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
1271 StgThreadID id1 = tso1->id;
1272 StgThreadID id2 = tso2->id;
1274 if (id1 < id2) return (-1);
1275 if (id1 > id2) return 1;
1279 /* ---------------------------------------------------------------------------
1280 Create a new thread.
1282 The new thread starts with the given stack size. Before the
1283 scheduler can run, however, this thread needs to have a closure
1284 (and possibly some arguments) pushed on its stack. See
1285 pushClosure() in Schedule.h.
1287 createGenThread() and createIOThread() (in SchedAPI.h) are
1288 convenient packaged versions of this function.
1290 currently pri (priority) is only used in a GRAN setup -- HWL
1291 ------------------------------------------------------------------------ */
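/* A sketch of how a new thread is typically set up (the closure and the
 * stack size are illustrative; createGenThread/createIOThread in SchedAPI.h
 * package up this pattern, and in a GRAN setup createThread also takes a
 * priority):
 *
 *   StgTSO *tso = createThread(RtsFlags.GcFlags.initialStkSize);
 *   pushClosure(tso, closure);   -- the closure to evaluate (see Schedule.h)
 *   scheduleThread(tso);         -- put it on the run queue
 */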
1292 //@cindex createThread
1294 /* currently pri (priority) is only used in a GRAN setup -- HWL */
1296 createThread(nat stack_size, StgInt pri)
1298 return createThread_(stack_size, rtsFalse, pri);
1302 createThread_(nat size, rtsBool have_lock, StgInt pri)
1306 createThread(nat stack_size)
1308 return createThread_(stack_size, rtsFalse);
1312 createThread_(nat size, rtsBool have_lock)
1319 /* First check whether we should create a thread at all */
1321 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1322 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1324 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1325 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1326 return END_TSO_QUEUE;
1332 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1335 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1337 /* catch ridiculously small stack sizes */
1338 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1339 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1342 stack_size = size - TSO_STRUCT_SIZEW;
1344 tso = (StgTSO *)allocate(size);
1345 TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1347 SET_HDR(tso, &TSO_info, CCS_MAIN);
1349 SET_GRAN_HDR(tso, ThisPE);
1351 tso->what_next = ThreadEnterGHC;
1353 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1354 * protect the increment operation on next_thread_id.
1355 * In future, we could use an atomic increment instead.
1357 if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1358 tso->id = next_thread_id++;
1359 if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1361 tso->why_blocked = NotBlocked;
1362 tso->blocked_exceptions = NULL;
1364 tso->splim = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
1365 tso->stack_size = stack_size;
1366 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
1368 tso->sp = (P_)&(tso->stack) + stack_size;
1371 tso->prof.CCCS = CCS_MAIN;
1374 /* put a stop frame on the stack */
1375 tso->sp -= sizeofW(StgStopFrame);
1376 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN);
1377 tso->su = (StgUpdateFrame*)tso->sp;
1379 IF_DEBUG(scheduler,belch("---- Initialised TSO %ld (%p), stack size = %lx words",
1380 tso->id, tso, tso->stack_size));
1384 tso->link = END_TSO_QUEUE;
1385 /* uses more flexible routine in GranSim */
1386 insertThread(tso, CurrentProc);
1388 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated from its creation.
1393 #if defined(GRAN) || defined(PAR)
1394 DumpGranEvent(GR_START,tso);
1397 /* Link the new thread on the global thread list.
1399 tso->global_link = all_threads;
1403 tso->gran.pri = pri;
1405 tso->gran.magic = TSO_MAGIC; // debugging only
1407 tso->gran.sparkname = 0;
1408 tso->gran.startedat = CURRENT_TIME;
1409 tso->gran.exported = 0;
1410 tso->gran.basicblocks = 0;
1411 tso->gran.allocs = 0;
1412 tso->gran.exectime = 0;
1413 tso->gran.fetchtime = 0;
1414 tso->gran.fetchcount = 0;
1415 tso->gran.blocktime = 0;
1416 tso->gran.blockcount = 0;
1417 tso->gran.blockedat = 0;
1418 tso->gran.globalsparks = 0;
1419 tso->gran.localsparks = 0;
1420 if (RtsFlags.GranFlags.Light)
1421 tso->gran.clock = Now; /* local clock */
1423 tso->gran.clock = 0;
1425 IF_DEBUG(gran,printTSO(tso));
1428 tso->par.magic = TSO_MAGIC; // debugging only
1430 tso->par.sparkname = 0;
1431 tso->par.startedat = CURRENT_TIME;
1432 tso->par.exported = 0;
1433 tso->par.basicblocks = 0;
1434 tso->par.allocs = 0;
1435 tso->par.exectime = 0;
1436 tso->par.fetchtime = 0;
1437 tso->par.fetchcount = 0;
1438 tso->par.blocktime = 0;
1439 tso->par.blockcount = 0;
1440 tso->par.blockedat = 0;
1441 tso->par.globalsparks = 0;
1442 tso->par.localsparks = 0;
1446 globalGranStats.tot_threads_created++;
1447 globalGranStats.threads_created_on_PE[CurrentProc]++;
1448 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1449 globalGranStats.tot_sq_probes++;
1454 belch("==__ schedule: Created TSO %d (%p);",
1455 CurrentProc, tso, tso->id));
1457 IF_PAR_DEBUG(verbose,
1458 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1459 tso->id, tso, advisory_thread_count));
1461 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1462 tso->id, tso->stack_size));
1468 Turn a spark into a thread.
1469 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1472 //@cindex activateSpark
1474 activateSpark (rtsSpark spark)
1478 ASSERT(spark != (rtsSpark)NULL);
1479 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1480 if (tso!=END_TSO_QUEUE) {
1481 pushClosure(tso,spark);
1482 PUSH_ON_RUN_QUEUE(tso);
1483 advisory_thread_count++;
1485 if (RtsFlags.ParFlags.ParStats.Full) {
1486 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1487 IF_PAR_DEBUG(verbose,
1488 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1489 (StgClosure *)spark, info_type((StgClosure *)spark)));
1492 barf("activateSpark: Cannot create TSO");
1494 // ToDo: fwd info on local/global spark to thread -- HWL
1495 // tso->gran.exported = spark->exported;
1496 // tso->gran.locked = !spark->global;
1497 // tso->gran.sparkname = spark->name;
1503 /* ---------------------------------------------------------------------------
1506 * scheduleThread puts a thread on the head of the runnable queue.
1507 * This will usually be done immediately after a thread is created.
1508 * The caller of scheduleThread must create the thread using e.g.
1509 * createThread and push an appropriate closure
1510 * on this thread's stack before the scheduler is invoked.
1511 * ------------------------------------------------------------------------ */
1514 scheduleThread(StgTSO *tso)
1516 if (tso==END_TSO_QUEUE){
1521 ACQUIRE_LOCK(&sched_mutex);
1523 /* Put the new thread on the head of the runnable queue. The caller
1524 * better push an appropriate closure on this thread's stack
1525 * beforehand. In the SMP case, the thread may start running as
1526 * soon as we release the scheduler lock below.
1528 PUSH_ON_RUN_QUEUE(tso);
1531 IF_DEBUG(scheduler,printTSO(tso));
1532 RELEASE_LOCK(&sched_mutex);
1535 /* ---------------------------------------------------------------------------
1538 * Start up Posix threads to run each of the scheduler tasks.
1539 * I believe the task ids are not needed in the system as defined.
1541 * ------------------------------------------------------------------------ */
1543 #if defined(PAR) || defined(SMP)
1545 taskStart( void *arg STG_UNUSED )
1547 rts_evalNothing(NULL);
1551 /* ---------------------------------------------------------------------------
1554 * Initialise the scheduler. This resets all the queues - if the
1555 * queues contained any threads, they'll be garbage collected at the
1558 * This now calls startTasks(), so should only be called once! KH @ 25/10/99
1559 * ------------------------------------------------------------------------ */
1563 term_handler(int sig STG_UNUSED)
1566 ACQUIRE_LOCK(&term_mutex);
1568 RELEASE_LOCK(&term_mutex);
1573 //@cindex initScheduler
1580 for (i=0; i<MAX_PROC; i++) {  /* the per-PE queues are declared with MAX_PROC entries */
1581 run_queue_hds[i] = END_TSO_QUEUE;
1582 run_queue_tls[i] = END_TSO_QUEUE;
1583 blocked_queue_hds[i] = END_TSO_QUEUE;
1584 blocked_queue_tls[i] = END_TSO_QUEUE;
1585 ccalling_threadss[i] = END_TSO_QUEUE;
1588 run_queue_hd = END_TSO_QUEUE;
1589 run_queue_tl = END_TSO_QUEUE;
1590 blocked_queue_hd = END_TSO_QUEUE;
1591 blocked_queue_tl = END_TSO_QUEUE;
1594 suspended_ccalling_threads = END_TSO_QUEUE;
1596 main_threads = NULL;
1597 all_threads = END_TSO_QUEUE;
1602 enteredCAFs = END_CAF_LIST;
1604 /* Install the SIGTERM handler */
1607 struct sigaction action,oact;
1609 action.sa_handler = term_handler;
1610 sigemptyset(&action.sa_mask);
1611 action.sa_flags = 0;
1612 if (sigaction(SIGTERM, &action, &oact) != 0) {
1613 barf("can't install TERM handler");
1619 /* Allocate N Capabilities */
1622 Capability *cap, *prev;
1625 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1626 cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1630 free_capabilities = cap;
1631 n_free_capabilities = RtsFlags.ParFlags.nNodes;
1633 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1634 n_free_capabilities););
1637 #if defined(SMP) || defined(PAR)
1650 /* make some space for saving all the thread ids */
1651 task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1652 "initScheduler:task_ids");
1654 /* and create all the threads */
1655 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1656 r = pthread_create(&tid,NULL,taskStart,NULL);
1658 barf("startTasks: Can't create new Posix thread");
1660 task_ids[i].id = tid;
1661 task_ids[i].mut_time = 0.0;
1662 task_ids[i].mut_etime = 0.0;
1663 task_ids[i].gc_time = 0.0;
1664 task_ids[i].gc_etime = 0.0;
1665 task_ids[i].elapsedtimestart = elapsedtime();
1666 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1672 exitScheduler( void )
1677 /* Don't want to use pthread_cancel, since we'd have to install
1678 * these silly exception handlers (pthread_cleanup_{push,pop}) around all our locks.
1682 /* Cancel all our tasks */
1683 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1684 pthread_cancel(task_ids[i].id);
1687 /* Wait for all the tasks to terminate */
1688 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1689 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n",
1691 pthread_join(task_ids[i].id, NULL);
1695 /* Send 'em all a SIGTERM. That should shut 'em up.
1697 await_death = RtsFlags.ParFlags.nNodes;
1698 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1699 pthread_kill(task_ids[i].id,SIGTERM);
1701 while (await_death > 0) {
1707 /* -----------------------------------------------------------------------------
1708 Managing the per-task allocation areas.
1710 Each capability comes with an allocation area. These are
1711 fixed-length block lists into which allocation can be done.
1713 ToDo: no support for two-space collection at the moment???
1714 -------------------------------------------------------------------------- */
1716 /* -----------------------------------------------------------------------------
1717 * waitThread is the external interface for running a new computation
1718 * and waiting for the result.
1720 * In the non-SMP case, we create a new main thread, push it on the
1721 * main-thread stack, and invoke the scheduler to run it. The
1722 * scheduler will return when the top main thread on the stack has
1723 * completed or died, and fill in the necessary fields of the
1724 * main_thread structure.
1726 * In the SMP case, we create a main thread as before, but we then
1727 * create a new condition variable and sleep on it. When our new
1728 * main thread has completed, we'll be woken up and the status/result
1729 * will be in the main_thread struct.
1730 * -------------------------------------------------------------------------- */
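/* A sketch of how a client might drive this interface (the closure and the
 * use of createIOThread are illustrative; see SchedAPI.h):
 *
 *   StgClosure *result;
 *   SchedulerStatus stat;
 *   StgTSO *tso = createIOThread(RtsFlags.GcFlags.initialStkSize, closure);
 *   scheduleThread(tso);
 *   stat = waitThread(tso, &result);   -- blocks until the thread finishes
 *   if (stat == Success) { ... use result ... }
 */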
1733 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1736 SchedulerStatus stat;
1738 ACQUIRE_LOCK(&sched_mutex);
1740 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1746 pthread_cond_init(&m->wakeup, NULL);
1749 m->link = main_threads;
1752 IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n",
1757 pthread_cond_wait(&m->wakeup, &sched_mutex);
1758 } while (m->stat == NoStatus);
1760 /* GranSim specific init */
1761 CurrentTSO = m->tso; // the TSO to run
1762 procStatus[MainProc] = Busy; // status of main PE
1763 CurrentProc = MainProc; // PE to run it on
1768 ASSERT(m->stat != NoStatus);
1774 pthread_cond_destroy(&m->wakeup);
1777 IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n",
1781 RELEASE_LOCK(&sched_mutex);
1786 //@node Run queue code, Garbage Collection Routines, Suspend and Resume, Main scheduling code
1787 //@subsection Run queue code
1791 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1792 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1793 implicit global variable that has to be correct when calling these
1797 /* Put the new thread on the head of the runnable queue.
1798 * The caller of createThread better push an appropriate closure
1799 * on this thread's stack before the scheduler is invoked.
1801 static /* inline */ void
1802 add_to_run_queue(tso)
1805 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1806 tso->link = run_queue_hd;
1808 if (run_queue_tl == END_TSO_QUEUE) {
1813 /* Put the new thread at the end of the runnable queue. */
1814 static /* inline */ void
1815 push_on_run_queue(tso)
1818 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1819 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1820 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1821 if (run_queue_hd == END_TSO_QUEUE) {
1824 run_queue_tl->link = tso;
1830 Should be inlined because it's used very often in schedule. The tso
1831 argument is actually only needed in GranSim, where we want to have the
1832 possibility to schedule *any* TSO on the run queue, irrespective of the
1833 actual ordering. Therefore, if tso is not the nil TSO then we traverse
1834 the run queue and dequeue the tso, adjusting the links in the queue.
1836 //@cindex take_off_run_queue
1837 static /* inline */ StgTSO*
1838 take_off_run_queue(StgTSO *tso) {
1842 qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1844 if tso is specified, unlink that tso from the run_queue (doesn't have
1845 to be at the beginning of the queue); GranSim only
1847 if (tso!=END_TSO_QUEUE) {
1848 /* find tso in queue */
1849 for (t=run_queue_hd, prev=END_TSO_QUEUE;
1850 t!=END_TSO_QUEUE && t!=tso;
1854 /* now actually dequeue the tso */
1855 if (prev!=END_TSO_QUEUE) {
1856 ASSERT(run_queue_hd!=t);
1857 prev->link = t->link;
1859 /* t is at beginning of thread queue */
1860 ASSERT(run_queue_hd==t);
1861 run_queue_hd = t->link;
1863 /* t is at end of thread queue */
1864 if (t->link==END_TSO_QUEUE) {
1865 ASSERT(t==run_queue_tl);
1866 run_queue_tl = prev;
1868 ASSERT(run_queue_tl!=t);
1870 t->link = END_TSO_QUEUE;
1872 /* take tso from the beginning of the queue; std concurrent code */
1874 if (t != END_TSO_QUEUE) {
1875 run_queue_hd = t->link;
1876 t->link = END_TSO_QUEUE;
1877 if (run_queue_hd == END_TSO_QUEUE) {
1878 run_queue_tl = END_TSO_QUEUE;
1887 //@node Garbage Collection Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1888 //@subsection Garbage Collection Routines
1890 /* ---------------------------------------------------------------------------
1891 Where are the roots that we know about?
1893 - all the threads on the runnable queue
1894 - all the threads on the blocked queue
1895 - all the threads currently executing a _ccall_GC
1896 - all the "main threads"
1898 ------------------------------------------------------------------------ */
1900 /* This has to be protected either by the scheduler monitor, or by the
1901 garbage collection monitor (probably the latter).
1905 static void GetRoots(void)
1912 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1913 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1914 run_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1915 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1916 run_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1918 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1919 blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1920 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1921 blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1922 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1923 ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
1930 if (run_queue_hd != END_TSO_QUEUE) {
1931 ASSERT(run_queue_tl != END_TSO_QUEUE);
1932 run_queue_hd = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1933 run_queue_tl = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1936 if (blocked_queue_hd != END_TSO_QUEUE) {
1937 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
1938 blocked_queue_hd = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1939 blocked_queue_tl = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1943 for (m = main_threads; m != NULL; m = m->link) {
1944 m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1946 if (suspended_ccalling_threads != END_TSO_QUEUE)
1947 suspended_ccalling_threads =
1948 (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
1950 #if defined(SMP) || defined(PAR) || defined(GRAN)
1955 /* -----------------------------------------------------------------------------
1958 This is the interface to the garbage collector from Haskell land.
1959 We provide this so that external C code can allocate and garbage
1960 collect when called from Haskell via _ccall_GC.
1962 It might be useful to provide an interface whereby the programmer
1963 can specify more roots (ToDo).
1965 This needs to be protected by the GC condition variable above. KH.
1966 -------------------------------------------------------------------------- */
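/* A sketch of how external C code might use this interface; my_object and
 * my_extra_roots are illustrative names, not part of the RTS:
 *
 *   static StgClosure *my_object;
 *
 *   static void my_extra_roots(void) {
 *     my_object = MarkRoot(my_object);    -- keep my_object alive across GC
 *   }
 *
 *   ... performGCWithRoots(my_extra_roots); ...   -- or plain performGC()
 */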
1968 void (*extra_roots)(void);
1973 GarbageCollect(GetRoots);
1979 GetRoots(); /* the scheduler's roots */
1980 extra_roots(); /* the user's roots */
1984 performGCWithRoots(void (*get_roots)(void))
1986 extra_roots = get_roots;
1988 GarbageCollect(AllRoots);
1991 /* -----------------------------------------------------------------------------
1994 If the thread has reached its maximum stack size, then raise the
1995 StackOverflow exception in the offending thread. Otherwise
1996 relocate the TSO into a larger chunk of memory and adjust its stack size appropriately.
1998 -------------------------------------------------------------------------- */
2001 threadStackOverflow(StgTSO *tso)
2003 nat new_stack_size, new_tso_size, diff, stack_words;
2007 IF_DEBUG(sanity,checkTSO(tso));
2008 if (tso->stack_size >= tso->max_stack_size) {
2011 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2012 tso->id, tso, tso->stack_size, tso->max_stack_size);
2013 /* If we're debugging, just print out the top of the stack */
2014 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2018 fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
2021 /* Send this thread the StackOverflow exception */
2022 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2027 /* Try to double the current stack size. If that takes us over the
2028 * maximum stack size for this thread, then use the maximum instead.
2029 * Finally round up so the TSO ends up as a whole number of blocks.
2031 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2032 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2033 TSO_STRUCT_SIZE)/sizeof(W_);
2034 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2035 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2037 IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2039 dest = (StgTSO *)allocate(new_tso_size);
2040 TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2042 /* copy the TSO block and the old stack into the new area */
2043 memcpy(dest,tso,TSO_STRUCT_SIZE);
2044 stack_words = tso->stack + tso->stack_size - tso->sp;
2045 new_sp = (P_)dest + new_tso_size - stack_words;
2046 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2048 /* relocate the stack pointers... */
2049 diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2050 dest->su = (StgUpdateFrame *) ((P_)dest->su + diff);
2052 dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso);
2053 dest->stack_size = new_stack_size;
2055 /* and relocate the update frame list */
2056 relocate_TSO(tso, dest);
2058 /* Mark the old TSO as relocated. We have to check for relocated
2059 * TSOs in the garbage collector and any primops that deal with TSOs.
2061 * It's important to set the sp and su values to just beyond the end
2062 * of the stack, so we don't attempt to scavenge any part of the old TSO's stack.
2065 tso->what_next = ThreadRelocated;
2067 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2068 tso->su = (StgUpdateFrame *)tso->sp;
2069 tso->why_blocked = NotBlocked;
2070 dest->mut_link = NULL;
2072 IF_PAR_DEBUG(verbose,
2073 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2074 tso->id, tso, tso->stack_size);
2075 /* If we're debugging, just print out the top of the stack */
2076 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2079 IF_DEBUG(sanity,checkTSO(tso));
2081 IF_DEBUG(scheduler,printTSO(dest));
2087 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collection Routines, Main scheduling code
2088 //@subsection Blocking Queue Routines
2090 /* ---------------------------------------------------------------------------
2091 Wake up a queue that was blocked on some resource.
2092 ------------------------------------------------------------------------ */
2094 /* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */
2098 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2103 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2105 /* write RESUME events to log file and
2106 update blocked and fetch time (depending on type of the orig closure) */
2107 if (RtsFlags.ParFlags.ParStats.Full) {
2108 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2109 GR_RESUME, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2110 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2112 switch (get_itbl(node)->type) {
2114 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2119 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2122 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2129 static StgBlockingQueueElement *
2130 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2133 PEs node_loc, tso_loc;
2135 node_loc = where_is(node); // should be lifted out of loop
2136 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2137 tso_loc = where_is((StgClosure *)tso);
2138 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2139 /* !fake_fetch => TSO is on CurrentProc, i.e. the same as IS_LOCAL_TO */
2140 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2141 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2142 // insertThread(tso, node_loc);
2143 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2145 tso, node, (rtsSpark*)NULL);
2146 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2149 } else { // TSO is remote (actually should be FMBQ)
2150 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2151 RtsFlags.GranFlags.Costs.gunblocktime +
2152 RtsFlags.GranFlags.Costs.latency;
2153 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2155 tso, node, (rtsSpark*)NULL);
2156 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2159 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2161 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2162 (node_loc==tso_loc ? "Local" : "Global"),
2163 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2164 tso->block_info.closure = NULL;
2165 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
2169 static StgBlockingQueueElement *
2170 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2172 StgBlockingQueueElement *next;
2174 switch (get_itbl(bqe)->type) {
2176 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2177 /* if it's a TSO just push it onto the run_queue */
2179 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2180 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
2182 unblockCount(bqe, node);
2183 /* reset blocking status after dumping event */
2184 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2188 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2190 bqe->link = PendingFetches;
2191 PendingFetches = bqe;
2195 /* can ignore this case in a non-debugging setup;
2196 see comments on RBHSave closures above */
2198 /* check that the closure is an RBHSave closure */
2199 ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
2200 get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
2201 get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
2205 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2206 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2210 // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2214 #else /* !GRAN && !PAR */
2216 unblockOneLocked(StgTSO *tso)
2220 ASSERT(get_itbl(tso)->type == TSO);
2221 ASSERT(tso->why_blocked != NotBlocked);
2222 tso->why_blocked = NotBlocked;
2224 PUSH_ON_RUN_QUEUE(tso);
2226 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2231 #if defined(GRAN) || defined(PAR)
2232 inline StgBlockingQueueElement *
2233 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2235 ACQUIRE_LOCK(&sched_mutex);
2236 bqe = unblockOneLocked(bqe, node);
2237 RELEASE_LOCK(&sched_mutex);
2242 unblockOne(StgTSO *tso)
2244 ACQUIRE_LOCK(&sched_mutex);
2245 tso = unblockOneLocked(tso);
2246 RELEASE_LOCK(&sched_mutex);
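/* unblockOne above follows the *Locked naming convention used throughout this
 * file: the Locked variant assumes the caller already holds sched_mutex, and
 * the exported entry point is nothing more than an acquire/release wrapper
 * around it.  A self-contained pthreads sketch of the same pattern follows
 * (the demo_* names are hypothetical, not RTS identifiers).
 */
#if 0   /* illustrative sketch, not compiled */
#include <pthread.h>

static pthread_mutex_t demo_mutex   = PTHREAD_MUTEX_INITIALIZER;
static int             demo_counter = 0;

/* the ...Locked variant: caller must already hold demo_mutex */
static int bumpLocked(void) { return ++demo_counter; }

/* the public wrapper: take the monitor lock, do the work, release it */
static int bump(void)
{
    int r;
    pthread_mutex_lock(&demo_mutex);
    r = bumpLocked();
    pthread_mutex_unlock(&demo_mutex);
    return r;
}
#endif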
2253 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2255 StgBlockingQueueElement *bqe;
2260 belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2261 node, CurrentProc, CurrentTime[CurrentProc],
2262 CurrentTSO->id, CurrentTSO));
2264 node_loc = where_is(node);
2266 ASSERT(get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2267 get_itbl(q)->type == CONSTR); // closure (type constructor)
2268 ASSERT(is_unique(node));
2270 /* FAKE FETCH: magically copy the node to the tso's proc;
2271 no Fetch necessary because in reality the node should not have been
2272 moved to the other PE in the first place
2274 if (CurrentProc!=node_loc) {
2276 belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2277 node, node_loc, CurrentProc, CurrentTSO->id,
2278 // CurrentTSO, where_is(CurrentTSO),
2279 node->header.gran.procs));
2280 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2282 belch("## new bitmask of node %p is %#x",
2283 node, node->header.gran.procs));
2284 if (RtsFlags.GranFlags.GranSimStats.Global) {
2285 globalGranStats.tot_fake_fetches++;
2290 // ToDo: check: ASSERT(CurrentProc==node_loc);
2291 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2294 bqe points to the current element in the queue
2295 next points to the next element in the queue
2297 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2298 //tso_loc = where_is(tso);
2300 bqe = unblockOneLocked(bqe, node);
2303 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2304 the closure to make room for the anchor of the BQ */
2305 if (bqe!=END_BQ_QUEUE) {
2306 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2308 ASSERT((info_ptr==&RBH_Save_0_info) ||
2309 (info_ptr==&RBH_Save_1_info) ||
2310 (info_ptr==&RBH_Save_2_info));
2312 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2313 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2314 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2317 belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2318 node, info_type(node)));
2321 /* statistics gathering */
2322 if (RtsFlags.GranFlags.GranSimStats.Global) {
2323 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2324 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2325 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2326 globalGranStats.tot_awbq++; // total no. of bqs awakened
2329 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2330 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
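/* Two GranSim-specific tricks are visible in awakenBlockedQueue above.  A
 * "fake fetch" simply ORs the current PE into the node's locality bitmask
 * instead of scheduling a real Fetch event, and an RBH re-uses its first two
 * words as the anchor of its blocking queue, parking the displaced words in
 * an RBHSave closure at the end of the queue until they are written back.
 * The sketch below illustrates both ideas with hypothetical names (pe_set,
 * fake_fetch, struct rbh/save); it is not RTS code.
 */
#if 0   /* illustrative sketch, not compiled */
typedef unsigned int pe_set;                    /* one bit per PE            */
#define PE_BIT(pe)         (1u << (pe))
#define IS_ON_PE(set, pe)  (((set) & PE_BIT(pe)) != 0)

/* fake fetch: pretend the node already lives on PE 'pe' */
static pe_set fake_fetch(pe_set procs, int pe) { return procs | PE_BIT(pe); }

/* an RBH-like object whose first two words were hijacked as the blocking
 * queue anchor; the displaced words live in a save record that sits at the
 * very end of the queue (cf. convertToRBH / RBHSave)                        */
struct save { void *word0, *word1; };
struct rbh  { void *blocking_queue, *mut_link; };

static void restore_rbh(struct rbh *node, struct save *saved)
{
    node->blocking_queue = saved->word0;
    node->mut_link       = saved->word1;
}
#endif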
2334 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2336 StgBlockingQueueElement *bqe, *next;
2338 ACQUIRE_LOCK(&sched_mutex);
2340 IF_PAR_DEBUG(verbose,
2341 belch("## AwBQ for node %p on [%x]: ",
2344 ASSERT(get_itbl(q)->type == TSO ||
2345 get_itbl(q)->type == BLOCKED_FETCH ||
2346 get_itbl(q)->type == CONSTR);
2349 while (get_itbl(bqe)->type==TSO ||
2350 get_itbl(bqe)->type==BLOCKED_FETCH) {
2351 bqe = unblockOneLocked(bqe, node);
2353 RELEASE_LOCK(&sched_mutex);
2356 #else /* !GRAN && !PAR */
2358 awakenBlockedQueue(StgTSO *tso)
2360 ACQUIRE_LOCK(&sched_mutex);
2361 while (tso != END_TSO_QUEUE) {
2362 tso = unblockOneLocked(tso);
2364 RELEASE_LOCK(&sched_mutex);
2368 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2369 //@subsection Exception Handling Routines
2371 /* ---------------------------------------------------------------------------
2373 - usually called inside a signal handler so it mustn't do anything fancy.
2374 ------------------------------------------------------------------------ */
2377 interruptStgRts(void)
2383 /* -----------------------------------------------------------------------------
2386    This is for use when we raise an exception in another thread, which
   may be blocked.
2388 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2389 -------------------------------------------------------------------------- */
2391 #if defined(GRAN) || defined(PAR)
2393 NB: only the type of the blocking queue is different in GranSim and GUM
2394 the operations on the queue-elements are the same
2395 long live polymorphism!
2398 unblockThread(StgTSO *tso)
2400 StgBlockingQueueElement *t, **last;
2402 ACQUIRE_LOCK(&sched_mutex);
2403 switch (tso->why_blocked) {
  case NotBlocked:
2406     return;  /* not blocked */
  case BlockedOnMVar:
2409     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2411 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2412 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2414 last = (StgBlockingQueueElement **)&mvar->head;
2415 for (t = (StgBlockingQueueElement *)mvar->head;
2417 last = &t->link, last_tso = t, t = t->link) {
2418 if (t == (StgBlockingQueueElement *)tso) {
2419 *last = (StgBlockingQueueElement *)tso->link;
2420 if (mvar->tail == tso) {
2421 mvar->tail = (StgTSO *)last_tso;
2426 barf("unblockThread (MVAR): TSO not found");
2429 case BlockedOnBlackHole:
2430 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2432 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2434 last = &bq->blocking_queue;
2435 for (t = bq->blocking_queue;
2437 last = &t->link, t = t->link) {
2438 if (t == (StgBlockingQueueElement *)tso) {
2439 *last = (StgBlockingQueueElement *)tso->link;
2443 barf("unblockThread (BLACKHOLE): TSO not found");
2446 case BlockedOnException:
2448 StgTSO *target = tso->block_info.tso;
2450 ASSERT(get_itbl(target)->type == TSO);
2451 ASSERT(target->blocked_exceptions != NULL);
2453 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2454 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2456 last = &t->link, t = t->link) {
2457 ASSERT(get_itbl(t)->type == TSO);
2458 if (t == (StgBlockingQueueElement *)tso) {
2459 *last = (StgBlockingQueueElement *)tso->link;
2463 barf("unblockThread (Exception): TSO not found");
2466 case BlockedOnDelay:
  case BlockedOnRead:
2468   case BlockedOnWrite:
2470 StgBlockingQueueElement *prev = NULL;
2471 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2472 prev = t, t = t->link) {
2473 if (t == (StgBlockingQueueElement *)tso) {
2475 blocked_queue_hd = (StgTSO *)t->link;
2476 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2477 blocked_queue_tl = END_TSO_QUEUE;
2480 prev->link = t->link;
2481 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2482 blocked_queue_tl = (StgTSO *)prev;
2488 barf("unblockThread (I/O): TSO not found");
  default:
2492     barf("unblockThread");
2496 tso->link = END_TSO_QUEUE;
2497 tso->why_blocked = NotBlocked;
2498 tso->block_info.closure = NULL;
2499 PUSH_ON_RUN_QUEUE(tso);
2500 RELEASE_LOCK(&sched_mutex);
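/* The MVar, black-hole and exception cases of unblockThread above all remove
 * the TSO from a singly-linked queue with the same idiom: 'last' always holds
 * the address of the link field that points at the current element, so
 * unlinking is a single '*last = t->link' with no special case for the head
 * of the queue (the I/O case uses an explicit 'prev' pointer and handles the
 * head separately instead).  A self-contained sketch of the idiom follows
 * (struct elem and remove_elem are hypothetical names).
 */
#if 0   /* illustrative sketch, not compiled */
#include <stddef.h>

struct elem { int id; struct elem *link; };

/* unlink the element with the given id; returns 0 if it was not found */
static int remove_elem(struct elem **head, int id)
{
    struct elem **last = head, *t;
    for (t = *head; t != NULL; last = &t->link, t = t->link) {
        if (t->id == id) {
            *last = t->link;   /* works for head and interior elements alike */
            return 1;
        }
    }
    return 0;                  /* the RTS barf()s in this case instead       */
}
#endif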
2504 unblockThread(StgTSO *tso)
2508 ACQUIRE_LOCK(&sched_mutex);
2509 switch (tso->why_blocked) {
  case NotBlocked:
2512     return;  /* not blocked */
  case BlockedOnMVar:
2515     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2517 StgTSO *last_tso = END_TSO_QUEUE;
2518 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2521 for (t = mvar->head; t != END_TSO_QUEUE;
2522 last = &t->link, last_tso = t, t = t->link) {
2525 if (mvar->tail == tso) {
2526 mvar->tail = last_tso;
2531 barf("unblockThread (MVAR): TSO not found");
2534 case BlockedOnBlackHole:
2535 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2537 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2539 last = &bq->blocking_queue;
2540 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2541 last = &t->link, t = t->link) {
2547 barf("unblockThread (BLACKHOLE): TSO not found");
2550 case BlockedOnException:
2552 StgTSO *target = tso->block_info.tso;
2554 ASSERT(get_itbl(target)->type == TSO);
2555 ASSERT(target->blocked_exceptions != NULL);
2557 last = &target->blocked_exceptions;
2558 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2559 last = &t->link, t = t->link) {
2560 ASSERT(get_itbl(t)->type == TSO);
2566 barf("unblockThread (Exception): TSO not found");
2569 case BlockedOnDelay:
  case BlockedOnRead:
2571   case BlockedOnWrite:
2573 StgTSO *prev = NULL;
2574 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2575 prev = t, t = t->link) {
2578 blocked_queue_hd = t->link;
2579 if (blocked_queue_tl == t) {
2580 blocked_queue_tl = END_TSO_QUEUE;
2583 prev->link = t->link;
2584 if (blocked_queue_tl == t) {
2585 blocked_queue_tl = prev;
2591 barf("unblockThread (I/O): TSO not found");
  default:
2595     barf("unblockThread");
2599 tso->link = END_TSO_QUEUE;
2600 tso->why_blocked = NotBlocked;
2601 tso->block_info.closure = NULL;
2602 PUSH_ON_RUN_QUEUE(tso);
2603 RELEASE_LOCK(&sched_mutex);
2607 /* -----------------------------------------------------------------------------
2610 * The following function implements the magic for raising an
2611 * asynchronous exception in an existing thread.
2613 * We first remove the thread from any queue on which it might be
2614 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
2616 * We strip the stack down to the innermost CATCH_FRAME, building
2617 * thunks in the heap for all the active computations, so they can
2618 * be restarted if necessary. When we reach a CATCH_FRAME, we build
2619 * an application of the handler to the exception, and push it on
2620 * the top of the stack.
2622 * How exactly do we save all the active computations? We create an
2623 * AP_UPD for every UpdateFrame on the stack. Entering one of these
2624 * AP_UPDs pushes everything from the corresponding update frame
2625 * upwards onto the stack. (Actually, it pushes everything up to the
2626  * next update frame plus a pointer to the next AP_UPD object.)
2627 * Entering the next AP_UPD object pushes more onto the stack until we
2628 * reach the last AP_UPD object - at which point the stack should look
2629 * exactly as it did when we killed the TSO and we can continue
2630 * execution by entering the closure on top of the stack.
2632 * We can also kill a thread entirely - this happens if either (a) the
2633 * exception passed to raiseAsync is NULL, or (b) there's no
2634 * CATCH_FRAME on the stack. In either case, we strip the entire
2635 * stack and replace the thread with a zombie.
2637 * -------------------------------------------------------------------------- */
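/* A rough, self-contained model of the chunking step described above: every
 * update frame marks the boundary of a chunk, and the words between the
 * current stack pointer and that frame are copied into a heap record whose
 * first field plays the role of ap->fun.  All names here (toy_ap,
 * freeze_chunk) are hypothetical, and the sketch ignores info tables,
 * profiling headers and the real frame layouts; see raiseAsync below for the
 * real thing.
 */
#if 0   /* illustrative sketch, not compiled */
#include <stdlib.h>

typedef struct toy_ap_ {
    void           *fun;      /* word that sat on top of the chunk           */
    int             n_words;  /* number of stack words saved below it        */
    void          **payload;  /* the saved words themselves                  */
    struct toy_ap_ *next;     /* next chunk, i.e. the next AP_UPD            */
} toy_ap;

/* sp    = current top of the (downward-growing) stack
 * frame = next update-frame boundary above sp                               */
static toy_ap *freeze_chunk(void **sp, void **frame)
{
    int     i, words = (int)(frame - sp) - 1;
    toy_ap *ap = malloc(sizeof *ap);
    ap->fun     = sp[0];
    ap->n_words = words;
    ap->payload = malloc((size_t)words * sizeof(void *));
    for (i = 0; i < words; i++)
        ap->payload[i] = sp[i + 1];
    ap->next = NULL;
    return ap;
}
#endif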
2640 deleteThread(StgTSO *tso)
2642 raiseAsync(tso,NULL);
2646 raiseAsync(StgTSO *tso, StgClosure *exception)
2648 StgUpdateFrame* su = tso->su;
2649 StgPtr sp = tso->sp;
2651 /* Thread already dead? */
2652 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2656 IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2658 /* Remove it from any blocking queues */
2661 /* The stack freezing code assumes there's a closure pointer on
2662 * the top of the stack. This isn't always the case with compiled
2663 * code, so we have to push a dummy closure on the top which just
2664 * returns to the next return address on the stack.
2666 if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2667 *(--sp) = (W_)&dummy_ret_closure;
2671 int words = ((P_)su - (P_)sp) - 1;
2675 /* If we find a CATCH_FRAME, and we've got an exception to raise,
2676 * then build PAP(handler,exception,realworld#), and leave it on
2677 * top of the stack ready to enter.
2679 if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2680 StgCatchFrame *cf = (StgCatchFrame *)su;
2681 /* we've got an exception to raise, so let's pass it to the
2682 * handler in this frame.
2684 ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2685 TICK_ALLOC_UPD_PAP(3,0);
2686 SET_HDR(ap,&PAP_info,cf->header.prof.ccs);
2689 ap->fun = cf->handler; /* :: Exception -> IO a */
2690 ap->payload[0] = (P_)exception;
2691 ap->payload[1] = ARG_TAG(0); /* realworld token */
2693       /* throw away the stack from Sp up to and including the
       * CATCH_FRAME.
2696 sp = (P_)su + sizeofW(StgCatchFrame) - 1;
2699 /* Restore the blocked/unblocked state for asynchronous exceptions
2700 * at the CATCH_FRAME.
2702 * If exceptions were unblocked at the catch, arrange that they
2703 * are unblocked again after executing the handler by pushing an
2704 * unblockAsyncExceptions_ret stack frame.
2706 if (!cf->exceptions_blocked) {
2707 *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
2710 /* Ensure that async exceptions are blocked when running the handler.
2712 if (tso->blocked_exceptions == NULL) {
2713 tso->blocked_exceptions = END_TSO_QUEUE;
2716 /* Put the newly-built PAP on top of the stack, ready to execute
2717 * when the thread restarts.
2721 tso->what_next = ThreadEnterGHC;
2725 /* First build an AP_UPD consisting of the stack chunk above the
2726        * current update frame, with the top word on the stack as the
       * fun field.
2729 ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2734 ap->fun = (StgClosure *)sp[0];
2736 for(i=0; i < (nat)words; ++i) {
2737 ap->payload[i] = (P_)*sp++;
2740 switch (get_itbl(su)->type) {
    case UPDATE_FRAME:
2744 	SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */);
2745 TICK_ALLOC_UP_THK(words+1,0);
2748 fprintf(stderr, "scheduler: Updating ");
2749 printPtr((P_)su->updatee);
2750 fprintf(stderr, " with ");
2751 printObj((StgClosure *)ap);
2754 /* Replace the updatee with an indirection - happily
2755 * this will also wake up any threads currently
2756 * waiting on the result.
2758 UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */
2760 sp += sizeofW(StgUpdateFrame) -1;
2761 sp[0] = (W_)ap; /* push onto stack */
    case CATCH_FRAME:
2767 	StgCatchFrame *cf = (StgCatchFrame *)su;
2770 /* We want a PAP, not an AP_UPD. Fortunately, the
2771 * layout's the same.
2773 SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2774 TICK_ALLOC_UPD_PAP(words+1,0);
2776 /* now build o = FUN(catch,ap,handler) */
2777 o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
2778 TICK_ALLOC_FUN(2,0);
2779 SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
2780 o->payload[0] = (StgClosure *)ap;
2781 o->payload[1] = cf->handler;
2784 fprintf(stderr, "scheduler: Built ");
2785 printObj((StgClosure *)o);
2788 /* pop the old handler and put o on the stack */
2790 sp += sizeofW(StgCatchFrame) - 1;
    case SEQ_FRAME:
2797 	StgSeqFrame *sf = (StgSeqFrame *)su;
2800 SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2801 TICK_ALLOC_UPD_PAP(words+1,0);
2803 /* now build o = FUN(seq,ap) */
2804 o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
2805 TICK_ALLOC_SE_THK(1,0);
2806 SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
2807 o->payload[0] = (StgClosure *)ap;
2810 fprintf(stderr, "scheduler: Built ");
2811 printObj((StgClosure *)o);
2814 /* pop the old handler and put o on the stack */
2816 sp += sizeofW(StgSeqFrame) - 1;
    case STOP_FRAME:
2822       /* We've stripped the entire stack, the thread is now dead. */
2823 sp += sizeofW(StgStopFrame) - 1;
2824 sp[0] = (W_)exception; /* save the exception */
2825 tso->what_next = ThreadKilled;
2826 tso->su = (StgUpdateFrame *)(sp+1);
2837 /* -----------------------------------------------------------------------------
2838 resurrectThreads is called after garbage collection on the list of
2839 threads found to be garbage. Each of these threads will be woken
2840 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2841    on an MVar, or NonTermination if the thread was blocked on a Black
   Hole.
2843 -------------------------------------------------------------------------- */
2846 resurrectThreads( StgTSO *threads )
2850 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2851 next = tso->global_link;
2852     tso->global_link = all_threads;
    all_threads = tso;
2854 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
2856 switch (tso->why_blocked) {
      case BlockedOnMVar:
2858       case BlockedOnException:
2859 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
2861 case BlockedOnBlackHole:
2862 raiseAsync(tso,(StgClosure *)NonTermination_closure);
      case NotBlocked:
2865 	/* This might happen if the thread was blocked on a black hole
2866 * belonging to a thread that we've just woken up (raiseAsync
2867 * can wake up threads, remember...).
      default:
2871 	barf("resurrectThreads: thread blocked in a strange way");
2876 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
2877 //@subsection Debugging Routines
2879 /* -----------------------------------------------------------------------------
2880 Debugging: why is a thread blocked
2881 -------------------------------------------------------------------------- */
2886 printThreadBlockage(StgTSO *tso)
2888 switch (tso->why_blocked) {
  case BlockedOnRead:
2890     fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
2892 case BlockedOnWrite:
2893 fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
2895 case BlockedOnDelay:
2896 #if defined(HAVE_SETITIMER)
2897 fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
#else
2899     fprintf(stderr,"blocked on delay of %d ms",
2900 	    tso->block_info.target - getourtimeofday());
#endif
  case BlockedOnMVar:
2904     fprintf(stderr,"blocked on an MVar");
2906 case BlockedOnException:
2907 fprintf(stderr,"blocked on delivering an exception to thread %d",
2908 tso->block_info.tso->id);
2910 case BlockedOnBlackHole:
2911 fprintf(stderr,"blocked on a black hole");
  case NotBlocked:
2914     fprintf(stderr,"not blocked");
2918 fprintf(stderr,"blocked on global address; local FM_BQ is %p (%s)",
2919 tso->block_info.closure, info_type(tso->block_info.closure));
2921 case BlockedOnGA_NoSend:
2922 fprintf(stderr,"blocked on global address (no send); local FM_BQ is %p (%s)",
2923 tso->block_info.closure, info_type(tso->block_info.closure));
2927     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
2928 	 tso->why_blocked, tso->id, tso);
2933 printThreadStatus(StgTSO *tso)
2935 switch (tso->what_next) {
  case ThreadKilled:
2937     fprintf(stderr,"has been killed");
2939 case ThreadComplete:
2940 fprintf(stderr,"has completed");
  default:
2943     printThreadBlockage(tso);
2948 printAllThreads(void)
2952 sched_belch("all threads:");
2953 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
2954 fprintf(stderr, "\tthread %d is ", t->id);
2955 printThreadStatus(t);
2956 fprintf(stderr,"\n");
2961 Print a whole blocking queue attached to node (debugging only).
2966 print_bq (StgClosure *node)
2968 StgBlockingQueueElement *bqe;
2972 fprintf(stderr,"## BQ of closure %p (%s): ",
2973 node, info_type(node));
2975 /* should cover all closures that may have a blocking queue */
2976 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2977 get_itbl(node)->type == FETCH_ME_BQ ||
2978 get_itbl(node)->type == RBH);
2980 ASSERT(node!=(StgClosure*)NULL); // sanity check
2982 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2984 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2985 !end; // iterate until bqe points to a CONSTR
2986 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2987 ASSERT(bqe != END_BQ_QUEUE); // sanity check
2988 ASSERT(bqe != (StgTSO*)NULL); // sanity check
2989 /* types of closures that may appear in a blocking queue */
2990 ASSERT(get_itbl(bqe)->type == TSO ||
2991 get_itbl(bqe)->type == BLOCKED_FETCH ||
2992 get_itbl(bqe)->type == CONSTR);
2993 /* only BQs of an RBH end with an RBH_Save closure */
2994 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
2996 switch (get_itbl(bqe)->type) {
2998       fprintf(stderr," TSO %d (%p),",
2999 	      ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3002 fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3003 ((StgBlockedFetch *)bqe)->node,
3004 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3005 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3006 ((StgBlockedFetch *)bqe)->ga.weight);
3009 fprintf(stderr," %s (IP %p),",
3010 (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
3011 get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
3012 get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
3013 "RBH_Save_?"), get_itbl(bqe));
3016 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3017 info_type(bqe), node, info_type(node));
3021 fputc('\n', stderr);
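/* The traversal in print_bq above has to cope with two different
 * terminators: the END_BQ_QUEUE sentinel for ordinary blocking queues, and a
 * trailing CONSTR (the RBHSave closure) for queues hanging off an RBH.  The
 * 'end' flag is recomputed from the element just processed before advancing,
 * so the trailing CONSTR is still visited.  A plain-C sketch of the same loop
 * shape follows (struct node, walk and MARKER are hypothetical names).
 */
#if 0   /* illustrative sketch, not compiled */
#include <stdio.h>

enum kind { ELEM, MARKER };                       /* MARKER plays the role of */
struct node { enum kind k; struct node *link; };  /* the trailing RBHSave     */

static void walk(struct node *q)          /* NULL stands in for END_BQ_QUEUE */
{
    struct node *bqe;
    int end;
    for (bqe = q, end = (bqe == NULL);
         !end;
         end = (bqe->k == MARKER) || (bqe->link == NULL),
             bqe = end ? NULL : bqe->link) {
        printf("%s\n", bqe->k == MARKER ? "marker" : "elem");
    }
}
#endif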
3023 # elif defined(GRAN)
3025 print_bq (StgClosure *node)
3027 StgBlockingQueueElement *bqe;
3028 PEs node_loc, tso_loc;
3031 /* should cover all closures that may have a blocking queue */
3032 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3033 get_itbl(node)->type == FETCH_ME_BQ ||
3034 get_itbl(node)->type == RBH);
3036 ASSERT(node!=(StgClosure*)NULL); // sanity check
3037 node_loc = where_is(node);
3039 fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3040 node, info_type(node), node_loc);
3043 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3045 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3046 !end; // iterate until bqe points to a CONSTR
3047 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3048 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3049 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3050 /* types of closures that may appear in a blocking queue */
3051 ASSERT(get_itbl(bqe)->type == TSO ||
3052 get_itbl(bqe)->type == CONSTR);
3053 /* only BQs of an RBH end with an RBH_Save closure */
3054 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3056 tso_loc = where_is((StgClosure *)bqe);
3057 switch (get_itbl(bqe)->type) {
3059 fprintf(stderr," TSO %d (%p) on [PE %d],",
3060 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3063 fprintf(stderr," %s (IP %p),",
3064 (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
3065 get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
3066 get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
3067 "RBH_Save_?"), get_itbl(bqe));
3070 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3071 info_type((StgClosure *)bqe), node, info_type(node));
3075 fputc('\n', stderr);
3079 Nice and easy: only TSOs on the blocking queue
3082 print_bq (StgClosure *node)
3086 ASSERT(node!=(StgClosure*)NULL); // sanity check
3087 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3088 tso != END_TSO_QUEUE;
3090 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3091 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3092 fprintf(stderr," TSO %d (%p),", tso->id, tso);
3094 fputc('\n', stderr);
3105 for (i=0, tso=run_queue_hd;
3106 tso != END_TSO_QUEUE;
3115 sched_belch(char *s, ...)
  va_list ap;
  va_start(ap, s);
#ifdef SMP
3120   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
#else
3122   fprintf(stderr, "scheduler: ");
#endif
3124   vfprintf(stderr, s, ap);
3125   fprintf(stderr, "\n");
3131 //@node Index, , Debugging Routines, Main scheduling code
3135 //* MainRegTable:: @cindex\s-+MainRegTable
3136 //* StgMainThread:: @cindex\s-+StgMainThread
3137 //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue
3138 //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd
3139 //* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
3140 //* context_switch:: @cindex\s-+context_switch
3141 //* createThread:: @cindex\s-+createThread
3142 //* free_capabilities:: @cindex\s-+free_capabilities
3143 //* gc_pending_cond:: @cindex\s-+gc_pending_cond
3144 //* initScheduler:: @cindex\s-+initScheduler
3145 //* interrupted:: @cindex\s-+interrupted
3146 //* n_free_capabilities:: @cindex\s-+n_free_capabilities
3147 //* next_thread_id:: @cindex\s-+next_thread_id
3148 //* print_bq:: @cindex\s-+print_bq
3149 //* run_queue_hd:: @cindex\s-+run_queue_hd
3150 //* run_queue_tl:: @cindex\s-+run_queue_tl
3151 //* sched_mutex:: @cindex\s-+sched_mutex
3152 //* schedule:: @cindex\s-+schedule
3153 //* take_off_run_queue:: @cindex\s-+take_off_run_queue
3154 //* task_ids:: @cindex\s-+task_ids
3155 //* term_mutex:: @cindex\s-+term_mutex
3156 //* thread_ready_cond:: @cindex\s-+thread_ready_cond