1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.63 2000/04/04 15:02:02 simonmar Exp $
4 * (c) The GHC Team, 1998-2000
8 * The main scheduling code in GranSim is quite different from that in std
9 * (concurrent) Haskell: while concurrent Haskell just iterates over the
10 * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11 * over the events in the global event queue. -- HWL
12 * --------------------------------------------------------------------------*/
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
17 /* Version with scheduler monitor support for SMPs.
19 This design provides a high-level API to create and schedule threads etc.
20 as documented in the SMP design document.
22 It uses a monitor design, with a single mutex controlling access
23 to shared data structures, and builds on the Posix threads library.
26 The majority of state is shared. In order to keep essential per-task state,
27 there is a Capability structure, which contains all the information
28 needed to run a thread: its STG registers, a pointer to its TSO, a
29 nursery etc. During STG execution, a pointer to the capability is
30 kept in a register (BaseReg).
32 In a non-SMP build, there is one global capability, namely MainRegTable.
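
   A sketch of the parts of the Capability structure this file relies
   on (the authoritative definition lives in the RTS headers; the field
   names below are the ones used in this file, everything else here is
   an assumption):

       typedef struct Capability_ {
           ...the STG registers, a nursery, etc...
           StgTSO             *rCurrentTSO;    -- the thread being run
           struct Capability_ *link;           -- free-capability list
       } Capability;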
39 //* Variables and Data structures::
40 //* Main scheduling loop::
41 //* Suspend and Resume::
43 //* Garbage Collection Routines::
44 //* Blocking Queue Routines::
45 //* Exception Handling Routines::
46 //* Debugging Routines::
50 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
51 //@subsection Includes
59 #include "StgStartup.h"
63 #include "StgMiscClosures.h"
65 #include "Evaluator.h"
66 #include "Exception.h"
74 #if defined(GRAN) || defined(PAR)
75 # include "GranSimRts.h"
77 # include "ParallelRts.h"
78 # include "Parallel.h"
79 # include "ParallelDebug.h"
87 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
88 //@subsection Variables and Data structures
92 * These are the threads which clients have requested that we run.
94 * In an SMP build, we might have several concurrent clients all
95 * waiting for results, and each one will wait on a condition variable
96 * until the result is available.
98 * In non-SMP, clients are strictly nested: the first client calls
99 * into the RTS, which might call out again to C with a _ccall_GC, and
100 * eventually re-enter the RTS.
102 * Information about main threads is kept in a linked list:
104 //@cindex StgMainThread
105 typedef struct StgMainThread_ {
107 SchedulerStatus stat;
110 pthread_cond_t wakeup;
112 struct StgMainThread_ *link;
115 /* Main thread queue.
116 * Locks required: sched_mutex.
118 static StgMainThread *main_threads;
121 * Locks required: sched_mutex.
125 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
126 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
129 In GranSim we have a runnable and a blocked queue for each processor.
130 In order to minimise code changes new arrays run_queue_hds/tls
131 are created. run_queue_hd is then a short cut (macro) for
132 run_queue_hds[CurrentProc] (see GranSim.h).
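
   As a sketch, assuming GranSim.h defines the short cuts along these
   lines:

       #define run_queue_hd   (run_queue_hds[CurrentProc])
       #define run_queue_tl   (run_queue_tls[CurrentProc])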
135 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
136 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
137 StgTSO *ccalling_threadss[MAX_PROC];
138 /* We use the same global list of threads (all_threads) in GranSim as in
139 the std RTS (i.e. we are cheating). However, we don't use this list in
140 the GranSim-specific code at the moment (so we are only potentially cheating).
145 StgTSO *run_queue_hd, *run_queue_tl;
146 StgTSO *blocked_queue_hd, *blocked_queue_tl;
150 /* Linked list of all threads.
151 * Used for detecting garbage collected threads.
155 /* Threads suspended in _ccall_GC.
157 static StgTSO *suspended_ccalling_threads;
159 static void GetRoots(void);
160 static StgTSO *threadStackOverflow(StgTSO *tso);
162 /* KH: The following two flags are shared memory locations. There is no need
163 to lock them, since they are only unset at the end of a scheduler operation.
167 /* flag set by signal handler to precipitate a context switch */
168 //@cindex context_switch
171 /* if this flag is set as well, give up execution */
172 //@cindex interrupted
175 /* Next thread ID to allocate.
176 * Locks required: sched_mutex
178 //@cindex next_thread_id
179 StgThreadID next_thread_id = 1;
182 * Pointers to the state of the current thread.
183 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
184 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
187 /* The smallest stack size that makes any sense is:
188 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
189 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
190 * + 1 (the realworld token for an IO thread)
191 * + 1 (the closure to enter)
193 * A thread with this stack will bomb immediately with a stack
194 * overflow, which will increase its stack size.
197 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
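/* Worked example: if RESERVED_STACK_WORDS were 7 and
 * sizeofW(StgStopFrame) were 2, MIN_STACK_WORDS would come to 11 words;
 * the "+ 2" folds together the realworld token and the closure to enter
 * from the list above. (The concrete numbers are illustrative only.)
 */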
199 /* Free capability list.
200 * Locks required: sched_mutex.
203 //@cindex free_capabilities
204 //@cindex n_free_capabilities
205 Capability *free_capabilities; /* Available capabilities for running threads */
206 nat n_free_capabilities; /* total number of available capabilities */
208 //@cindex MainRegTable
209 Capability MainRegTable; /* for non-SMP, we have one global capability */
218 /* All our current task ids, saved in case we need to kill them later.
225 void addToBlockedQueue ( StgTSO *tso );
227 static void schedule ( void );
228 void interruptStgRts ( void );
230 static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri );
232 static StgTSO * createThread_ ( nat size, rtsBool have_lock );
236 static void sched_belch(char *s, ...);
240 //@cindex sched_mutex
242 //@cindex thread_ready_cond
243 //@cindex gc_pending_cond
244 pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;
245 pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER;
246 pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER;
247 pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER;
254 rtsTime TimeOfLastYield;
258 char *whatNext_strs[] = {
266 char *threadReturnCode_strs[] = {
267 "HeapOverflow", /* might also be StackOverflow */
276 * The thread state for the main thread.
277 // ToDo: check whether this is still needed
281 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
282 //@subsection Main scheduling loop
284 /* ---------------------------------------------------------------------------
285 Main scheduling loop.
287 We use round-robin scheduling, each thread returning to the
288 scheduler loop when one of these conditions is detected:
291 * timer expires (thread yields)
296 Locking notes: we acquire the scheduler lock once at the beginning
297 of the scheduler loop, and release it when
299 * running a thread, or
300 * waiting for work, or
301 * waiting for a GC to complete.
304 In a GranSim setup this loop iterates over the global event queue,
305 which determines what to do next. Therefore, it's more complicated
306 than either the concurrent or the parallel (GUM) setup.
310 GUM iterates over incoming messages.
311 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
312 and sends out a fish whenever it has nothing to do; in-between
313 doing the actual reductions (shared code below) it processes the
314 incoming messages and deals with delayed operations
315 (see PendingFetches).
316 This is not the ugliest code you could imagine, but it's bloody close.
318 ------------------------------------------------------------------------ */
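/* In outline, the plain (non-GRAN, non-PAR) path through the loop is
 * roughly the sketch below; the real code interleaves the SMP, GranSim
 * and GUM variants of each step with #ifdefs:
 *
 *     ACQUIRE_LOCK(&sched_mutex);
 *     while (1) {
 *         ...wake up finished main threads, handle signals,
 *            top up the run queue, detect deadlock...
 *         t = POP_RUN_QUEUE();
 *         RELEASE_LOCK(&sched_mutex);
 *         ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
 *         ACQUIRE_LOCK(&sched_mutex);
 *         switch (ret) { ...HeapOverflow, StackOverflow,
 *                           ThreadYielding, ThreadBlocked,
 *                           ThreadFinished... }
 *     }
 */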
325 StgThreadReturnCode ret;
334 rtsBool was_interrupted = rtsFalse;
336 ACQUIRE_LOCK(&sched_mutex);
340 /* set up first event to get things going */
341 /* ToDo: assign costs for system setup and init MainTSO ! */
342 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
344 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
347 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
348 G_TSO(CurrentTSO, 5));
350 if (RtsFlags.GranFlags.Light) {
351 /* Save current time; GranSim Light only */
352 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
355 event = get_next_event();
357 while (event!=(rtsEvent*)NULL) {
358 /* Choose the processor with the next event */
359 CurrentProc = event->proc;
360 CurrentTSO = event->tso;
364 while (!GlobalStopPending) { /* GlobalStopPending set in par_exit */
372 IF_DEBUG(scheduler, printAllThreads());
374 /* If we're interrupted (the user pressed ^C, or some other
375 * termination condition occurred), kill all the currently running threads.
379 IF_DEBUG(scheduler, sched_belch("interrupted"));
380 for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
383 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
386 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
387 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
388 interrupted = rtsFalse;
389 was_interrupted = rtsTrue;
392 /* Go through the list of main threads and wake up any
393 * clients whose computations have finished. ToDo: this
394 * should be done more efficiently without a linear scan
395 * of the main threads list, somehow...
399 StgMainThread *m, **prev;
400 prev = &main_threads;
401 for (m = main_threads; m != NULL; m = m->link) {
402 switch (m->tso->what_next) {
405 *(m->ret) = (StgClosure *)m->tso->sp[0];
409 pthread_cond_broadcast(&m->wakeup);
413 if (was_interrupted) {
414 m->stat = Interrupted;
418 pthread_cond_broadcast(&m->wakeup);
428 /* in GUM do this only on the Main PE */
431 /* If our main thread has finished or been killed, return.
434 StgMainThread *m = main_threads;
435 if (m->tso->what_next == ThreadComplete
436 || m->tso->what_next == ThreadKilled) {
437 main_threads = main_threads->link;
438 if (m->tso->what_next == ThreadComplete) {
439 /* we finished successfully, fill in the return value */
440 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
444 if (was_interrupted) {
445 m->stat = Interrupted;
455 /* Top up the run queue from our spark pool. We try to make the
456 * number of threads in the run queue equal to the number of free capabilities.
461 nat n = n_free_capabilities;
462 StgTSO *tso = run_queue_hd;
464 /* Count the run queue */
465 while (n > 0 && tso != END_TSO_QUEUE) {
474 break; /* no more sparks in the pool */
476 /* I'd prefer this to be done in activateSpark -- HWL */
477 /* tricky - it needs to hold the scheduler lock and
478 * not try to re-acquire it -- SDM */
480 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
481 pushClosure(tso,spark);
482 PUSH_ON_RUN_QUEUE(tso);
484 advisory_thread_count++;
488 sched_belch("turning spark of closure %p into a thread",
489 (StgClosure *)spark));
492 /* We need to wake up the other tasks if we just created some
495 if (n_free_capabilities - n > 1) {
496 pthread_cond_signal(&thread_ready_cond);
501 /* Check whether any waiting threads need to be woken up. If the
502 * run queue is empty, and there are no other tasks running, we
503 * can wait indefinitely for something to happen.
504 * ToDo: what if another client comes along & requests another main thread?
507 if (blocked_queue_hd != END_TSO_QUEUE) {
509 (run_queue_hd == END_TSO_QUEUE)
511 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
516 /* check for signals each time around the scheduler */
518 if (signals_pending()) {
519 start_signal_handlers();
523 /* Detect deadlock: when we have no threads to run, there are
524 * no threads waiting on I/O or sleeping, and all the other
525 * tasks are waiting for work, we must have a deadlock. Inform
526 * all the main threads.
529 if (blocked_queue_hd == END_TSO_QUEUE
530 && run_queue_hd == END_TSO_QUEUE
531 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
534 for (m = main_threads; m != NULL; m = m->link) {
537 pthread_cond_broadcast(&m->wakeup);
543 In GUM all non-main PEs come in here with no work;
544 we ignore multiple main threads for now
546 if (blocked_queue_hd == END_TSO_QUEUE
547 && run_queue_hd == END_TSO_QUEUE) {
548 StgMainThread *m = main_threads;
551 main_threads = m->link;
558 /* If there's a GC pending, don't do anything until it has completed. */
562 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
563 pthread_cond_wait(&gc_pending_cond, &sched_mutex);
566 /* block until we've got a thread on the run queue and a free capability. */
569 while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
570 IF_DEBUG(scheduler, sched_belch("waiting for work"));
571 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
572 IF_DEBUG(scheduler, sched_belch("work now available"));
578 if (RtsFlags.GranFlags.Light)
579 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
581 /* adjust time based on time-stamp */
582 if (event->time > CurrentTime[CurrentProc] &&
583 event->evttype != ContinueThread)
584 CurrentTime[CurrentProc] = event->time;
586 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
587 if (!RtsFlags.GranFlags.Light)
590 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"))
592 /* main event dispatcher in GranSim */
593 switch (event->evttype) {
594 /* Should just be continuing execution */
596 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
597 /* ToDo: check assertion
598 ASSERT(run_queue_hd != (StgTSO*)NULL &&
599 run_queue_hd != END_TSO_QUEUE);
601 /* Ignore ContinueThreads for fetching threads (with synchronous communication) */
602 if (!RtsFlags.GranFlags.DoAsyncFetch &&
603 procStatus[CurrentProc]==Fetching) {
604 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
605 CurrentTSO->id, CurrentTSO, CurrentProc);
608 /* Ignore ContinueThreads for completed threads */
609 if (CurrentTSO->what_next == ThreadComplete) {
610 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
611 CurrentTSO->id, CurrentTSO, CurrentProc);
614 /* Ignore ContinueThreads for threads that are being migrated */
615 if (PROCS(CurrentTSO)==Nowhere) {
616 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
617 CurrentTSO->id, CurrentTSO, CurrentProc);
620 /* The thread should be at the beginning of the run queue */
621 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
622 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
623 CurrentTSO->id, CurrentTSO, CurrentProc);
624 break; // run the thread anyway
627 new_event(proc, proc, CurrentTime[proc],
629 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
631 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
632 break; // now actually run the thread; DaH Qu'vam yImuHbej
635 do_the_fetchnode(event);
636 goto next_thread; /* handle next event in event queue */
639 do_the_globalblock(event);
640 goto next_thread; /* handle next event in event queue */
643 do_the_fetchreply(event);
644 goto next_thread; /* handle next event in event queue */
646 case UnblockThread: /* Move from the blocked queue to the tail of the run queue */
647 do_the_unblock(event);
648 goto next_thread; /* handle next event in event queue */
650 case ResumeThread: /* Move from the blocked queue to the tail of */
651 /* the runnable queue ( i.e. Qu' SImqa'lu') */
652 event->tso->gran.blocktime +=
653 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
654 do_the_startthread(event);
655 goto next_thread; /* handle next event in event queue */
658 do_the_startthread(event);
659 goto next_thread; /* handle next event in event queue */
662 do_the_movethread(event);
663 goto next_thread; /* handle next event in event queue */
666 do_the_movespark(event);
667 goto next_thread; /* handle next event in event queue */
670 do_the_findwork(event);
671 goto next_thread; /* handle next event in event queue */
674 barf("Illegal event type %u\n", event->evttype);
677 /* This point was scheduler_loop in the old RTS */
679 IF_DEBUG(gran, belch("GRAN: after main switch"));
681 TimeOfLastEvent = CurrentTime[CurrentProc];
682 TimeOfNextEvent = get_time_of_next_event();
683 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
684 // CurrentTSO = ThreadQueueHd;
686 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
689 if (RtsFlags.GranFlags.Light)
690 GranSimLight_leave_system(event, &ActiveTSO);
692 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
695 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
697 /* in a GranSim setup the TSO stays on the run queue */
699 /* Take a thread from the run queue. */
700 t = POP_RUN_QUEUE(); // take_off_run_queue(t);
703 fprintf(stderr, "GRAN: About to run current thread, which is\n");
706 context_switch = 0; // turned on via GranYield, checking events and time slice
709 DumpGranEvent(GR_SCHEDULE, t));
711 procStatus[CurrentProc] = Busy;
715 if (PendingFetches != END_BF_QUEUE) {
719 /* ToDo: perhaps merge with spark activation above */
720 /* check whether we have local work and send requests if we have none */
721 if (run_queue_hd == END_TSO_QUEUE) { /* no runnable threads */
722 /* :-[ no local threads => look out for local sparks */
723 /* the spark pool for the current PE */
724 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
725 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
726 pool->hd < pool->tl) {
728 * ToDo: add a check in the GC code that we really have enough heap afterwards!!
730 * If we're here (no runnable threads) and we have pending
731 * sparks, we must have a space problem. Get enough space
732 * to turn one of those pending sparks into a thread.
736 spark = findSpark(); /* get a spark */
737 if (spark != (rtsSpark) NULL) {
738 tso = activateSpark(spark); /* turn the spark into a thread */
739 IF_PAR_DEBUG(schedule,
740 belch("==== schedule: Created TSO %d (%p); %d threads active",
741 tso->id, tso, advisory_thread_count));
743 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
744 belch("==^^ failed to activate spark");
746 } /* otherwise fall through & pick up the new tso */
748 IF_PAR_DEBUG(verbose,
749 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
750 spark_queue_len(pool)));
754 /* =8-[ no local sparks => look for work on other PEs */
757 * We really have absolutely no work. Send out a fish
758 * (there may be some out there already), and wait for
759 * something to arrive. We clearly can't run any threads
760 * until a SCHEDULE or RESUME arrives, and so that's what
761 * we're hoping to see. (Of course, we still have to
762 * respond to other types of messages.)
765 outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
766 // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
767 /* fishing set in sendFish, processFish;
768 avoid flooding system with fishes via delay */
770 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
778 } else if (PacketsWaiting()) { /* Look for incoming messages */
782 /* Now we are sure that we have some work available */
783 ASSERT(run_queue_hd != END_TSO_QUEUE);
784 /* Take a thread from the run queue, if we have work */
785 t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
787 /* ToDo: write something to the log-file
788 if (RTSflags.ParFlags.granSimStats && !sameThread)
789 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
793 /* the spark pool for the current PE */
794 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
796 IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; base=%x, lim=%x)",
797 spark_queue_len(pool),
799 pool->hd, pool->tl, pool->base, pool->lim));
801 IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)",
802 run_queue_len(), CURRENT_PROC,
803 run_queue_hd, run_queue_tl));
808 we are running a different TSO, so write a schedule event to log file
809 NB: If we use fair scheduling we also have to write a deschedule
810 event for LastTSO; with unfair scheduling we know that the
811 previous tso has blocked whenever we switch to another tso, so
812 we don't need it in GUM for now
814 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
815 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
819 #else /* !GRAN && !PAR */
821 /* grab a thread from the run queue */
824 IF_DEBUG(sanity,checkTSO(t));
831 cap = free_capabilities;
832 free_capabilities = cap->link;
833 n_free_capabilities--;
838 cap->rCurrentTSO = t;
840 /* set the context_switch flag */
842 if (run_queue_hd == END_TSO_QUEUE)
847 RELEASE_LOCK(&sched_mutex);
849 #if defined(GRAN) || defined(PAR)
850 IF_DEBUG(scheduler, belch("-->> Running TSO %ld (%p) %s ...",
851 t->id, t, whatNext_strs[t->what_next]));
853 IF_DEBUG(scheduler,sched_belch("running thread %d", t->id));
856 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
857 /* Run the current thread */
859 switch (cap->rCurrentTSO->what_next) {
862 /* Thread already finished, return to scheduler. */
863 ret = ThreadFinished;
866 ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
869 ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
871 case ThreadEnterHugs:
875 IF_DEBUG(scheduler,sched_belch("entering Hugs"));
876 c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
877 cap->rCurrentTSO->sp += 1;
882 barf("Panic: entered a BCO but no bytecode interpreter in this build");
885 barf("schedule: invalid what_next field");
887 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
889 /* Costs for the scheduler are assigned to CCS_SYSTEM */
894 ACQUIRE_LOCK(&sched_mutex);
897 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
898 #elif !defined(GRAN) && !defined(PAR)
899 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
901 t = cap->rCurrentTSO;
904 /* HACK 675: if the last thread didn't yield, make sure to print a
905 SCHEDULE event to the log file when StgRunning the next thread, even
906 if it is the same one as before */
907 LastTSO = t; //(ret == ThreadBlocked) ? END_TSO_QUEUE : t;
908 TimeOfLastYield = CURRENT_TIME;
913 /* make all the running tasks block on a condition variable,
914 * maybe set context_switch and wait till they all pile in,
915 * then have them wait on a GC condition variable.
917 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow",
918 t->id, t, whatNext_strs[t->what_next]));
921 ASSERT(!is_on_queue(t,CurrentProc));
924 ready_to_gc = rtsTrue;
925 context_switch = 1; /* stop other threads ASAP */
926 PUSH_ON_RUN_QUEUE(t);
927 /* actual GC is done at the end of the while loop */
931 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow",
932 t->id, t, whatNext_strs[t->what_next]));
933 /* just adjust the stack for this thread, then pop it back on the run queue. */
939 /* enlarge the stack */
940 StgTSO *new_t = threadStackOverflow(t);
942 /* This TSO has moved, so update any pointers to it from the
943 * main thread stack. It better not be on any other queues...
946 for (m = main_threads; m != NULL; m = m->link) {
952 PUSH_ON_RUN_QUEUE(new_t);
959 DumpGranEvent(GR_DESCHEDULE, t));
960 globalGranStats.tot_yields++;
963 DumpGranEvent(GR_DESCHEDULE, t));
965 /* put the thread back on the run queue. Then, if we're ready to
966 * GC, check whether this is the last task to stop. If so, wake
967 * up the GC thread. getThread will block during a GC until the GC is finished.
971 if (t->what_next == ThreadEnterHugs) {
972 /* ToDo: or maybe a timer expired when we were in Hugs?
973 * or maybe someone hit ctrl-C
975 belch("--<< thread %ld (%p; %s) stopped to switch to Hugs",
976 t->id, t, whatNext_strs[t->what_next]);
978 belch("--<< thread %ld (%p; %s) stopped, yielding",
979 t->id, t, whatNext_strs[t->what_next]);
984 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
986 ASSERT(t->link == END_TSO_QUEUE);
988 ASSERT(!is_on_queue(t,CurrentProc));
991 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
992 checkThreadQsSanity(rtsTrue));
994 APPEND_TO_RUN_QUEUE(t);
996 /* add a ContinueThread event to actually process the thread */
997 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
999 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1001 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1010 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1011 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1012 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1014 // ??? needed; should emit block before
1016 DumpGranEvent(GR_DESCHEDULE, t));
1017 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1020 ASSERT(procStatus[CurrentProc]==Busy ||
1021 ((procStatus[CurrentProc]==Fetching) &&
1022 (t->block_info.closure!=(StgClosure*)NULL)));
1023 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1024 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1025 procStatus[CurrentProc]==Fetching))
1026 procStatus[CurrentProc] = Idle;
1030 DumpGranEvent(GR_DESCHEDULE, t));
1032 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1036 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1037 t->id, t, whatNext_strs[t->what_next], t->block_info.closure);
1038 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1041 /* don't need to do anything. Either the thread is blocked on
1042 * I/O, in which case we'll have called addToBlockedQueue
1043 * previously, or it's blocked on an MVar or Blackhole, in which
1044 * case it'll be on the relevant queue already.
1047 fprintf(stderr, "--<< thread %d (%p) stopped ", t->id, t);
1048 printThreadBlockage(t);
1049 fprintf(stderr, "\n"));
1051 /* Only for dumping event to log file
1052 ToDo: do I need this in GranSim, too?
1059 case ThreadFinished:
1060 /* Need to check whether this was a main thread, and if so, signal
1061 * the task that started it with the return value. If we have no
1062 * more main threads, we probably need to stop all the tasks until
1065 IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1066 t->what_next = ThreadComplete;
1068 endThread(t, CurrentProc); // clean-up the thread
1070 advisory_thread_count--;
1071 if (RtsFlags.ParFlags.ParStats.Full)
1072 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1077 barf("doneThread: invalid thread return code");
1081 cap->link = free_capabilities;
1082 free_capabilities = cap;
1083 n_free_capabilities++;
1087 if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes)
1092 /* everybody back, start the GC.
1093 * Could do it in this thread, or signal a condition var
1094 * to do it in another thread. Either way, we need to
1095 * broadcast on gc_pending_cond afterward.
1098 IF_DEBUG(scheduler,sched_belch("doing GC"));
1100 GarbageCollect(GetRoots);
1101 ready_to_gc = rtsFalse;
1103 pthread_cond_broadcast(&gc_pending_cond);
1106 /* add a ContinueThread event to continue execution of current thread */
1107 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1109 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1111 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1118 IF_GRAN_DEBUG(unused,
1119 print_eventq(EventHd));
1121 event = get_next_event();
1125 /* ToDo: wait for next message to arrive rather than busy wait */
1130 t = take_off_run_queue(END_TSO_QUEUE);
1133 } /* end of while(1) */
1136 /* A hack for Hugs concurrency support. Needs sanitisation (?) */
1137 void deleteAllThreads ( void )
1140 IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
1141 for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1144 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1147 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1148 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1151 /* startThread and insertThread are now in GranSim.c -- HWL */
1153 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1154 //@subsection Suspend and Resume
1156 /* ---------------------------------------------------------------------------
1157 * Suspending & resuming Haskell threads.
1159 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1160 * its capability before calling the C function. This allows another
1161 * task to pick up the capability and carry on running Haskell
1162 * threads. It also means that if the C call blocks, it won't lock the whole system.
1165 * The Haskell thread making the C call is put to sleep for the
1166 * duration of the call, on the suspended_ccalling_threads queue. We
1167 * give out a token to the task, which it can use to resume the thread
1168 * on return from the C function.
1169 * ------------------------------------------------------------------------- */
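/* A sketch of the protocol from the task's point of view (the blocking
 * C function is hypothetical; the token and capability types follow
 * the code below):
 *
 *     tok = suspendThread(cap);             -- give up our capability
 *     r   = some_blocking_c_function(...);  -- others may run Haskell
 *     cap = resumeThread(tok);              -- get a capability back
 */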
1172 suspendThread( Capability *cap )
1176 ACQUIRE_LOCK(&sched_mutex);
1179 sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
1181 threadPaused(cap->rCurrentTSO);
1182 cap->rCurrentTSO->link = suspended_ccalling_threads;
1183 suspended_ccalling_threads = cap->rCurrentTSO;
1185 /* Use the thread ID as the token; it should be unique */
1186 tok = cap->rCurrentTSO->id;
1189 cap->link = free_capabilities;
1190 free_capabilities = cap;
1191 n_free_capabilities++;
1194 RELEASE_LOCK(&sched_mutex);
1199 resumeThread( StgInt tok )
1201 StgTSO *tso, **prev;
1204 ACQUIRE_LOCK(&sched_mutex);
1206 prev = &suspended_ccalling_threads;
1207 for (tso = suspended_ccalling_threads;
1208 tso != END_TSO_QUEUE;
1209 prev = &tso->link, tso = tso->link) {
1210 if (tso->id == (StgThreadID)tok) {
1215 if (tso == END_TSO_QUEUE) {
1216 barf("resumeThread: thread not found");
1220 while (free_capabilities == NULL) {
1221 IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1222 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1223 IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1225 cap = free_capabilities;
1226 free_capabilities = cap->link;
1227 n_free_capabilities--;
1229 cap = &MainRegTable;
1232 cap->rCurrentTSO = tso;
1234 RELEASE_LOCK(&sched_mutex);
1239 /* ---------------------------------------------------------------------------
1241 * ------------------------------------------------------------------------ */
1242 static void unblockThread(StgTSO *tso);
1244 /* ---------------------------------------------------------------------------
1245 * Comparing Thread ids.
1247 * This is used from STG land in the implementation of the
1248 * instances of Eq/Ord for ThreadIds.
1249 * ------------------------------------------------------------------------ */
1251 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
1253 StgThreadID id1 = tso1->id;
1254 StgThreadID id2 = tso2->id;
1256 if (id1 < id2) return (-1);
1257 if (id1 > id2) return 1;
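/* i.e. a negative result means tso1 < tso2, zero means equal, positive
 * means tso1 > tso2, matching the LT/EQ/GT cases of the Ord instance. */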
1261 /* ---------------------------------------------------------------------------
1262 Create a new thread.
1264 The new thread starts with the given stack size. Before the
1265 scheduler can run, however, this thread needs to have a closure
1266 (and possibly some arguments) pushed on its stack. See
1267 pushClosure() in Schedule.h.
1269 createGenThread() and createIOThread() (in SchedAPI.h) are
1270 convenient packaged versions of this function.
1272 currently pri (priority) is only used in a GRAN setup -- HWL
1273 ------------------------------------------------------------------------ */
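/* A sketch of typical (non-GRAN) usage, putting those pieces together;
 * `closure' stands for whatever the new thread should evaluate:
 *
 *     StgTSO *tso = createThread(RtsFlags.GcFlags.initialStkSize);
 *     pushClosure(tso, closure);       -- see Schedule.h
 *     scheduleThread(tso);             -- hand it to the scheduler
 */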
1274 //@cindex createThread
1276 /* currently pri (priority) is only used in a GRAN setup -- HWL */
1278 createThread(nat stack_size, StgInt pri)
1280 return createThread_(stack_size, rtsFalse, pri);
1284 createThread_(nat size, rtsBool have_lock, StgInt pri)
1288 createThread(nat stack_size)
1290 return createThread_(stack_size, rtsFalse);
1294 createThread_(nat size, rtsBool have_lock)
1301 /* First check whether we should create a thread at all */
1303 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1304 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1306 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1307 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1308 return END_TSO_QUEUE;
1314 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1317 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1319 /* catch ridiculously small stack sizes */
1320 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1321 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1324 stack_size = size - TSO_STRUCT_SIZEW;
1326 tso = (StgTSO *)allocate(size);
1327 TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1329 SET_HDR(tso, &TSO_info, CCS_MAIN);
1331 SET_GRAN_HDR(tso, ThisPE);
1333 tso->what_next = ThreadEnterGHC;
1335 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1336 * protect the increment operation on next_thread_id.
1337 * In future, we could use an atomic increment instead.
1339 if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1340 tso->id = next_thread_id++;
1341 if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1343 tso->why_blocked = NotBlocked;
1344 tso->blocked_exceptions = NULL;
1346 tso->splim = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
1347 tso->stack_size = stack_size;
1348 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
1350 tso->sp = (P_)&(tso->stack) + stack_size;
1353 tso->prof.CCCS = CCS_MAIN;
1356 /* put a stop frame on the stack */
1357 tso->sp -= sizeofW(StgStopFrame);
1358 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN);
1359 tso->su = (StgUpdateFrame*)tso->sp;
1361 IF_DEBUG(scheduler,belch("---- Initialised TSO %ld (%p), stack size = %lx words",
1362 tso->id, tso, tso->stack_size));
1366 tso->link = END_TSO_QUEUE;
1367 /* uses more flexible routine in GranSim */
1368 insertThread(tso, CurrentProc);
1370 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1375 #if defined(GRAN) || defined(PAR)
1376 DumpGranEvent(GR_START,tso);
1379 /* Link the new thread on the global thread list.
1381 tso->global_link = all_threads;
1385 tso->gran.pri = pri;
1387 tso->gran.magic = TSO_MAGIC; // debugging only
1389 tso->gran.sparkname = 0;
1390 tso->gran.startedat = CURRENT_TIME;
1391 tso->gran.exported = 0;
1392 tso->gran.basicblocks = 0;
1393 tso->gran.allocs = 0;
1394 tso->gran.exectime = 0;
1395 tso->gran.fetchtime = 0;
1396 tso->gran.fetchcount = 0;
1397 tso->gran.blocktime = 0;
1398 tso->gran.blockcount = 0;
1399 tso->gran.blockedat = 0;
1400 tso->gran.globalsparks = 0;
1401 tso->gran.localsparks = 0;
1402 if (RtsFlags.GranFlags.Light)
1403 tso->gran.clock = Now; /* local clock */
1405 tso->gran.clock = 0;
1407 IF_DEBUG(gran,printTSO(tso));
1410 tso->par.magic = TSO_MAGIC; // debugging only
1412 tso->par.sparkname = 0;
1413 tso->par.startedat = CURRENT_TIME;
1414 tso->par.exported = 0;
1415 tso->par.basicblocks = 0;
1416 tso->par.allocs = 0;
1417 tso->par.exectime = 0;
1418 tso->par.fetchtime = 0;
1419 tso->par.fetchcount = 0;
1420 tso->par.blocktime = 0;
1421 tso->par.blockcount = 0;
1422 tso->par.blockedat = 0;
1423 tso->par.globalsparks = 0;
1424 tso->par.localsparks = 0;
1428 globalGranStats.tot_threads_created++;
1429 globalGranStats.threads_created_on_PE[CurrentProc]++;
1430 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1431 globalGranStats.tot_sq_probes++;
1436 belch("==__ schedule: Created TSO %d (%p);",
1437 CurrentProc, tso, tso->id));
1439 IF_PAR_DEBUG(verbose,
1440 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1441 tso->id, tso, advisory_thread_count));
1443 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1444 tso->id, tso->stack_size));
1450 Turn a spark into a thread.
1451 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1454 //@cindex activateSpark
1456 activateSpark (rtsSpark spark)
1460 ASSERT(spark != (rtsSpark)NULL);
1461 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1462 if (tso!=END_TSO_QUEUE) {
1463 pushClosure(tso,spark);
1464 PUSH_ON_RUN_QUEUE(tso);
1465 advisory_thread_count++;
1467 if (RtsFlags.ParFlags.ParStats.Full) {
1468 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1469 IF_PAR_DEBUG(verbose,
1470 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1471 (StgClosure *)spark, info_type((StgClosure *)spark)));
1474 barf("activateSpark: Cannot create TSO");
1476 // ToDo: fwd info on local/global spark to thread -- HWL
1477 // tso->gran.exported = spark->exported;
1478 // tso->gran.locked = !spark->global;
1479 // tso->gran.sparkname = spark->name;
1485 /* ---------------------------------------------------------------------------
1488 * scheduleThread puts a thread on the head of the runnable queue.
1489 * This will usually be done immediately after a thread is created.
1490 * The caller of scheduleThread must create the thread using e.g.
1491 * createThread and push an appropriate closure
1492 * on this thread's stack before the scheduler is invoked.
1493 * ------------------------------------------------------------------------ */
1496 scheduleThread(StgTSO *tso)
1498 if (tso==END_TSO_QUEUE){
1503 ACQUIRE_LOCK(&sched_mutex);
1505 /* Put the new thread on the head of the runnable queue. The caller
1506 * better push an appropriate closure on this thread's stack
1507 * beforehand. In the SMP case, the thread may start running as
1508 * soon as we release the scheduler lock below.
1510 PUSH_ON_RUN_QUEUE(tso);
1513 IF_DEBUG(scheduler,printTSO(tso));
1514 RELEASE_LOCK(&sched_mutex);
1517 /* ---------------------------------------------------------------------------
1520 * Start up Posix threads to run each of the scheduler tasks.
1521 * I believe the task ids are not needed in the system as defined.
1523 * ------------------------------------------------------------------------ */
1525 #if defined(PAR) || defined(SMP)
1527 taskStart( void *arg STG_UNUSED )
1529 rts_evalNothing(NULL);
1533 /* ---------------------------------------------------------------------------
1536 * Initialise the scheduler. This resets all the queues - if the
1537 * queues contained any threads, they'll be garbage collected at the
1540 * This now calls startTasks(), so should only be called once! KH @ 25/10/99
1541 * ------------------------------------------------------------------------ */
1545 term_handler(int sig STG_UNUSED)
1548 ACQUIRE_LOCK(&term_mutex);
1550 RELEASE_LOCK(&term_mutex);
1555 //@cindex initScheduler
1562 for (i=0; i<MAX_PROC; i++) {
1563 run_queue_hds[i] = END_TSO_QUEUE;
1564 run_queue_tls[i] = END_TSO_QUEUE;
1565 blocked_queue_hds[i] = END_TSO_QUEUE;
1566 blocked_queue_tls[i] = END_TSO_QUEUE;
1567 ccalling_threadss[i] = END_TSO_QUEUE;
1570 run_queue_hd = END_TSO_QUEUE;
1571 run_queue_tl = END_TSO_QUEUE;
1572 blocked_queue_hd = END_TSO_QUEUE;
1573 blocked_queue_tl = END_TSO_QUEUE;
1576 suspended_ccalling_threads = END_TSO_QUEUE;
1578 main_threads = NULL;
1579 all_threads = END_TSO_QUEUE;
1584 enteredCAFs = END_CAF_LIST;
1586 /* Install the SIGTERM handler */
1589 struct sigaction action,oact;
1591 action.sa_handler = term_handler;
1592 sigemptyset(&action.sa_mask);
1593 action.sa_flags = 0;
1594 if (sigaction(SIGTERM, &action, &oact) != 0) {
1595 barf("can't install TERM handler");
1601 /* Allocate N Capabilities */
1604 Capability *cap, *prev;
1607 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1608 cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1612 free_capabilities = cap;
1613 n_free_capabilities = RtsFlags.ParFlags.nNodes;
1615 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1616 n_free_capabilities););
1619 #if defined(SMP) || defined(PAR)
1632 /* make some space for saving all the thread ids */
1633 task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1634 "initScheduler:task_ids");
1636 /* and create all the threads */
1637 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1638 r = pthread_create(&tid,NULL,taskStart,NULL);
1640 barf("startTasks: Can't create new Posix thread");
1642 task_ids[i].id = tid;
1643 task_ids[i].mut_time = 0.0;
1644 task_ids[i].mut_etime = 0.0;
1645 task_ids[i].gc_time = 0.0;
1646 task_ids[i].gc_etime = 0.0;
1647 task_ids[i].elapsedtimestart = elapsedtime();
1648 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1654 exitScheduler( void )
1659 /* Don't want to use pthread_cancel, since we'd have to install
1660 * these silly exception handlers (pthread_cleanup_{push,pop}) around
1664 /* Cancel all our tasks */
1665 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1666 pthread_cancel(task_ids[i].id);
1669 /* Wait for all the tasks to terminate */
1670 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1671 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n",
1673 pthread_join(task_ids[i].id, NULL);
1677 /* Send 'em all a SIGTERM. That should shut 'em up.
1679 await_death = RtsFlags.ParFlags.nNodes;
1680 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1681 pthread_kill(task_ids[i].id,SIGTERM);
1683 while (await_death > 0) {
1689 /* -----------------------------------------------------------------------------
1690 Managing the per-task allocation areas.
1692 Each capability comes with an allocation area. These are
1693 fixed-length block lists into which allocation can be done.
1695 ToDo: no support for two-space collection at the moment???
1696 -------------------------------------------------------------------------- */
1698 /* -----------------------------------------------------------------------------
1699 * waitThread is the external interface for running a new computation
1700 * and waiting for the result.
1702 * In the non-SMP case, we create a new main thread, push it on the
1703 * main-thread stack, and invoke the scheduler to run it. The
1704 * scheduler will return when the top main thread on the stack has
1705 * completed or died, and fill in the necessary fields of the
1706 * main_thread structure.
1708 * In the SMP case, we create a main thread as before, but we then
1709 * create a new condition variable and sleep on it. When our new
1710 * main thread has completed, we'll be woken up and the status/result
1711 * will be in the main_thread struct.
1712 * -------------------------------------------------------------------------- */
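/* A sketch of a client driving a computation to completion; the
 * createIOThread helper is the packaged variant from SchedAPI.h
 * mentioned earlier, and its exact signature is assumed here:
 *
 *     StgClosure *result;
 *     SchedulerStatus stat;
 *     StgTSO *tso = createIOThread(RtsFlags.GcFlags.initialStkSize,
 *                                  closure);
 *     scheduleThread(tso);
 *     stat = waitThread(tso, &result);   -- blocks until completion
 */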
1715 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1718 SchedulerStatus stat;
1720 ACQUIRE_LOCK(&sched_mutex);
1722 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1728 pthread_cond_init(&m->wakeup, NULL);
1731 m->link = main_threads;
1734 IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n",
1739 pthread_cond_wait(&m->wakeup, &sched_mutex);
1740 } while (m->stat == NoStatus);
1742 /* GranSim specific init */
1743 CurrentTSO = m->tso; // the TSO to run
1744 procStatus[MainProc] = Busy; // status of main PE
1745 CurrentProc = MainProc; // PE to run it on
1750 ASSERT(m->stat != NoStatus);
1756 pthread_cond_destroy(&m->wakeup);
1759 IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n",
1763 RELEASE_LOCK(&sched_mutex);
1768 //@node Run queue code, Garbage Collection Routines, Suspend and Resume, Main scheduling code
1769 //@subsection Run queue code
1773 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1774 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1775 implicit global variable that has to be correct when calling these routines.
1779 /* Put the new thread on the head of the runnable queue.
1780 * The caller of createThread better push an appropriate closure
1781 * on this thread's stack before the scheduler is invoked.
1783 static /* inline */ void
1784 add_to_run_queue(tso)
1787 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1788 tso->link = run_queue_hd;
1790 if (run_queue_tl == END_TSO_QUEUE) {
1795 /* Put the new thread at the end of the runnable queue. */
1796 static /* inline */ void
1797 push_on_run_queue(tso)
1800 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1801 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1802 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1803 if (run_queue_hd == END_TSO_QUEUE) {
1806 run_queue_tl->link = tso;
1812 Should be inlined because it's used very often in schedule. The tso
1813 argument is actually only needed in GranSim, where we want to have the
1814 possibility to schedule *any* TSO on the run queue, irrespective of the
1815 actual ordering. Therefore, if tso is not the nil TSO then we traverse
1816 the run queue and dequeue the tso, adjusting the links in the queue.
1818 //@cindex take_off_run_queue
1819 static /* inline */ StgTSO*
1820 take_off_run_queue(StgTSO *tso) {
1824 qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1826 if tso is specified, unlink that tso from the run_queue (doesn't have
1827 to be at the beginning of the queue); GranSim only
1829 if (tso!=END_TSO_QUEUE) {
1830 /* find tso in queue */
1831 for (t=run_queue_hd, prev=END_TSO_QUEUE;
1832 t!=END_TSO_QUEUE && t!=tso;
1836 /* now actually dequeue the tso */
1837 if (prev!=END_TSO_QUEUE) {
1838 ASSERT(run_queue_hd!=t);
1839 prev->link = t->link;
1841 /* t is at beginning of thread queue */
1842 ASSERT(run_queue_hd==t);
1843 run_queue_hd = t->link;
1845 /* t is at end of thread queue */
1846 if (t->link==END_TSO_QUEUE) {
1847 ASSERT(t==run_queue_tl);
1848 run_queue_tl = prev;
1850 ASSERT(run_queue_tl!=t);
1852 t->link = END_TSO_QUEUE;
1854 /* take tso from the beginning of the queue; std concurrent code */
1856 if (t != END_TSO_QUEUE) {
1857 run_queue_hd = t->link;
1858 t->link = END_TSO_QUEUE;
1859 if (run_queue_hd == END_TSO_QUEUE) {
1860 run_queue_tl = END_TSO_QUEUE;
1869 //@node Garbage Collection Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1870 //@subsection Garbage Collection Routines
1872 /* ---------------------------------------------------------------------------
1873 Where are the roots that we know about?
1875 - all the threads on the runnable queue
1876 - all the threads on the blocked queue
1877 - all the threads currently executing a _ccall_GC
1878 - all the "main threads"
1880 ------------------------------------------------------------------------ */
1882 /* This has to be protected either by the scheduler monitor, or by the
1883 garbage collection monitor (probably the latter).
1887 static void GetRoots(void)
1894 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1895 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1896 run_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1897 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1898 run_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1900 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1901 blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1902 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1903 blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1904 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1905 ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
1912 if (run_queue_hd != END_TSO_QUEUE) {
1913 ASSERT(run_queue_tl != END_TSO_QUEUE);
1914 run_queue_hd = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1915 run_queue_tl = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1918 if (blocked_queue_hd != END_TSO_QUEUE) {
1919 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
1920 blocked_queue_hd = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1921 blocked_queue_tl = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1925 for (m = main_threads; m != NULL; m = m->link) {
1926 m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1928 if (suspended_ccalling_threads != END_TSO_QUEUE)
1929 suspended_ccalling_threads =
1930 (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
1932 #if defined(SMP) || defined(PAR) || defined(GRAN)
1937 /* -----------------------------------------------------------------------------
1940 This is the interface to the garbage collector from Haskell land.
1941 We provide this so that external C code can allocate and garbage
1942 collect when called from Haskell via _ccall_GC.
1944 It might be useful to provide an interface whereby the programmer
1945 can specify more roots (ToDo).
1947 This needs to be protected by the GC condition variable above. KH.
1948 -------------------------------------------------------------------------- */
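/* A sketch of supplying extra roots (my_root and markMyRoots are
 * hypothetical, and MarkRoot's exact signature is assumed from its
 * uses in GetRoots above):
 *
 *     static StgClosure *my_root;
 *
 *     static void markMyRoots(void) {
 *         my_root = (StgClosure *)MarkRoot(my_root);
 *     }
 *     ...
 *     performGCWithRoots(markMyRoots);
 */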
1950 void (*extra_roots)(void);
1955 GarbageCollect(GetRoots);
1961 GetRoots(); /* the scheduler's roots */
1962 extra_roots(); /* the user's roots */
1966 performGCWithRoots(void (*get_roots)(void))
1968 extra_roots = get_roots;
1970 GarbageCollect(AllRoots);
1973 /* -----------------------------------------------------------------------------
1976 If the thread has reached its maximum stack size, then raise the
1977 StackOverflow exception in the offending thread. Otherwise
1978 relocate the TSO into a larger chunk of memory and adjust its stack
1980 -------------------------------------------------------------------------- */
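/* Worked example of the arithmetic below (numbers illustrative only):
 * a thread with a 4096-word stack that overflows asks for
 * min(2*4096, max_stack_size) = 8192 words; adding TSO_STRUCT_SIZE,
 * rounding the whole TSO up to whole blocks (and mega-blocks) and then
 * subtracting TSO_STRUCT_SIZEW again gives the actual new stack size,
 * which can therefore come out somewhat above 8192 words.
 */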
1983 threadStackOverflow(StgTSO *tso)
1985 nat new_stack_size, new_tso_size, diff, stack_words;
1989 IF_DEBUG(sanity,checkTSO(tso));
1990 if (tso->stack_size >= tso->max_stack_size) {
1993 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
1994 tso->id, tso, tso->stack_size, tso->max_stack_size);
1995 /* If we're debugging, just print out the top of the stack */
1996 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2000 fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
2003 /* Send this thread the StackOverflow exception */
2004 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2009 /* Try to double the current stack size. If that takes us over the
2010 * maximum stack size for this thread, then use the maximum instead.
2011 * Finally round up so the TSO ends up as a whole number of blocks.
2013 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2014 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2015 TSO_STRUCT_SIZE)/sizeof(W_);
2016 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2017 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2019 IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2021 dest = (StgTSO *)allocate(new_tso_size);
2022 TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2024 /* copy the TSO block and the old stack into the new area */
2025 memcpy(dest,tso,TSO_STRUCT_SIZE);
2026 stack_words = tso->stack + tso->stack_size - tso->sp;
2027 new_sp = (P_)dest + new_tso_size - stack_words;
2028 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2030 /* relocate the stack pointers... */
2031 diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2032 dest->su = (StgUpdateFrame *) ((P_)dest->su + diff);
2034 dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso);
2035 dest->stack_size = new_stack_size;
2037 /* and relocate the update frame list */
2038 relocate_TSO(tso, dest);
2040 /* Mark the old TSO as relocated. We have to check for relocated
2041 * TSOs in the garbage collector and any primops that deal with TSOs.
2043 * It's important to set the sp and su values to just beyond the end
2044 * of the stack, so we don't attempt to scavenge any part of the stack.
2047 tso->what_next = ThreadRelocated;
2049 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2050 tso->su = (StgUpdateFrame *)tso->sp;
2051 tso->why_blocked = NotBlocked;
2052 dest->mut_link = NULL;
2054 IF_PAR_DEBUG(verbose,
2055 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2056 tso->id, tso, tso->stack_size);
2057 /* If we're debugging, just print out the top of the stack */
2058 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2061 IF_DEBUG(sanity,checkTSO(tso));
2063 IF_DEBUG(scheduler,printTSO(dest));
2069 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collection Routines, Main scheduling code
2070 //@subsection Blocking Queue Routines
2072 /* ---------------------------------------------------------------------------
2073 Wake up a queue that was blocked on some resource.
2074 ------------------------------------------------------------------------ */
2076 /* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */
2080 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2085 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2087 /* write RESUME events to log file and
2088 update blocked and fetch time (depending on type of the orig closure) */
2089 if (RtsFlags.ParFlags.ParStats.Full) {
2090 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2091 GR_RESUME, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2092 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2094 switch (get_itbl(node)->type) {
2096 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2101 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2104 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2111 static StgBlockingQueueElement *
2112 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2115 PEs node_loc, tso_loc;
2117 node_loc = where_is(node); // should be lifted out of loop
2118 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2119 tso_loc = where_is((StgClosure *)tso);
2120 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2121 /* unless the fetch was faked, the TSO being on CurrentProc is exactly what IS_LOCAL_TO checks */
2122 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2123 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2124 // insertThread(tso, node_loc);
2125 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2127 tso, node, (rtsSpark*)NULL);
2128 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2131 } else { // TSO is remote (actually should be FMBQ)
2132 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2133 RtsFlags.GranFlags.Costs.gunblocktime +
2134 RtsFlags.GranFlags.Costs.latency;
2135 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2137 tso, node, (rtsSpark*)NULL);
2138 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2141 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2143 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2144 (node_loc==tso_loc ? "Local" : "Global"),
2145 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2146 tso->block_info.closure = NULL;
2147 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
2151 static StgBlockingQueueElement *
2152 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2154 StgBlockingQueueElement *next;
2156 switch (get_itbl(bqe)->type) {
2158 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2159 /* if it's a TSO just push it onto the run_queue */
2161 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2162 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
2164 unblockCount(bqe, node);
2165 /* reset blocking status after dumping event */
2166 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2170 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2172 bqe->link = PendingFetches;
2173 PendingFetches = bqe;
2177 /* can ignore this case in a non-debugging setup;
2178 see comments on RBHSave closures above */
2180 /* check that the closure is an RBHSave closure */
2181 ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
2182 get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
2183 get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
2187 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2188 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2192 // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2196 #else /* !GRAN && !PAR */
2198 unblockOneLocked(StgTSO *tso)
2202 ASSERT(get_itbl(tso)->type == TSO);
2203 ASSERT(tso->why_blocked != NotBlocked);
2204 tso->why_blocked = NotBlocked;
2206 PUSH_ON_RUN_QUEUE(tso);
2208 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2213 #if defined(GRAN) || defined(PAR)
2214 inline StgBlockingQueueElement *
2215 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2217 ACQUIRE_LOCK(&sched_mutex);
2218 bqe = unblockOneLocked(bqe, node);
2219 RELEASE_LOCK(&sched_mutex);
2224 unblockOne(StgTSO *tso)
2226 ACQUIRE_LOCK(&sched_mutex);
2227 tso = unblockOneLocked(tso);
2228 RELEASE_LOCK(&sched_mutex);
#if defined(GRAN)
void
awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  PEs node_loc;
  nat len = 0;

  IF_GRAN_DEBUG(bq,
                belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ",
                      node, CurrentProc, CurrentTime[CurrentProc],
                      CurrentTSO->id, CurrentTSO));

  node_loc = where_is(node);

  ASSERT(get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
         get_itbl(q)->type == CONSTR); // closure (type constructor)
  ASSERT(is_unique(node));

  /* FAKE FETCH: magically copy the node to the tso's proc;
     no Fetch necessary because in reality the node should not have been
     moved to the other PE in the first place
  */
  if (CurrentProc!=node_loc) {
    IF_GRAN_DEBUG(bq,
                  belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
                        node, node_loc, CurrentProc, CurrentTSO->id,
                        // CurrentTSO, where_is(CurrentTSO),
                        node->header.gran.procs));
    node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
    IF_GRAN_DEBUG(bq,
                  belch("## new bitmask of node %p is %#x",
                        node, node->header.gran.procs));
    if (RtsFlags.GranFlags.GranSimStats.Global) {
      globalGranStats.tot_fake_fetches++;
    }
  }

  bqe = q;
  // ToDo: check: ASSERT(CurrentProc==node_loc);
  while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
    /*
      bqe points to the current element in the queue
      next points to the next element in the queue
    */
    //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
    //tso_loc = where_is(tso);
    len++;
    bqe = unblockOneLocked(bqe, node);
  }

  /* if this is the BQ of an RBH, we have to put back the info ripped out of
     the closure to make room for the anchor of the BQ */
  if (bqe!=END_BQ_QUEUE) {
    StgInfoTable *info_ptr = get_itbl((StgClosure *)bqe);

    ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
    ASSERT((info_ptr==&RBH_Save_0_info) ||
           (info_ptr==&RBH_Save_1_info) ||
           (info_ptr==&RBH_Save_2_info));
    /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
    ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
    ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];

    IF_GRAN_DEBUG(bq,
                  belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
                        node, info_type(node)));
  }

  /* statistics gathering */
  if (RtsFlags.GranFlags.GranSimStats.Global) {
    // globalGranStats.tot_bq_processing_time += bq_processing_time;
    globalGranStats.tot_bq_len += len; // total length of all bqs awakened
    // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
    globalGranStats.tot_awbq++;        // total no. of bqs awakened
  }
  IF_GRAN_DEBUG(bq,
                fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
                        node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
}
#elif defined(PAR)
void
awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
{
  StgBlockingQueueElement *bqe, *next;

  ACQUIRE_LOCK(&sched_mutex);

  IF_PAR_DEBUG(verbose,
               belch("## AwBQ for node %p on [%x]: ",
                     node, mytid));

  ASSERT(get_itbl(q)->type == TSO ||
         get_itbl(q)->type == BLOCKED_FETCH ||
         get_itbl(q)->type == CONSTR);

  bqe = q;
  while (get_itbl(bqe)->type==TSO ||
         get_itbl(bqe)->type==BLOCKED_FETCH) {
    bqe = unblockOneLocked(bqe, node);
  }
  RELEASE_LOCK(&sched_mutex);
}
#else   /* !GRAN && !PAR */
void
awakenBlockedQueue(StgTSO *tso)
{
  ACQUIRE_LOCK(&sched_mutex);
  while (tso != END_TSO_QUEUE) {
    tso = unblockOneLocked(tso);
  }
  RELEASE_LOCK(&sched_mutex);
}
#endif
//@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
//@subsection Exception Handling Routines

/* ---------------------------------------------------------------------------
   Interrupt execution
   - usually called inside a signal handler so it mustn't do anything fancy.
   ------------------------------------------------------------------------ */

void
interruptStgRts(void)
{
    interrupted    = 1;
    context_switch = 1;
}
/* -----------------------------------------------------------------------------
   Unblock a thread

   This is for use when we raise an exception in another thread, which
   may be blocked.
   This has nothing to do with the UnblockThread event in GranSim. -- HWL
   -------------------------------------------------------------------------- */

#if defined(GRAN) || defined(PAR)
/*
  NB: only the type of the blocking queue is different in GranSim and GUM;
      the operations on the queue elements are the same
      long live polymorphism!
*/
static void
unblockThread(StgTSO *tso)
{
  StgBlockingQueueElement *t, **last;

  ACQUIRE_LOCK(&sched_mutex);
  switch (tso->why_blocked) {
  case NotBlocked:
    RELEASE_LOCK(&sched_mutex);  /* don't return still holding the lock */
    return;  /* not blocked */

  case BlockedOnMVar:
    ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
    {
      StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
      StgMVar *mvar = (StgMVar *)(tso->block_info.closure);

      last = (StgBlockingQueueElement **)&mvar->head;
      for (t = (StgBlockingQueueElement *)mvar->head;
           t != END_BQ_QUEUE;
           last = &t->link, last_tso = t, t = t->link) {
        if (t == (StgBlockingQueueElement *)tso) {
          *last = (StgBlockingQueueElement *)tso->link;
          if (mvar->tail == tso) {
            mvar->tail = (StgTSO *)last_tso;
          }
          goto done;
        }
      }
      barf("unblockThread (MVAR): TSO not found");
    }

  case BlockedOnBlackHole:
    ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
    {
      StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);

      last = &bq->blocking_queue;
      for (t = bq->blocking_queue;
           t != END_BQ_QUEUE;
           last = &t->link, t = t->link) {
        if (t == (StgBlockingQueueElement *)tso) {
          *last = (StgBlockingQueueElement *)tso->link;
          goto done;
        }
      }
      barf("unblockThread (BLACKHOLE): TSO not found");
    }

  case BlockedOnException:
    {
      StgTSO *target = tso->block_info.tso;

      ASSERT(get_itbl(target)->type == TSO);
      ASSERT(target->blocked_exceptions != NULL);

      last = (StgBlockingQueueElement **)&target->blocked_exceptions;
      for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
           t != END_BQ_QUEUE;
           last = &t->link, t = t->link) {
        ASSERT(get_itbl(t)->type == TSO);
        if (t == (StgBlockingQueueElement *)tso) {
          *last = (StgBlockingQueueElement *)tso->link;
          goto done;
        }
      }
      barf("unblockThread (Exception): TSO not found");
    }

  case BlockedOnDelay:
  case BlockedOnRead:
  case BlockedOnWrite:
    {
      StgBlockingQueueElement *prev = NULL;
      for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
           prev = t, t = t->link) {
        if (t == (StgBlockingQueueElement *)tso) {
          if (prev == NULL) {
            blocked_queue_hd = (StgTSO *)t->link;
            if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
              blocked_queue_tl = END_TSO_QUEUE;
            }
          } else {
            prev->link = t->link;
            if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
              blocked_queue_tl = (StgTSO *)prev;
            }
          }
          goto done;
        }
      }
      barf("unblockThread (I/O): TSO not found");
    }

  default:
    barf("unblockThread");
  }

 done:
  tso->link = END_TSO_QUEUE;
  tso->why_blocked = NotBlocked;
  tso->block_info.closure = NULL;
  PUSH_ON_RUN_QUEUE(tso);
  RELEASE_LOCK(&sched_mutex);
}

#else /* !GRAN && !PAR */
static void
unblockThread(StgTSO *tso)
{
  StgTSO *t, **last;

  ACQUIRE_LOCK(&sched_mutex);
  switch (tso->why_blocked) {
  case NotBlocked:
    RELEASE_LOCK(&sched_mutex);  /* don't return still holding the lock */
    return;  /* not blocked */

  case BlockedOnMVar:
    ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
    {
      StgTSO *last_tso = END_TSO_QUEUE;
      StgMVar *mvar = (StgMVar *)(tso->block_info.closure);

      last = &mvar->head;
      for (t = mvar->head; t != END_TSO_QUEUE;
           last = &t->link, last_tso = t, t = t->link) {
        if (t == tso) {
          *last = tso->link;
          if (mvar->tail == tso) {
            mvar->tail = last_tso;
          }
          goto done;
        }
      }
      barf("unblockThread (MVAR): TSO not found");
    }

  case BlockedOnBlackHole:
    ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
    {
      StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);

      last = &bq->blocking_queue;
      for (t = bq->blocking_queue; t != END_TSO_QUEUE;
           last = &t->link, t = t->link) {
        if (t == tso) {
          *last = tso->link;
          goto done;
        }
      }
      barf("unblockThread (BLACKHOLE): TSO not found");
    }

  case BlockedOnException:
    {
      StgTSO *target = tso->block_info.tso;

      ASSERT(get_itbl(target)->type == TSO);
      ASSERT(target->blocked_exceptions != NULL);

      last = &target->blocked_exceptions;
      for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
           last = &t->link, t = t->link) {
        ASSERT(get_itbl(t)->type == TSO);
        if (t == tso) {
          *last = tso->link;
          goto done;
        }
      }
      barf("unblockThread (Exception): TSO not found");
    }

  case BlockedOnDelay:
  case BlockedOnRead:
  case BlockedOnWrite:
    {
      StgTSO *prev = NULL;
      for (t = blocked_queue_hd; t != END_TSO_QUEUE;
           prev = t, t = t->link) {
        if (t == tso) {
          if (prev == NULL) {
            blocked_queue_hd = t->link;
            if (blocked_queue_tl == t) {
              blocked_queue_tl = END_TSO_QUEUE;
            }
          } else {
            prev->link = t->link;
            if (blocked_queue_tl == t) {
              blocked_queue_tl = prev;
            }
          }
          goto done;
        }
      }
      barf("unblockThread (I/O): TSO not found");
    }

  default:
    barf("unblockThread");
  }

 done:
  tso->link = END_TSO_QUEUE;
  tso->why_blocked = NotBlocked;
  tso->block_info.closure = NULL;
  PUSH_ON_RUN_QUEUE(tso);
  RELEASE_LOCK(&sched_mutex);
}
#endif
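/* All of the queue surgery in unblockThread uses the same idiom: walk the
 * queue keeping a pointer to the previous link field ("last"), so a found
 * element can be spliced out without special-casing the head.  A minimal,
 * self-contained sketch of the idiom follows (hypothetical Node type, not
 * RTS code; guarded out of the build):
 */
#if 0
#include <stddef.h>

typedef struct Node_ {
    struct Node_ *link;
    int           id;
} Node;

/* Unlink the node with the given id from the list rooted at *head;
 * returns the removed node, or NULL if it is not present.  When the head
 * node itself matches, *head is updated in place, because "last"
 * initially points at the head pointer.
 */
static Node *
unlink_node(Node **head, int id)
{
    Node **last = head;
    Node *t;

    for (t = *head; t != NULL; last = &t->link, t = t->link) {
        if (t->id == id) {
            *last = t->link;   /* splice out, head or interior alike */
            t->link = NULL;
            return t;
        }
    }
    return NULL;
}
#endif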
/* -----------------------------------------------------------------------------
 * raiseAsync()
 *
 * The following function implements the magic for raising an
 * asynchronous exception in an existing thread.
 *
 * We first remove the thread from any queue on which it might be
 * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
 *
 * We strip the stack down to the innermost CATCH_FRAME, building
 * thunks in the heap for all the active computations, so they can
 * be restarted if necessary.  When we reach a CATCH_FRAME, we build
 * an application of the handler to the exception, and push it on
 * the top of the stack.
 *
 * How exactly do we save all the active computations?  We create an
 * AP_UPD for every UpdateFrame on the stack.  Entering one of these
 * AP_UPDs pushes everything from the corresponding update frame
 * upwards onto the stack.  (Actually, it pushes everything up to the
 * next update frame plus a pointer to the next AP_UPD object.
 * Entering the next AP_UPD object pushes more onto the stack until we
 * reach the last AP_UPD object - at which point the stack should look
 * exactly as it did when we killed the TSO and we can continue
 * execution by entering the closure on top of the stack.)
 *
 * We can also kill a thread entirely - this happens if either (a) the
 * exception passed to raiseAsync is NULL, or (b) there's no
 * CATCH_FRAME on the stack.  In either case, we strip the entire
 * stack and replace the thread with a zombie.
 * -------------------------------------------------------------------------- */
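/* A worked illustration of the stack-freezing step (added for clarity;
 * the frame layout below is schematic).  Suppose the portion of the
 * stack between sp and the innermost update frame su is
 *
 *        sp  ->  w0               -- top word, a closure pointer
 *                w1 ... wn        -- the rest of this stack chunk
 *        su  ->  UpdateFrame { updatee = u, link = su' }
 *
 * Then one iteration of the loop in raiseAsync builds
 *
 *        ap = AP_UPD { fun = w0, payload = [w1 ... wn] }
 *
 * overwrites u with an indirection to ap, and continues with su'.
 * Entering ap later re-pushes w1 ... wn and enters w0, reconstructing
 * exactly this chunk of the stack.
 */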
void
deleteThread(StgTSO *tso)
{
  raiseAsync(tso,NULL);
}

void
raiseAsync(StgTSO *tso, StgClosure *exception)
{
  StgUpdateFrame* su = tso->su;
  StgPtr          sp = tso->sp;

  /* Thread already dead? */
  if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
    return;
  }

  IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));

  /* Remove it from any blocking queues */
  unblockThread(tso);

  /* The stack freezing code assumes there's a closure pointer on
   * the top of the stack. This isn't always the case with compiled
   * code, so we have to push a dummy closure on the top which just
   * returns to the next return address on the stack.
   */
  if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
    *(--sp) = (W_)&dummy_ret_closure;
  }

  while (1) {
    int words = ((P_)su - (P_)sp) - 1;
    nat i;
    StgAP_UPD *ap;

    /* If we find a CATCH_FRAME, and we've got an exception to raise,
     * then build PAP(handler,exception,realworld#), and leave it on
     * top of the stack ready to enter.
     */
    if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
      StgCatchFrame *cf = (StgCatchFrame *)su;
      /* we've got an exception to raise, so let's pass it to the
       * handler in this frame.
       */
      ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
      TICK_ALLOC_UPD_PAP(3,0);
      SET_HDR(ap,&PAP_info,cf->header.prof.ccs);

      ap->n_args = 2;
      ap->fun = cf->handler;       /* :: Exception -> IO a */
      ap->payload[0] = (P_)exception;
      ap->payload[1] = ARG_TAG(0); /* realworld token */

      /* throw away the stack from Sp up to and including the
       * CATCH_FRAME.
       */
      sp = (P_)su + sizeofW(StgCatchFrame) - 1;
      tso->su = cf->link;

      /* Restore the blocked/unblocked state for asynchronous exceptions
       * at the CATCH_FRAME.
       *
       * If exceptions were unblocked at the catch, arrange that they
       * are unblocked again after executing the handler by pushing an
       * unblockAsyncExceptions_ret stack frame.
       */
      if (!cf->exceptions_blocked) {
        *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
      }

      /* Ensure that async exceptions are blocked when running the handler.
       */
      if (tso->blocked_exceptions == NULL) {
        tso->blocked_exceptions = END_TSO_QUEUE;
      }

      /* Put the newly-built PAP on top of the stack, ready to execute
       * when the thread restarts.
       */
      sp[0] = (W_)ap;
      tso->sp = sp;
      tso->what_next = ThreadEnterGHC;
      return;
    }

    /* First build an AP_UPD consisting of the stack chunk above the
     * current update frame, with the top word on the stack as the
     * fun field.
     */
    ap = (StgAP_UPD *)allocate(AP_sizeW(words));

    ASSERT(words >= 0);

    ap->n_args = words;
    ap->fun    = (StgClosure *)sp[0];
    sp++;
    for(i=0; i < (nat)words; ++i) {
      ap->payload[i] = (P_)*sp++;
    }

    switch (get_itbl(su)->type) {

    case UPDATE_FRAME:
      {
        SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */);
        TICK_ALLOC_UP_THK(words+1,0);

        IF_DEBUG(scheduler,
                 fprintf(stderr, "scheduler: Updating ");
                 printPtr((P_)su->updatee);
                 fprintf(stderr, " with ");
                 printObj((StgClosure *)ap);
                 );

        /* Replace the updatee with an indirection - happily
         * this will also wake up any threads currently
         * waiting on the result.
         */
        UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */
        su = su->link;
        sp += sizeofW(StgUpdateFrame) -1;
        sp[0] = (W_)ap; /* push onto stack */
        break;
      }

    case CATCH_FRAME:
      {
        StgCatchFrame *cf = (StgCatchFrame *)su;
        StgClosure* o;

        /* We want a PAP, not an AP_UPD.  Fortunately, the
         * layout's the same.
         */
        SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
        TICK_ALLOC_UPD_PAP(words+1,0);

        /* now build o = FUN(catch,ap,handler) */
        o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
        TICK_ALLOC_FUN(2,0);
        SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
        o->payload[0] = (StgClosure *)ap;
        o->payload[1] = cf->handler;

        IF_DEBUG(scheduler,
                 fprintf(stderr, "scheduler: Built ");
                 printObj((StgClosure *)o);
                 );

        /* pop the old handler and put o on the stack */
        su = cf->link;
        sp += sizeofW(StgCatchFrame) - 1;
        sp[0] = (W_)o;
        break;
      }

    case SEQ_FRAME:
      {
        StgSeqFrame *sf = (StgSeqFrame *)su;
        StgClosure* o;

        SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
        TICK_ALLOC_UPD_PAP(words+1,0);

        /* now build o = FUN(seq,ap) */
        o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
        TICK_ALLOC_SE_THK(1,0);
        SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
        o->payload[0] = (StgClosure *)ap;

        IF_DEBUG(scheduler,
                 fprintf(stderr, "scheduler: Built ");
                 printObj((StgClosure *)o);
                 );

        /* pop the old handler and put o on the stack */
        su = sf->link;
        sp += sizeofW(StgSeqFrame) - 1;
        sp[0] = (W_)o;
        break;
      }

    case STOP_FRAME:
      /* We've stripped the entire stack, the thread is now dead. */
      sp += sizeofW(StgStopFrame) - 1;
      sp[0] = (W_)exception;    /* save the exception */
      tso->what_next = ThreadKilled;
      tso->su = (StgUpdateFrame *)(sp+1);
      tso->sp = sp;
      return;

    default:
      barf("raiseAsync");
    }
  }
  barf("raiseAsync");
}
/* -----------------------------------------------------------------------------
   resurrectThreads is called after garbage collection on the list of
   threads found to be garbage.  Each of these threads will be woken
   up and sent a signal: BlockedOnDeadMVar if the thread was blocked
   on an MVar, or NonTermination if the thread was blocked on a Black
   Hole.
   -------------------------------------------------------------------------- */

void
resurrectThreads( StgTSO *threads )
{
  StgTSO *tso, *next;

  for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
    next = tso->global_link;
    tso->global_link = all_threads;
    all_threads = tso;
    IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));

    switch (tso->why_blocked) {
    case BlockedOnMVar:
    case BlockedOnException:
      raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
      break;
    case BlockedOnBlackHole:
      raiseAsync(tso,(StgClosure *)NonTermination_closure);
      break;
    case NotBlocked:
      /* This might happen if the thread was blocked on a black hole
       * belonging to a thread that we've just woken up (raiseAsync
       * can wake up threads, remember...).
       */
      continue;
    default:
      barf("resurrectThreads: thread blocked in a strange way");
    }
  }
}
//@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
//@subsection Debugging Routines

/* -----------------------------------------------------------------------------
   Debugging: why is a thread blocked
   -------------------------------------------------------------------------- */

static void
printThreadBlockage(StgTSO *tso)
{
  switch (tso->why_blocked) {
  case BlockedOnRead:
    fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
    break;
  case BlockedOnWrite:
    fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
    break;
  case BlockedOnDelay:
#if defined(HAVE_SETITIMER) || defined(mingw32_TARGET_OS)
    fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
#else
    fprintf(stderr,"blocked on delay of %d ms",
            tso->block_info.target - getourtimeofday());
#endif
    break;
  case BlockedOnMVar:
    fprintf(stderr,"blocked on an MVar");
    break;
  case BlockedOnException:
    fprintf(stderr,"blocked on delivering an exception to thread %d",
            tso->block_info.tso->id);
    break;
  case BlockedOnBlackHole:
    fprintf(stderr,"blocked on a black hole");
    break;
  case NotBlocked:
    fprintf(stderr,"not blocked");
    break;
#if defined(PAR)
  case BlockedOnGA:
    fprintf(stderr,"blocked on global address; local FM_BQ is %p (%s)",
            tso->block_info.closure, info_type(tso->block_info.closure));
    break;
  case BlockedOnGA_NoSend:
    fprintf(stderr,"blocked on global address (no send); local FM_BQ is %p (%s)",
            tso->block_info.closure, info_type(tso->block_info.closure));
    break;
#endif
  default:
    barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
         tso->why_blocked, tso->id, tso);
  }
}

static void
printThreadStatus(StgTSO *tso)
{
  switch (tso->what_next) {
  case ThreadKilled:
    fprintf(stderr,"has been killed");
    break;
  case ThreadComplete:
    fprintf(stderr,"has completed");
    break;
  default:
    printThreadBlockage(tso);
  }
}

void
printAllThreads(void)
{
  StgTSO *t;

  sched_belch("all threads:");
  for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
    fprintf(stderr, "\tthread %d is ", t->id);
    printThreadStatus(t);
    fprintf(stderr,"\n");
  }
}
/*
   Print a whole blocking queue attached to node (debugging only).
*/
# if defined(PAR)
void
print_bq (StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  rtsBool end;

  fprintf(stderr,"## BQ of closure %p (%s): ",
          node, info_type(node));

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
         get_itbl(node)->type == FETCH_ME_BQ ||
         get_itbl(node)->type == RBH);

  ASSERT(node!=(StgClosure*)NULL); // sanity check
  /*
    NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
  */
  for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
       bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);                    // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||
           get_itbl(bqe)->type == BLOCKED_FETCH ||
           get_itbl(bqe)->type == CONSTR);
    /* only BQs of an RBH end with an RBH_Save closure */
    ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

    switch (get_itbl(bqe)->type) {
    case TSO:
      fprintf(stderr," TSO %d (%x),",
              ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
      break;
    case BLOCKED_FETCH:
      fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
              ((StgBlockedFetch *)bqe)->node,
              ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
              ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
              ((StgBlockedFetch *)bqe)->ga.weight);
      break;
    case CONSTR:
      fprintf(stderr," %s (IP %p),",
              (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
               get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
               get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
               "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue of %p (%s)",
           info_type(bqe), node, info_type(node));
    }
  }
  fputc('\n', stderr);
}
# elif defined(GRAN)
void
print_bq (StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  PEs node_loc, tso_loc;
  rtsBool end;

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
         get_itbl(node)->type == FETCH_ME_BQ ||
         get_itbl(node)->type == RBH);

  ASSERT(node!=(StgClosure*)NULL); // sanity check
  node_loc = where_is(node);

  fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
          node, info_type(node), node_loc);

  /*
    NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
  */
  for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
       bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);                    // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||
           get_itbl(bqe)->type == CONSTR);
    /* only BQs of an RBH end with an RBH_Save closure */
    ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

    tso_loc = where_is((StgClosure *)bqe);
    switch (get_itbl(bqe)->type) {
    case TSO:
      fprintf(stderr," TSO %d (%p) on [PE %d],",
              ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
      break;
    case CONSTR:
      fprintf(stderr," %s (IP %p),",
              (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
               get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
               get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
               "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue of %p (%s)",
           info_type((StgClosure *)bqe), node, info_type(node));
    }
  }
  fputc('\n', stderr);
}
# else /* !GRAN && !PAR */
/*
   Nice and easy: only TSOs on the blocking queue
*/
void
print_bq (StgClosure *node)
{
  StgTSO *tso;

  ASSERT(node!=(StgClosure*)NULL); // sanity check
  for (tso = ((StgBlockingQueue*)node)->blocking_queue;
       tso != END_TSO_QUEUE;
       tso = tso->link) {
    ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
    ASSERT(get_itbl(tso)->type == TSO);      // guess what, sanity check
    fprintf(stderr," TSO %d (%p),", tso->id, tso);
  }
  fputc('\n', stderr);
}
# endif
/* Count the threads on the run queue (debugging only). */
static nat
run_queue_len(void)
{
  nat i;
  StgTSO *tso;

  for (i=0, tso=run_queue_hd;
       tso != END_TSO_QUEUE;
       i++, tso = tso->link)
    /* nothing */;

  return i;
}
void
sched_belch(char *s, ...)
{
  va_list ap;
  va_start(ap,s);
#ifdef SMP
  fprintf(stderr, "scheduler (task %ld): ", pthread_self());
#else
  fprintf(stderr, "scheduler: ");
#endif
  vfprintf(stderr, s, ap);
  fprintf(stderr, "\n");
  va_end(ap);
}
//@node Index,  , Debugging Routines, Main scheduling code
//@subsection Index

//@index
//* MainRegTable::  @cindex\s-+MainRegTable
//* StgMainThread::  @cindex\s-+StgMainThread
//* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
//* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
//* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
//* context_switch::  @cindex\s-+context_switch
//* createThread::  @cindex\s-+createThread
//* free_capabilities::  @cindex\s-+free_capabilities
//* gc_pending_cond::  @cindex\s-+gc_pending_cond
//* initScheduler::  @cindex\s-+initScheduler
//* interrupted::  @cindex\s-+interrupted
//* n_free_capabilities::  @cindex\s-+n_free_capabilities
//* next_thread_id::  @cindex\s-+next_thread_id
//* print_bq::  @cindex\s-+print_bq
//* run_queue_hd::  @cindex\s-+run_queue_hd
//* run_queue_tl::  @cindex\s-+run_queue_tl
//* sched_mutex::  @cindex\s-+sched_mutex
//* schedule::  @cindex\s-+schedule
//* take_off_run_queue::  @cindex\s-+take_off_run_queue
//* task_ids::  @cindex\s-+task_ids
//* term_mutex::  @cindex\s-+term_mutex
//* thread_ready_cond::  @cindex\s-+thread_ready_cond
//@end index