1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.68 2000/04/14 16:47:43 panne Exp $
4 * (c) The GHC Team, 1998-2000
8 * The main scheduling code in GranSim is quite different from that in std
9 * (concurrent) Haskell: while concurrent Haskell just iterates over the
10 * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11 * over the events in the global event queue. -- HWL
12 * --------------------------------------------------------------------------*/
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
17 /* Version with scheduler monitor support for SMPs.
19 This design provides a high-level API to create and schedule threads etc.
20 as documented in the SMP design document.
22 It uses a monitor design, controlled by a single mutex, to regulate
23 access to shared data structures, and builds on the Posix threads
26 The majority of state is shared. In order to keep essential per-task state,
27 there is a Capability structure, which contains all the information
28 needed to run a thread: its STG registers, a pointer to its TSO, a
29 nursery etc. During STG execution, a pointer to the capability is
30 kept in a register (BaseReg).
32 In a non-SMP build, there is one global capability, namely MainRegTable.
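   A rough sketch of what this file assumes a Capability provides,
   inferred from the accesses below (cap->rCurrentTSO, cap->link,
   MainRegTable.rSparks); the real definition lives in the RTS headers,
   so treat the field list as illustrative rather than authoritative:

       typedef struct Capability_ {
           StgTSO             *rCurrentTSO;  // the TSO being run
           struct Capability_ *link;         // free_capabilities chain
           // ... STG register save area, spark pool, nursery, etc.
       } Capability;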
39 //* Variables and Data structures::
40 //* Main scheduling loop::
41 //* Suspend and Resume::
43 //* Garbage Collection Routines::
44 //* Blocking Queue Routines::
45 //* Exception Handling Routines::
46 //* Debugging Routines::
50 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
51 //@subsection Includes
59 #include "StgStartup.h"
63 #include "StgMiscClosures.h"
65 #include "Evaluator.h"
66 #include "Exception.h"
74 #if defined(GRAN) || defined(PAR)
75 # include "GranSimRts.h"
77 # include "ParallelRts.h"
78 # include "Parallel.h"
79 # include "ParallelDebug.h"
87 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
88 //@subsection Variables and Data structures
92 * These are the threads which clients have requested that we run.
94 * In an SMP build, we might have several concurrent clients all
95 * waiting for results, and each one will wait on a condition variable
96 * until the result is available.
98 * In non-SMP, clients are strictly nested: the first client calls
99 * into the RTS, which might call out again to C with a _ccall_GC, and
100 * eventually re-enter the RTS.
102 * Information about the main threads is kept in a linked list:
104 //@cindex StgMainThread
105 typedef struct StgMainThread_ {
107 SchedulerStatus stat;
110 pthread_cond_t wakeup;
112 struct StgMainThread_ *link;
115 /* Main thread queue.
116 * Locks required: sched_mutex.
118 static StgMainThread *main_threads;
121 * Locks required: sched_mutex.
125 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
126 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
129 In GranSim we have a runnable and a blocked queue for each processor.
130 In order to minimise code changes, new arrays run_queue_hds/tls
131 are created. run_queue_hd is then a short cut (macro) for
132 run_queue_hds[CurrentProc] (see GranSim.h).
135 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
136 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
137 StgTSO *ccalling_threadss[MAX_PROC];
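/* For example, GranSim.h supplies short-cut macros along these lines
   (a sketch of the convention described above, not the literal header):

     #define run_queue_hd      run_queue_hds[CurrentProc]
     #define blocked_queue_hd  blocked_queue_hds[CurrentProc]
*/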
138 /* We use the same global list of threads (all_threads) in GranSim as in
139 the std RTS (i.e. we are cheating). However, we don't use this list in
140 the GranSim specific code at the moment (so we are only potentially
145 StgTSO *run_queue_hd, *run_queue_tl;
146 StgTSO *blocked_queue_hd, *blocked_queue_tl;
150 /* Linked list of all threads.
151 * Used for detecting garbage collected threads.
155 /* Threads suspended in _ccall_GC.
157 static StgTSO *suspended_ccalling_threads;
159 static void GetRoots(void);
160 static StgTSO *threadStackOverflow(StgTSO *tso);
162 /* KH: The following two flags are shared memory locations. There is no need
163 to lock them, since they are only unset at the end of a scheduler
167 /* flag set by signal handler to precipitate a context switch */
168 //@cindex context_switch
171 /* if this flag is set as well, give up execution */
172 //@cindex interrupted
175 /* Next thread ID to allocate.
176 * Locks required: sched_mutex
178 //@cindex next_thread_id
179 StgThreadID next_thread_id = 1;
182 * Pointers to the state of the current thread.
183 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
184 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
187 /* The smallest stack size that makes any sense is:
188 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
189 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
190 * + 1 (the realworld token for an IO thread)
191 * + 1 (the closure to enter)
193 * A thread with this stack will bomb immediately with a stack
194 * overflow, which will increase its stack size.
197 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
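/* (the "+ 2" above covers the realworld token and the closure to enter) */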
199 /* Free capability list.
200 * Locks required: sched_mutex.
203 //@cindex free_capabilities
204 //@cindex n_free_capabilities
205 Capability *free_capabilities; /* Available capabilities for running threads */
206 nat n_free_capabilities; /* total number of available capabilities */
208 //@cindex MainRegTable
209 Capability MainRegTable; /* for non-SMP, we have one global capability */
218 /* All our current task ids, saved in case we need to kill them later.
225 void addToBlockedQueue ( StgTSO *tso );
227 static void schedule ( void );
228 void interruptStgRts ( void );
230 static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri );
232 static StgTSO * createThread_ ( nat size, rtsBool have_lock );
236 static void sched_belch(char *s, ...);
240 //@cindex sched_mutex
242 //@cindex thread_ready_cond
243 //@cindex gc_pending_cond
244 pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;
245 pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER;
246 pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER;
247 pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER;
254 rtsTime TimeOfLastYield;
258 char *whatNext_strs[] = {
266 char *threadReturnCode_strs[] = {
267 "HeapOverflow", /* might also be StackOverflow */
276 * The thread state for the main thread.
277 // ToDo: check whether this is still needed
281 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
282 //@subsection Main scheduling loop
284 /* ---------------------------------------------------------------------------
285 Main scheduling loop.
287 We use round-robin scheduling, each thread returning to the
288 scheduler loop when one of these conditions is detected:
291 * timer expires (thread yields)
296 Locking notes: we acquire the scheduler lock once at the beginning
297 of the scheduler loop, and release it when
299 * running a thread, or
300 * waiting for work, or
301 * waiting for a GC to complete.
304 In a GranSim setup this loop iterates over the global event queue,
305 which determines what to do next. Therefore, it's more complicated
306 than either the concurrent or the parallel (GUM) setup.
310 GUM iterates over incoming messages.
311 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
312 and sends out a fish whenever it has nothing to do; in-between
313 doing the actual reductions (shared code below) it processes the
314 incoming messages and deals with delayed operations
315 (see PendingFetches).
316 This is not the ugliest code you could imagine, but it's bloody close.
318 ------------------------------------------------------------------------ */
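/* Schematically, the three variants of the loop (a sketch only; the
   real control flow is in schedule() below):

     GranSim:  while ((event = get_next_event())) { dispatch event; run; }
     GUM:      while (!GlobalStopPending)  { process messages / fish; run; }
     std/SMP:  while (1) { wait for runnable TSO + free capability; run; }
*/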
325 StgThreadReturnCode ret;
334 rtsBool was_interrupted = rtsFalse;
336 ACQUIRE_LOCK(&sched_mutex);
340 /* set up first event to get things going */
341 /* ToDo: assign costs for system setup and init MainTSO ! */
342 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
344 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
347 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
348 G_TSO(CurrentTSO, 5));
350 if (RtsFlags.GranFlags.Light) {
351 /* Save current time; GranSim Light only */
352 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
355 event = get_next_event();
357 while (event!=(rtsEvent*)NULL) {
358 /* Choose the processor with the next event */
359 CurrentProc = event->proc;
360 CurrentTSO = event->tso;
364 while (!GlobalStopPending) { /* GlobalStopPending set in par_exit */
372 IF_DEBUG(scheduler, printAllThreads());
374 /* If we're interrupted (the user pressed ^C, or some other
375 * termination condition occurred), kill all the currently running
379 IF_DEBUG(scheduler, sched_belch("interrupted"));
380 for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
383 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
386 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
387 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
388 interrupted = rtsFalse;
389 was_interrupted = rtsTrue;
392 /* Go through the list of main threads and wake up any
393 * clients whose computations have finished. ToDo: this
394 * should be done more efficiently without a linear scan
395 * of the main threads list, somehow...
399 StgMainThread *m, **prev;
400 prev = &main_threads;
401 for (m = main_threads; m != NULL; m = m->link) {
402 switch (m->tso->what_next) {
405 *(m->ret) = (StgClosure *)m->tso->sp[0];
409 pthread_cond_broadcast(&m->wakeup);
413 if (was_interrupted) {
414 m->stat = Interrupted;
418 pthread_cond_broadcast(&m->wakeup);
428 /* in GUM do this only on the Main PE */
431 /* If our main thread has finished or been killed, return.
434 StgMainThread *m = main_threads;
435 if (m->tso->what_next == ThreadComplete
436 || m->tso->what_next == ThreadKilled) {
437 main_threads = main_threads->link;
438 if (m->tso->what_next == ThreadComplete) {
439 /* we finished successfully, fill in the return value */
440 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
444 if (was_interrupted) {
445 m->stat = Interrupted;
455 /* Top up the run queue from our spark pool. We try to make the
456 * number of threads in the run queue equal to the number of
461 nat n = n_free_capabilities;
462 StgTSO *tso = run_queue_hd;
464 /* Count the run queue */
465 while (n > 0 && tso != END_TSO_QUEUE) {
474 break; /* no more sparks in the pool */
476 /* I'd prefer this to be done in activateSpark -- HWL */
477 /* tricky - it needs to hold the scheduler lock and
478 * not try to re-acquire it -- SDM */
480 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
481 pushClosure(tso,spark);
482 PUSH_ON_RUN_QUEUE(tso);
484 advisory_thread_count++;
488 sched_belch("turning spark of closure %p into a thread",
489 (StgClosure *)spark));
492 /* We need to wake up the other tasks if we just created some
495 if (n_free_capabilities - n > 1) {
496 pthread_cond_signal(&thread_ready_cond);
501 /* Check whether any waiting threads need to be woken up. If the
502 * run queue is empty, and there are no other tasks running, we
503 * can wait indefinitely for something to happen.
504 * ToDo: what if another client comes along & requests another
507 if (blocked_queue_hd != END_TSO_QUEUE) {
509 (run_queue_hd == END_TSO_QUEUE)
511 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
516 /* check for signals each time around the scheduler */
518 if (signals_pending()) {
519 start_signal_handlers();
523 /* Detect deadlock: when we have no threads to run, there are
524 * no threads waiting on I/O or sleeping, and all the other
525 * tasks are waiting for work, we must have a deadlock. Inform
526 * all the main threads.
529 if (blocked_queue_hd == END_TSO_QUEUE
530 && run_queue_hd == END_TSO_QUEUE
531 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
534 for (m = main_threads; m != NULL; m = m->link) {
537 pthread_cond_broadcast(&m->wakeup);
542 if (blocked_queue_hd == END_TSO_QUEUE
543 && run_queue_hd == END_TSO_QUEUE) {
544 StgMainThread *m = main_threads;
547 main_threads = m->link;
553 /* If there's a GC pending, don't do anything until it has
557 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
558 pthread_cond_wait(&gc_pending_cond, &sched_mutex);
561 /* block until we've got a thread on the run queue and a free
564 while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
565 IF_DEBUG(scheduler, sched_belch("waiting for work"));
566 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
567 IF_DEBUG(scheduler, sched_belch("work now available"));
573 if (RtsFlags.GranFlags.Light)
574 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
576 /* adjust time based on time-stamp */
577 if (event->time > CurrentTime[CurrentProc] &&
578 event->evttype != ContinueThread)
579 CurrentTime[CurrentProc] = event->time;
581 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
582 if (!RtsFlags.GranFlags.Light)
585 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"))
587 /* main event dispatcher in GranSim */
588 switch (event->evttype) {
589 /* Should just be continuing execution */
591 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
592 /* ToDo: check assertion
593 ASSERT(run_queue_hd != (StgTSO*)NULL &&
594 run_queue_hd != END_TSO_QUEUE);
596 /* Ignore ContinueThreads for fetching threads (if synchronous comm) */
597 if (!RtsFlags.GranFlags.DoAsyncFetch &&
598 procStatus[CurrentProc]==Fetching) {
599 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
600 CurrentTSO->id, CurrentTSO, CurrentProc);
603 /* Ignore ContinueThreads for completed threads */
604 if (CurrentTSO->what_next == ThreadComplete) {
605 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
606 CurrentTSO->id, CurrentTSO, CurrentProc);
609 /* Ignore ContinueThreads for threads that are being migrated */
610 if (PROCS(CurrentTSO)==Nowhere) {
611 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
612 CurrentTSO->id, CurrentTSO, CurrentProc);
615 /* The thread should be at the beginning of the run queue */
616 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
617 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
618 CurrentTSO->id, CurrentTSO, CurrentProc);
619 break; // run the thread anyway
622 new_event(proc, proc, CurrentTime[proc],
624 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
626 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
627 break; // now actually run the thread; DaH Qu'vam yImuHbej
630 do_the_fetchnode(event);
631 goto next_thread; /* handle next event in event queue */
634 do_the_globalblock(event);
635 goto next_thread; /* handle next event in event queue */
638 do_the_fetchreply(event);
639 goto next_thread; /* handle next event in event queue */
641 case UnblockThread: /* Move from the blocked queue to the tail of */
642 do_the_unblock(event);
643 goto next_thread; /* handle next event in event queue */
645 case ResumeThread: /* Move from the blocked queue to the tail of */
646 /* the runnable queue ( i.e. Qu' SImqa'lu') */
647 event->tso->gran.blocktime +=
648 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
649 do_the_startthread(event);
650 goto next_thread; /* handle next event in event queue */
653 do_the_startthread(event);
654 goto next_thread; /* handle next event in event queue */
657 do_the_movethread(event);
658 goto next_thread; /* handle next event in event queue */
661 do_the_movespark(event);
662 goto next_thread; /* handle next event in event queue */
665 do_the_findwork(event);
666 goto next_thread; /* handle next event in event queue */
669 barf("Illegal event type %u\n", event->evttype);
672 /* This point was scheduler_loop in the old RTS */
674 IF_DEBUG(gran, belch("GRAN: after main switch"));
676 TimeOfLastEvent = CurrentTime[CurrentProc];
677 TimeOfNextEvent = get_time_of_next_event();
678 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
679 // CurrentTSO = ThreadQueueHd;
681 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
684 if (RtsFlags.GranFlags.Light)
685 GranSimLight_leave_system(event, &ActiveTSO);
687 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
690 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
692 /* in a GranSim setup the TSO stays on the run queue */
694 /* Take a thread from the run queue. */
695 t = POP_RUN_QUEUE(); // take_off_run_queue(t);
698 fprintf(stderr, "GRAN: About to run current thread, which is\n");
701 context_switch = 0; // turned on via GranYield, checking events and time slice
704 DumpGranEvent(GR_SCHEDULE, t));
706 procStatus[CurrentProc] = Busy;
710 if (PendingFetches != END_BF_QUEUE) {
714 /* ToDo: perhaps merge with spark activation above */
715 /* check whether we have local work and send requests if we have none */
716 if (run_queue_hd == END_TSO_QUEUE) { /* no runnable threads */
717 /* :-[ no local threads => look out for local sparks */
718 /* the spark pool for the current PE */
719 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
720 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
721 pool->hd < pool->tl) {
723 * ToDo: add GC code check that we really have enough heap afterwards!!
725 * If we're here (no runnable threads) and we have pending
726 * sparks, we must have a space problem. Get enough space
727 * to turn one of those pending sparks into a
731 spark = findSpark(); /* get a spark */
732 if (spark != (rtsSpark) NULL) {
733 tso = activateSpark(spark); /* turn the spark into a thread */
734 IF_PAR_DEBUG(schedule,
735 belch("==== schedule: Created TSO %d (%p); %d threads active",
736 tso->id, tso, advisory_thread_count));
738 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
739 belch("==^^ failed to activate spark");
741 } /* otherwise fall through & pick-up new tso */
743 IF_PAR_DEBUG(verbose,
744 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
745 spark_queue_len(pool)));
749 /* =8-[ no local sparks => look for work on other PEs */
752 * We really have absolutely no work. Send out a fish
753 * (there may be some out there already), and wait for
754 * something to arrive. We clearly can't run any threads
755 * until a SCHEDULE or RESUME arrives, and so that's what
756 * we're hoping to see. (Of course, we still have to
757 * respond to other types of messages.)
760 outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
761 // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
762 /* fishing set in sendFish, processFish;
763 avoid flooding system with fishes via delay */
765 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
773 } else if (PacketsWaiting()) { /* Look for incoming messages */
777 /* Now we are sure that we have some work available */
778 ASSERT(run_queue_hd != END_TSO_QUEUE);
779 /* Take a thread from the run queue, if we have work */
780 t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
782 /* ToDo: write something to the log-file
783 if (RTSflags.ParFlags.granSimStats && !sameThread)
784 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
788 /* the spark pool for the current PE */
789 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
791 IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; base=%x, lim=%x)",
792 spark_queue_len(pool),
794 pool->hd, pool->tl, pool->base, pool->lim));
796 IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)",
797 run_queue_len(), CURRENT_PROC,
798 run_queue_hd, run_queue_tl));
803 we are running a different TSO, so write a schedule event to log file
804 NB: If we use fair scheduling we also have to write a deschedule
805 event for LastTSO; with unfair scheduling we know that the
806 previous tso has blocked whenever we switch to another tso, so
807 we don't need it in GUM for now
809 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
810 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
814 #else /* !GRAN && !PAR */
816 /* grab a thread from the run queue
818 ASSERT(run_queue_hd != END_TSO_QUEUE);
820 IF_DEBUG(sanity,checkTSO(t));
827 cap = free_capabilities;
828 free_capabilities = cap->link;
829 n_free_capabilities--;
834 cap->rCurrentTSO = t;
836 /* set the context_switch flag
838 if (run_queue_hd == END_TSO_QUEUE)
843 RELEASE_LOCK(&sched_mutex);
845 IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...",
846 t->id, t, whatNext_strs[t->what_next]));
848 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
849 /* Run the current thread
851 switch (cap->rCurrentTSO->what_next) {
854 /* Thread already finished, return to scheduler. */
855 ret = ThreadFinished;
858 ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
861 ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
863 case ThreadEnterHugs:
867 IF_DEBUG(scheduler,sched_belch("entering Hugs"));
868 c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
869 cap->rCurrentTSO->sp += 1;
874 barf("Panic: entered a BCO but no bytecode interpreter in this build");
877 barf("schedule: invalid what_next field");
879 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
881 /* Costs for the scheduler are assigned to CCS_SYSTEM */
886 ACQUIRE_LOCK(&sched_mutex);
889 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
890 #elif !defined(GRAN) && !defined(PAR)
891 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
893 t = cap->rCurrentTSO;
896 /* HACK 675: if the last thread didn't yield, make sure to print a
897 SCHEDULE event to the log file when StgRunning the next thread, even
898 if it is the same one as before */
899 LastTSO = t; //(ret == ThreadBlocked) ? END_TSO_QUEUE : t;
900 TimeOfLastYield = CURRENT_TIME;
905 /* make all the running tasks block on a condition variable,
906 * maybe set context_switch and wait till they all pile in,
907 * then have them wait on a GC condition variable.
909 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow",
910 t->id, t, whatNext_strs[t->what_next]));
913 ASSERT(!is_on_queue(t,CurrentProc));
916 ready_to_gc = rtsTrue;
917 context_switch = 1; /* stop other threads ASAP */
918 PUSH_ON_RUN_QUEUE(t);
919 /* actual GC is done at the end of the while loop */
923 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow",
924 t->id, t, whatNext_strs[t->what_next]));
925 /* just adjust the stack for this thread, then pop it back
931 /* enlarge the stack */
932 StgTSO *new_t = threadStackOverflow(t);
934 /* This TSO has moved, so update any pointers to it from the
935 * main thread stack. It better not be on any other queues...
938 for (m = main_threads; m != NULL; m = m->link) {
944 PUSH_ON_RUN_QUEUE(new_t);
951 DumpGranEvent(GR_DESCHEDULE, t));
952 globalGranStats.tot_yields++;
955 DumpGranEvent(GR_DESCHEDULE, t));
957 /* put the thread back on the run queue. Then, if we're ready to
958 * GC, check whether this is the last task to stop. If so, wake
959 * up the GC thread. getThread will block during a GC until the
963 if (t->what_next == ThreadEnterHugs) {
964 /* ToDo: or maybe a timer expired when we were in Hugs?
965 * or maybe someone hit ctrl-C
967 belch("--<< thread %ld (%p; %s) stopped to switch to Hugs",
968 t->id, t, whatNext_strs[t->what_next]);
970 belch("--<< thread %ld (%p; %s) stopped, yielding",
971 t->id, t, whatNext_strs[t->what_next]);
976 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
978 ASSERT(t->link == END_TSO_QUEUE);
980 ASSERT(!is_on_queue(t,CurrentProc));
983 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
984 checkThreadQsSanity(rtsTrue));
986 APPEND_TO_RUN_QUEUE(t);
988 /* add a ContinueThread event to actually process the thread */
989 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
991 t, (StgClosure*)NULL, (rtsSpark*)NULL);
993 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1002 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1003 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1004 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1006 // ??? needed; should emit block before
1008 DumpGranEvent(GR_DESCHEDULE, t));
1009 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1012 ASSERT(procStatus[CurrentProc]==Busy ||
1013 ((procStatus[CurrentProc]==Fetching) &&
1014 (t->block_info.closure!=(StgClosure*)NULL)));
1015 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1016 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1017 procStatus[CurrentProc]==Fetching))
1018 procStatus[CurrentProc] = Idle;
1022 DumpGranEvent(GR_DESCHEDULE, t));
1024 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1028 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1029 t->id, t, whatNext_strs[t->what_next], t->block_info.closure);
1030 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1033 /* don't need to do anything. Either the thread is blocked on
1034 * I/O, in which case we'll have called addToBlockedQueue
1035 * previously, or it's blocked on an MVar or Blackhole, in which
1036 * case it'll be on the relevant queue already.
1039 fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1040 printThreadBlockage(t);
1041 fprintf(stderr, "\n"));
1043 /* Only for dumping event to log file
1044 ToDo: do I need this in GranSim, too?
1051 case ThreadFinished:
1052 /* Need to check whether this was a main thread, and if so, signal
1053 * the task that started it with the return value. If we have no
1054 * more main threads, we probably need to stop all the tasks until
1057 IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1058 t->what_next = ThreadComplete;
1060 endThread(t, CurrentProc); // clean-up the thread
1062 advisory_thread_count--;
1063 if (RtsFlags.ParFlags.ParStats.Full)
1064 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1069 barf("schedule: invalid thread return code %d", (int)ret);
1073 cap->link = free_capabilities;
1074 free_capabilities = cap;
1075 n_free_capabilities++;
1079 if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes)
1084 /* everybody back, start the GC.
1085 * Could do it in this thread, or signal a condition var
1086 * to do it in another thread. Either way, we need to
1087 * broadcast on gc_pending_cond afterward.
1090 IF_DEBUG(scheduler,sched_belch("doing GC"));
1092 GarbageCollect(GetRoots,rtsFalse);
1093 ready_to_gc = rtsFalse;
1095 pthread_cond_broadcast(&gc_pending_cond);
1098 /* add a ContinueThread event to continue execution of current thread */
1099 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1101 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1103 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1110 IF_GRAN_DEBUG(unused,
1111 print_eventq(EventHd));
1113 event = get_next_event();
1117 /* ToDo: wait for next message to arrive rather than busy wait */
1122 t = take_off_run_queue(END_TSO_QUEUE);
1125 } /* end of while(1) */
1128 /* A hack for Hugs concurrency support. Needs sanitisation (?) */
1129 void deleteAllThreads ( void )
1132 IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
1133 for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1136 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1139 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1140 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1143 /* startThread and insertThread are now in GranSim.c -- HWL */
1145 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1146 //@subsection Suspend and Resume
1148 /* ---------------------------------------------------------------------------
1149 * Suspending & resuming Haskell threads.
1151 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1152 * its capability before calling the C function. This allows another
1153 * task to pick up the capability and carry on running Haskell
1154 * threads. It also means that if the C call blocks, it won't lock
1157 * The Haskell thread making the C call is put to sleep for the
1158 * duration of the call, on the suspended_ccalling_threads queue. We
1159 * give out a token to the task, which it can use to resume the thread
1160 * on return from the C function.
1161 * ------------------------------------------------------------------------- */
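/* A hedged usage sketch of the token protocol (blockingCCall is a
   placeholder, and resumeThread is assumed to hand back a Capability;
   real call sites are generated for "safe" foreign calls):

     StgInt tok = suspendThread(cap);   // give up the capability
     r = blockingCCall(...);            // other tasks may run Haskell now
     cap = resumeThread(tok);           // reclaim a capability and resume
*/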
1164 suspendThread( Capability *cap )
1168 ACQUIRE_LOCK(&sched_mutex);
1171 sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
1173 threadPaused(cap->rCurrentTSO);
1174 cap->rCurrentTSO->link = suspended_ccalling_threads;
1175 suspended_ccalling_threads = cap->rCurrentTSO;
1177 /* Use the thread ID as the token; it should be unique */
1178 tok = cap->rCurrentTSO->id;
1181 cap->link = free_capabilities;
1182 free_capabilities = cap;
1183 n_free_capabilities++;
1186 RELEASE_LOCK(&sched_mutex);
1191 resumeThread( StgInt tok )
1193 StgTSO *tso, **prev;
1196 ACQUIRE_LOCK(&sched_mutex);
1198 prev = &suspended_ccalling_threads;
1199 for (tso = suspended_ccalling_threads;
1200 tso != END_TSO_QUEUE;
1201 prev = &tso->link, tso = tso->link) {
1202 if (tso->id == (StgThreadID)tok) {
1207 if (tso == END_TSO_QUEUE) {
1208 barf("resumeThread: thread not found");
1212 while (free_capabilities == NULL) {
1213 IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1214 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1215 IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1217 cap = free_capabilities;
1218 free_capabilities = cap->link;
1219 n_free_capabilities--;
1221 cap = &MainRegTable;
1224 cap->rCurrentTSO = tso;
1226 RELEASE_LOCK(&sched_mutex);
1231 /* ---------------------------------------------------------------------------
1233 * ------------------------------------------------------------------------ */
1234 static void unblockThread(StgTSO *tso);
1236 /* ---------------------------------------------------------------------------
1237 * Comparing Thread ids.
1239 * This is used from STG land in the implementation of the
1240 * instances of Eq/Ord for ThreadIds.
1241 * ------------------------------------------------------------------------ */
1243 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
1245 StgThreadID id1 = tso1->id;
1246 StgThreadID id2 = tso2->id;
1248 if (id1 < id2) return (-1);
1249 if (id1 > id2) return 1;
1253 /* ---------------------------------------------------------------------------
1254 Create a new thread.
1256 The new thread starts with the given stack size. Before the
1257 scheduler can run, however, this thread needs to have a closure
1258 (and possibly some arguments) pushed on its stack. See
1259 pushClosure() in Schedule.h.
1261 createGenThread() and createIOThread() (in SchedAPI.h) are
1262 convenient packaged versions of this function.
1264 currently pri (priority) is only used in a GRAN setup -- HWL
1265 ------------------------------------------------------------------------ */
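/* A minimal usage sketch (non-GRAN signature; `closure' stands for
   whatever the new thread should evaluate):

     StgTSO *tso = createThread(RtsFlags.GcFlags.initialStkSize);
     pushClosure(tso, closure);    // see pushClosure() in Schedule.h
     scheduleThread(tso);          // make it runnable
*/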
1266 //@cindex createThread
1270 createThread(nat stack_size, StgInt pri)
1272 return createThread_(stack_size, rtsFalse, pri);
1276 createThread_(nat size, rtsBool have_lock, StgInt pri)
1280 createThread(nat stack_size)
1282 return createThread_(stack_size, rtsFalse);
1286 createThread_(nat size, rtsBool have_lock)
1293 /* First check whether we should create a thread at all */
1295 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1296 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1298 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1299 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1300 return END_TSO_QUEUE;
1306 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1309 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1311 /* catch ridiculously small stack sizes */
1312 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1313 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1316 stack_size = size - TSO_STRUCT_SIZEW;
1318 tso = (StgTSO *)allocate(size);
1319 TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1321 SET_HDR(tso, &TSO_info, CCS_SYSTEM);
1323 SET_GRAN_HDR(tso, ThisPE);
1325 tso->what_next = ThreadEnterGHC;
1327 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1328 * protect the increment operation on next_thread_id.
1329 * In future, we could use an atomic increment instead.
1331 if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1332 tso->id = next_thread_id++;
1333 if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1335 tso->why_blocked = NotBlocked;
1336 tso->blocked_exceptions = NULL;
1338 tso->splim = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
1339 tso->stack_size = stack_size;
1340 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
1342 tso->sp = (P_)&(tso->stack) + stack_size;
1345 tso->prof.CCCS = CCS_MAIN;
1348 /* put a stop frame on the stack */
1349 tso->sp -= sizeofW(StgStopFrame);
1350 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1351 tso->su = (StgUpdateFrame*)tso->sp;
1355 tso->link = END_TSO_QUEUE;
1356 /* uses more flexible routine in GranSim */
1357 insertThread(tso, CurrentProc);
1359 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1364 #if defined(GRAN) || defined(PAR)
1365 DumpGranEvent(GR_START,tso);
1368 /* Link the new thread on the global thread list.
1370 tso->global_link = all_threads;
1374 tso->gran.pri = pri;
1376 tso->gran.magic = TSO_MAGIC; // debugging only
1378 tso->gran.sparkname = 0;
1379 tso->gran.startedat = CURRENT_TIME;
1380 tso->gran.exported = 0;
1381 tso->gran.basicblocks = 0;
1382 tso->gran.allocs = 0;
1383 tso->gran.exectime = 0;
1384 tso->gran.fetchtime = 0;
1385 tso->gran.fetchcount = 0;
1386 tso->gran.blocktime = 0;
1387 tso->gran.blockcount = 0;
1388 tso->gran.blockedat = 0;
1389 tso->gran.globalsparks = 0;
1390 tso->gran.localsparks = 0;
1391 if (RtsFlags.GranFlags.Light)
1392 tso->gran.clock = Now; /* local clock */
1394 tso->gran.clock = 0;
1396 IF_DEBUG(gran,printTSO(tso));
1399 tso->par.magic = TSO_MAGIC; // debugging only
1401 tso->par.sparkname = 0;
1402 tso->par.startedat = CURRENT_TIME;
1403 tso->par.exported = 0;
1404 tso->par.basicblocks = 0;
1405 tso->par.allocs = 0;
1406 tso->par.exectime = 0;
1407 tso->par.fetchtime = 0;
1408 tso->par.fetchcount = 0;
1409 tso->par.blocktime = 0;
1410 tso->par.blockcount = 0;
1411 tso->par.blockedat = 0;
1412 tso->par.globalsparks = 0;
1413 tso->par.localsparks = 0;
1417 globalGranStats.tot_threads_created++;
1418 globalGranStats.threads_created_on_PE[CurrentProc]++;
1419 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1420 globalGranStats.tot_sq_probes++;
1425 belch("==__ schedule: Created TSO %d (%p) [PE %d];",
1426 tso->id, tso, CurrentProc));
1428 IF_PAR_DEBUG(verbose,
1429 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1430 tso->id, tso, advisory_thread_count));
1432 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1433 tso->id, tso->stack_size));
1439 Turn a spark into a thread.
1440 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1443 //@cindex activateSpark
1445 activateSpark (rtsSpark spark)
1449 ASSERT(spark != (rtsSpark)NULL);
1450 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1451 if (tso!=END_TSO_QUEUE) {
1452 pushClosure(tso,spark);
1453 PUSH_ON_RUN_QUEUE(tso);
1454 advisory_thread_count++;
1456 if (RtsFlags.ParFlags.ParStats.Full) {
1457 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1458 IF_PAR_DEBUG(verbose,
1459 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1460 (StgClosure *)spark, info_type((StgClosure *)spark)));
1463 barf("activateSpark: Cannot create TSO");
1465 // ToDo: fwd info on local/global spark to thread -- HWL
1466 // tso->gran.exported = spark->exported;
1467 // tso->gran.locked = !spark->global;
1468 // tso->gran.sparkname = spark->name;
1474 /* ---------------------------------------------------------------------------
1477 * scheduleThread puts a thread on the head of the runnable queue.
1478 * This will usually be done immediately after a thread is created.
1479 * The caller of scheduleThread must create the thread using e.g.
1480 * createThread and push an appropriate closure
1481 * on this thread's stack before the scheduler is invoked.
1482 * ------------------------------------------------------------------------ */
1485 scheduleThread(StgTSO *tso)
1487 if (tso==END_TSO_QUEUE){
1492 ACQUIRE_LOCK(&sched_mutex);
1494 /* Put the new thread on the head of the runnable queue. The caller
1495 * better push an appropriate closure on this thread's stack
1496 * beforehand. In the SMP case, the thread may start running as
1497 * soon as we release the scheduler lock below.
1499 PUSH_ON_RUN_QUEUE(tso);
1503 IF_DEBUG(scheduler,printTSO(tso));
1505 RELEASE_LOCK(&sched_mutex);
1508 /* ---------------------------------------------------------------------------
1511 * Start up Posix threads to run each of the scheduler tasks.
1512 * I believe the task ids are not needed in the system as defined.
1514 * ------------------------------------------------------------------------ */
1516 #if defined(PAR) || defined(SMP)
1518 taskStart( void *arg STG_UNUSED )
1520 rts_evalNothing(NULL);
1524 /* ---------------------------------------------------------------------------
1527 * Initialise the scheduler. This resets all the queues - if the
1528 * queues contained any threads, they'll be garbage collected at the
1531 * This now calls startTasks(), so should only be called once! KH @ 25/10/99
1532 * ------------------------------------------------------------------------ */
1536 term_handler(int sig STG_UNUSED)
1539 ACQUIRE_LOCK(&term_mutex);
1541 RELEASE_LOCK(&term_mutex);
1546 //@cindex initScheduler
1553 for (i=0; i<MAX_PROC; i++) { /* NB: the queues are sized [MAX_PROC] */
1554 run_queue_hds[i] = END_TSO_QUEUE;
1555 run_queue_tls[i] = END_TSO_QUEUE;
1556 blocked_queue_hds[i] = END_TSO_QUEUE;
1557 blocked_queue_tls[i] = END_TSO_QUEUE;
1558 ccalling_threadss[i] = END_TSO_QUEUE;
1561 run_queue_hd = END_TSO_QUEUE;
1562 run_queue_tl = END_TSO_QUEUE;
1563 blocked_queue_hd = END_TSO_QUEUE;
1564 blocked_queue_tl = END_TSO_QUEUE;
1567 suspended_ccalling_threads = END_TSO_QUEUE;
1569 main_threads = NULL;
1570 all_threads = END_TSO_QUEUE;
1576 ecafList = END_ECAF_LIST;
1580 /* Install the SIGTERM handler */
1583 struct sigaction action,oact;
1585 action.sa_handler = term_handler;
1586 sigemptyset(&action.sa_mask);
1587 action.sa_flags = 0;
1588 if (sigaction(SIGTERM, &action, &oact) != 0) {
1589 barf("can't install TERM handler");
1595 /* Allocate N Capabilities */
1598 Capability *cap, *prev;
1601 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1602 cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1606 free_capabilities = cap;
1607 n_free_capabilities = RtsFlags.ParFlags.nNodes;
1609 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1610 n_free_capabilities););
1613 #if defined(SMP) || defined(PAR)
1626 /* make some space for saving all the thread ids */
1627 task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1628 "initScheduler:task_ids");
1630 /* and create all the threads */
1631 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1632 r = pthread_create(&tid,NULL,taskStart,NULL);
1634 barf("startTasks: Can't create new Posix thread");
1636 task_ids[i].id = tid;
1637 task_ids[i].mut_time = 0.0;
1638 task_ids[i].mut_etime = 0.0;
1639 task_ids[i].gc_time = 0.0;
1640 task_ids[i].gc_etime = 0.0;
1641 task_ids[i].elapsedtimestart = elapsedtime();
1642 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1648 exitScheduler( void )
1653 /* Don't want to use pthread_cancel, since we'd have to install
1654 * these silly exception handlers (pthread_cleanup_{push,pop}) around
1658 /* Cancel all our tasks */
1659 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1660 pthread_cancel(task_ids[i].id);
1663 /* Wait for all the tasks to terminate */
1664 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1665 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n",
1667 pthread_join(task_ids[i].id, NULL);
1671 /* Send 'em all a SIGTERM. That should shut 'em up.
1673 await_death = RtsFlags.ParFlags.nNodes;
1674 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1675 pthread_kill(task_ids[i].id,SIGTERM);
1677 while (await_death > 0) {
1683 /* -----------------------------------------------------------------------------
1684 Managing the per-task allocation areas.
1686 Each capability comes with an allocation area. These are
1687 fixed-length block lists into which allocation can be done.
1689 ToDo: no support for two-space collection at the moment???
1690 -------------------------------------------------------------------------- */
1692 /* -----------------------------------------------------------------------------
1693 * waitThread is the external interface for running a new computation
1694 * and waiting for the result.
1696 * In the non-SMP case, we create a new main thread, push it on the
1697 * main-thread stack, and invoke the scheduler to run it. The
1698 * scheduler will return when the top main thread on the stack has
1699 * completed or died, and fill in the necessary fields of the
1700 * main_thread structure.
1702 * In the SMP case, we create a main thread as before, but we then
1703 * create a new condition variable and sleep on it. When our new
1704 * main thread has completed, we'll be woken up and the status/result
1705 * will be in the main_thread struct.
1706 * -------------------------------------------------------------------------- */
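/* Sketch of the client-side sequence (createIOThread is the SchedAPI.h
   wrapper mentioned above; its exact argument list is assumed here):

     StgClosure *result;
     StgTSO *tso = createIOThread(RtsFlags.GcFlags.initialStkSize, closure);
     scheduleThread(tso);
     if (waitThread(tso, &result) == Success) { ... use result ... }
*/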
1709 howManyThreadsAvail ( void )
1713 for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1715 for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1721 finishAllThreads ( void )
1724 while (run_queue_hd != END_TSO_QUEUE) {
1725 waitThread ( run_queue_hd, NULL );
1727 while (blocked_queue_hd != END_TSO_QUEUE) {
1728 waitThread ( blocked_queue_hd, NULL );
1731 (blocked_queue_hd != END_TSO_QUEUE ||
1732 run_queue_hd != END_TSO_QUEUE);
1736 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1739 SchedulerStatus stat;
1741 ACQUIRE_LOCK(&sched_mutex);
1743 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1749 pthread_cond_init(&m->wakeup, NULL);
1752 m->link = main_threads;
1755 IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n",
1760 pthread_cond_wait(&m->wakeup, &sched_mutex);
1761 } while (m->stat == NoStatus);
1763 /* GranSim specific init */
1764 CurrentTSO = m->tso; // the TSO to run
1765 procStatus[MainProc] = Busy; // status of main PE
1766 CurrentProc = MainProc; // PE to run it on
1771 ASSERT(m->stat != NoStatus);
1777 pthread_cond_destroy(&m->wakeup);
1780 IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n",
1784 RELEASE_LOCK(&sched_mutex);
1789 //@node Run queue code, Garbage Collection Routines, Suspend and Resume, Main scheduling code
1790 //@subsection Run queue code
1794 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1795 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1796 implicit global variable that has to be correct when calling these
1800 /* Put the new thread on the head of the runnable queue.
1801 * The caller of createThread better push an appropriate closure
1802 * on this thread's stack before the scheduler is invoked.
1804 static /* inline */ void
1805 add_to_run_queue(tso)
1808 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1809 tso->link = run_queue_hd;
1811 if (run_queue_tl == END_TSO_QUEUE) {
1816 /* Put the new thread at the end of the runnable queue. */
1817 static /* inline */ void
1818 push_on_run_queue(tso)
1821 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1822 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1823 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1824 if (run_queue_hd == END_TSO_QUEUE) {
1827 run_queue_tl->link = tso;
1833 Should be inlined because it's used very often in schedule. The tso
1834 argument is actually only needed in GranSim, where we want to have the
1835 possibility to schedule *any* TSO on the run queue, irrespective of the
1836 actual ordering. Therefore, if tso is not the nil TSO then we traverse
1837 the run queue and dequeue the tso, adjusting the links in the queue.
1839 //@cindex take_off_run_queue
1840 static /* inline */ StgTSO*
1841 take_off_run_queue(StgTSO *tso) {
1845 qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1847 if tso is specified, unlink that tso from the run_queue (doesn't have
1848 to be at the beginning of the queue); GranSim only
1850 if (tso!=END_TSO_QUEUE) {
1851 /* find tso in queue */
1852 for (t=run_queue_hd, prev=END_TSO_QUEUE;
1853 t!=END_TSO_QUEUE && t!=tso;
1857 /* now actually dequeue the tso */
1858 if (prev!=END_TSO_QUEUE) {
1859 ASSERT(run_queue_hd!=t);
1860 prev->link = t->link;
1862 /* t is at beginning of thread queue */
1863 ASSERT(run_queue_hd==t);
1864 run_queue_hd = t->link;
1866 /* t is at end of thread queue */
1867 if (t->link==END_TSO_QUEUE) {
1868 ASSERT(t==run_queue_tl);
1869 run_queue_tl = prev;
1871 ASSERT(run_queue_tl!=t);
1873 t->link = END_TSO_QUEUE;
1875 /* take tso from the beginning of the queue; std concurrent code */
1877 if (t != END_TSO_QUEUE) {
1878 run_queue_hd = t->link;
1879 t->link = END_TSO_QUEUE;
1880 if (run_queue_hd == END_TSO_QUEUE) {
1881 run_queue_tl = END_TSO_QUEUE;
1890 //@node Garbage Collection Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1891 //@subsection Garbage Collection Routines
1893 /* ---------------------------------------------------------------------------
1894 Where are the roots that we know about?
1896 - all the threads on the runnable queue
1897 - all the threads on the blocked queue
1898 - all the threads currently executing a _ccall_GC
1899 - all the "main threads"
1901 ------------------------------------------------------------------------ */
1903 /* This has to be protected either by the scheduler monitor, or by the
1904 garbage collection monitor (probably the latter).
1908 static void GetRoots(void)
1915 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1916 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1917 run_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1918 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1919 run_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1921 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1922 blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1923 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1924 blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1925 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1926 ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
1933 if (run_queue_hd != END_TSO_QUEUE) {
1934 ASSERT(run_queue_tl != END_TSO_QUEUE);
1935 run_queue_hd = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1936 run_queue_tl = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1939 if (blocked_queue_hd != END_TSO_QUEUE) {
1940 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
1941 blocked_queue_hd = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1942 blocked_queue_tl = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1946 for (m = main_threads; m != NULL; m = m->link) {
1947 m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1949 if (suspended_ccalling_threads != END_TSO_QUEUE)
1950 suspended_ccalling_threads =
1951 (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
1953 #if defined(SMP) || defined(PAR) || defined(GRAN)
1958 /* -----------------------------------------------------------------------------
1961 This is the interface to the garbage collector from Haskell land.
1962 We provide this so that external C code can allocate and garbage
1963 collect when called from Haskell via _ccall_GC.
1965 It might be useful to provide an interface whereby the programmer
1966 can specify more roots (ToDo).
1968 This needs to be protected by the GC condition variable above. KH.
1969 -------------------------------------------------------------------------- */
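/* A hedged example of supplying extra roots from client C code
   (my_global and my_roots are hypothetical names):

     static StgClosure *my_global;
     static void my_roots(void) {
         my_global = (StgClosure *)MarkRoot(my_global);
     }
     ...
     performGCWithRoots(my_roots);
*/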
1971 void (*extra_roots)(void);
1976 GarbageCollect(GetRoots,rtsFalse);
1980 performMajorGC(void)
1982 GarbageCollect(GetRoots,rtsTrue);
1988 GetRoots(); /* the scheduler's roots */
1989 extra_roots(); /* the user's roots */
1993 performGCWithRoots(void (*get_roots)(void))
1995 extra_roots = get_roots;
1997 GarbageCollect(AllRoots,rtsFalse);
2000 /* -----------------------------------------------------------------------------
2003 If the thread has reached its maximum stack size, then raise the
2004 StackOverflow exception in the offending thread. Otherwise
2005 relocate the TSO into a larger chunk of memory and adjust its stack
2007 -------------------------------------------------------------------------- */
2010 threadStackOverflow(StgTSO *tso)
2012 nat new_stack_size, new_tso_size, diff, stack_words;
2016 IF_DEBUG(sanity,checkTSO(tso));
2017 if (tso->stack_size >= tso->max_stack_size) {
2020 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2021 tso->id, tso, tso->stack_size, tso->max_stack_size);
2022 /* If we're debugging, just print out the top of the stack */
2023 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2027 fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
2030 /* Send this thread the StackOverflow exception */
2031 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2036 /* Try to double the current stack size. If that takes us over the
2037 * maximum stack size for this thread, then use the maximum instead.
2038 * Finally round up so the TSO ends up as a whole number of blocks.
2040 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2041 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2042 TSO_STRUCT_SIZE)/sizeof(W_);
2043 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2044 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
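  /* Worked example (illustrative numbers only): a 1024-word stack with
   * an 8192-word maximum first doubles to 2048 words; adding the TSO
   * header and rounding up to whole blocks gives new_tso_size, and
   * subtracting the header again yields the final new_stack_size.
   */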
2046 IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2048 dest = (StgTSO *)allocate(new_tso_size);
2049 TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2051 /* copy the TSO block and the old stack into the new area */
2052 memcpy(dest,tso,TSO_STRUCT_SIZE);
2053 stack_words = tso->stack + tso->stack_size - tso->sp;
2054 new_sp = (P_)dest + new_tso_size - stack_words;
2055 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2057 /* relocate the stack pointers... */
2058 diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2059 dest->su = (StgUpdateFrame *) ((P_)dest->su + diff);
2061 dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso);
2062 dest->stack_size = new_stack_size;
2064 /* and relocate the update frame list */
2065 relocate_TSO(tso, dest);
2067 /* Mark the old TSO as relocated. We have to check for relocated
2068 * TSOs in the garbage collector and any primops that deal with TSOs.
2070 * It's important to set the sp and su values to just beyond the end
2071 * of the stack, so we don't attempt to scavenge any part of the
2074 tso->what_next = ThreadRelocated;
2076 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2077 tso->su = (StgUpdateFrame *)tso->sp;
2078 tso->why_blocked = NotBlocked;
2079 dest->mut_link = NULL;
2081 IF_PAR_DEBUG(verbose,
2082 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2083 tso->id, tso, tso->stack_size);
2084 /* If we're debugging, just print out the top of the stack */
2085 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2088 IF_DEBUG(sanity,checkTSO(tso));
2090 IF_DEBUG(scheduler,printTSO(dest));
2096 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collection Routines, Main scheduling code
2097 //@subsection Blocking Queue Routines
2099 /* ---------------------------------------------------------------------------
2100 Wake up a queue that was blocked on some resource.
2101 ------------------------------------------------------------------------ */
2103 /* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */
2107 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2112 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2114 /* write RESUME events to log file and
2115 update blocked and fetch time (depending on type of the orig closure) */
2116 if (RtsFlags.ParFlags.ParStats.Full) {
2117 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2118 GR_RESUME, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2119 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2121 switch (get_itbl(node)->type) {
2123 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2128 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2131 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2138 static StgBlockingQueueElement *
2139 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2142 PEs node_loc, tso_loc;
2144 node_loc = where_is(node); // should be lifted out of loop
2145 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2146 tso_loc = where_is((StgClosure *)tso);
2147 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2148 /* !fake_fetch => "TSO is on CurrentProc" is the same as IS_LOCAL_TO */
2149 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2150 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2151 // insertThread(tso, node_loc);
2152 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2154 tso, node, (rtsSpark*)NULL);
2155 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2158 } else { // TSO is remote (actually should be FMBQ)
2159 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2160 RtsFlags.GranFlags.Costs.gunblocktime +
2161 RtsFlags.GranFlags.Costs.latency;
2162 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2164 tso, node, (rtsSpark*)NULL);
2165 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2168 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2170 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2171 (node_loc==tso_loc ? "Local" : "Global"),
2172 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2173 tso->block_info.closure = NULL;
2174 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
2178 static StgBlockingQueueElement *
2179 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2181 StgBlockingQueueElement *next;
2183 switch (get_itbl(bqe)->type) {
2185 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2186 /* if it's a TSO just push it onto the run_queue */
2188 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2189 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
2191 unblockCount(bqe, node);
2192 /* reset blocking status after dumping event */
2193 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2197 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2199 bqe->link = PendingFetches;
2200 PendingFetches = bqe;
2204 /* can ignore this case in a non-debugging setup;
2205 see comments on RBHSave closures above */
2207 /* check that the closure is an RBHSave closure */
2208 ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
2209 get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
2210 get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
2214 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2215 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2219 // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2223 #else /* !GRAN && !PAR */
2225 unblockOneLocked(StgTSO *tso)
2229 ASSERT(get_itbl(tso)->type == TSO);
2230 ASSERT(tso->why_blocked != NotBlocked);
2231 tso->why_blocked = NotBlocked;
2233 PUSH_ON_RUN_QUEUE(tso);
2235 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
#if defined(GRAN) || defined(PAR)
inline StgBlockingQueueElement *
unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
{
  ACQUIRE_LOCK(&sched_mutex);
  bqe = unblockOneLocked(bqe, node);
  RELEASE_LOCK(&sched_mutex);
  return bqe;
}
#else
inline StgTSO *
unblockOne(StgTSO *tso)
{
  ACQUIRE_LOCK(&sched_mutex);
  tso = unblockOneLocked(tso);
  RELEASE_LOCK(&sched_mutex);
  return tso;
}
#endif
#if defined(GRAN)
void
awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  PEs node_loc;
  nat len = 0;

  IF_GRAN_DEBUG(bq,
                belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
                      node, CurrentProc, CurrentTime[CurrentProc],
                      CurrentTSO->id, CurrentTSO));

  node_loc = where_is(node);

  ASSERT(get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
         get_itbl(q)->type == CONSTR); // closure (type constructor)
  ASSERT(is_unique(node));

  /* FAKE FETCH: magically copy the node to the tso's proc;
     no Fetch necessary because in reality the node should not have been
     moved to the other PE in the first place
  */
  if (CurrentProc!=node_loc) {
    IF_GRAN_DEBUG(bq,
                  belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
                        node, node_loc, CurrentProc, CurrentTSO->id,
                        // CurrentTSO, where_is(CurrentTSO),
                        node->header.gran.procs));
    node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
    IF_GRAN_DEBUG(bq,
                  belch("## new bitmask of node %p is %#x",
                        node, node->header.gran.procs));
    if (RtsFlags.GranFlags.GranSimStats.Global) {
      globalGranStats.tot_fake_fetches++;
    }
  }

  bqe = q;
  // ToDo: check: ASSERT(CurrentProc==node_loc);
  while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
    /*
       bqe points to the current element in the queue
       next points to the next element in the queue
    */
    //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
    //tso_loc = where_is(tso);
    len++;
    bqe = unblockOneLocked(bqe, node);
  }

  /* if this is the BQ of an RBH, we have to put back the info ripped out of
     the closure to make room for the anchor of the BQ */
  if (bqe!=END_BQ_QUEUE) {
    StgInfoTable *info_ptr = get_itbl(bqe);

    ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
    ASSERT((info_ptr==&RBH_Save_0_info) ||
           (info_ptr==&RBH_Save_1_info) ||
           (info_ptr==&RBH_Save_2_info));

    /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
    ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
    ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];

    IF_GRAN_DEBUG(bq,
                  belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
                        node, info_type(node)));
  }

  /* statistics gathering */
  if (RtsFlags.GranFlags.GranSimStats.Global) {
    // globalGranStats.tot_bq_processing_time += bq_processing_time;
    globalGranStats.tot_bq_len += len;       // total length of all bqs awakened
    // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
    globalGranStats.tot_awbq++;              // total no. of bqs awakened
  }
  IF_GRAN_DEBUG(bq,
                fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
                        node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
}
#elif defined(PAR)
void
awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
{
  StgBlockingQueueElement *bqe, *next;

  ACQUIRE_LOCK(&sched_mutex);

  IF_PAR_DEBUG(verbose,
               belch("## AwBQ for node %p on [%x]: ",
                     node, mytid));

  ASSERT(get_itbl(q)->type == TSO ||
         get_itbl(q)->type == BLOCKED_FETCH ||
         get_itbl(q)->type == CONSTR);

  bqe = q;
  while (get_itbl(bqe)->type==TSO ||
         get_itbl(bqe)->type==BLOCKED_FETCH) {
    bqe = unblockOneLocked(bqe, node);
  }
  RELEASE_LOCK(&sched_mutex);
}
#else   /* !GRAN && !PAR */
void
awakenBlockedQueue(StgTSO *tso)
{
  ACQUIRE_LOCK(&sched_mutex);
  while (tso != END_TSO_QUEUE) {
    tso = unblockOneLocked(tso);
  }
  RELEASE_LOCK(&sched_mutex);
}
#endif
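
/* Usage sketch (hypothetical, simplified; not part of this file): this is
 * roughly what happens when an update overwrites a BLACKHOLE_BQ with an
 * indirection -- every thread that blocked on the black hole while the
 * thunk was under evaluation is released in one go.  The helper name and
 * the info-table plumbing below are illustrative only; the real work is
 * done by the update code (cf. UPD_IND), as noted in raiseAsync() below.
 */
static void
updateBlackHoleSketch(StgBlockingQueue *bh, StgClosure *value)
{
  StgTSO *blocked = (StgTSO *)bh->blocking_queue;  /* save the queue first */
  SET_INFO((StgClosure *)bh, &IND_info);           /* BLACKHOLE_BQ -> IND  */
  ((StgInd *)bh)->indirectee = value;
  awakenBlockedQueue(blocked);                     /* non-SMP variant shown */
}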
//@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
//@subsection Exception Handling Routines

/* ---------------------------------------------------------------------------
   Interrupt execution
   - usually called inside a signal handler so it mustn't do anything fancy.
   ------------------------------------------------------------------------ */

void
interruptStgRts(void)
{
    interrupted    = 1;
    context_switch = 1;
}
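
/* Usage sketch (hypothetical): a console-break handler need do no more
 * than call interruptStgRts(), since setting the two flags above is
 * async-signal-safe.  The handler name and signal choice are
 * illustrative, not part of this file. */
static void
breakHandlerSketch(int sig)
{
  (void)sig;           /* unused */
  interruptStgRts();   /* just sets interrupted + context_switch */
}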
/* -----------------------------------------------------------------------------
   Unblock a thread

   This is for use when we raise an exception in another thread, which
   may be blocked.

   This has nothing to do with the UnblockThread event in GranSim. -- HWL
   -------------------------------------------------------------------------- */
#if defined(GRAN) || defined(PAR)
/*
  NB: only the type of the blocking queue is different in GranSim and GUM;
      the operations on the queue elements are the same
      long live polymorphism!
*/
static void
unblockThread(StgTSO *tso)
{
  StgBlockingQueueElement *t, **last;

  ACQUIRE_LOCK(&sched_mutex);
  switch (tso->why_blocked) {

  case NotBlocked:
    RELEASE_LOCK(&sched_mutex); /* don't leak the lock on the early exit */
    return;  /* not blocked */

  case BlockedOnMVar:
    ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
    {
      StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
      StgMVar *mvar = (StgMVar *)(tso->block_info.closure);

      last = (StgBlockingQueueElement **)&mvar->head;
      for (t = (StgBlockingQueueElement *)mvar->head;
           t != END_BQ_QUEUE;
           last = &t->link, last_tso = t, t = t->link) {
        if (t == (StgBlockingQueueElement *)tso) {
          *last = (StgBlockingQueueElement *)tso->link;
          if (mvar->tail == tso) {
            mvar->tail = (StgTSO *)last_tso;
          }
          goto done;
        }
      }
      barf("unblockThread (MVAR): TSO not found");
    }

  case BlockedOnBlackHole:
    ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
    {
      StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);

      last = &bq->blocking_queue;
      for (t = bq->blocking_queue;
           t != END_BQ_QUEUE;
           last = &t->link, t = t->link) {
        if (t == (StgBlockingQueueElement *)tso) {
          *last = (StgBlockingQueueElement *)tso->link;
          goto done;
        }
      }
      barf("unblockThread (BLACKHOLE): TSO not found");
    }

  case BlockedOnException:
    {
      StgTSO *target = tso->block_info.tso;

      ASSERT(get_itbl(target)->type == TSO);
      ASSERT(target->blocked_exceptions != NULL);

      last = (StgBlockingQueueElement **)&target->blocked_exceptions;
      for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
           t != END_BQ_QUEUE;
           last = &t->link, t = t->link) {
        ASSERT(get_itbl(t)->type == TSO);
        if (t == (StgBlockingQueueElement *)tso) {
          *last = (StgBlockingQueueElement *)tso->link;
          goto done;
        }
      }
      barf("unblockThread (Exception): TSO not found");
    }

  case BlockedOnDelay:
  case BlockedOnRead:
  case BlockedOnWrite:
    {
      StgBlockingQueueElement *prev = NULL;
      for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
           prev = t, t = t->link) {
        if (t == (StgBlockingQueueElement *)tso) {
          if (prev == NULL) {
            blocked_queue_hd = (StgTSO *)t->link;
            if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
              blocked_queue_tl = END_TSO_QUEUE;
            }
          } else {
            prev->link = t->link;
            if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
              blocked_queue_tl = (StgTSO *)prev;
            }
          }
          goto done;
        }
      }
      barf("unblockThread (I/O): TSO not found");
    }

  default:
    barf("unblockThread");
  }

 done:
  tso->link = END_TSO_QUEUE;
  tso->why_blocked = NotBlocked;
  tso->block_info.closure = NULL;
  PUSH_ON_RUN_QUEUE(tso);
  RELEASE_LOCK(&sched_mutex);
}
#else  /* !GRAN && !PAR */
static void
unblockThread(StgTSO *tso)
{
  StgTSO *t, **last;

  ACQUIRE_LOCK(&sched_mutex);
  switch (tso->why_blocked) {
  case NotBlocked:
    RELEASE_LOCK(&sched_mutex); /* don't leak the lock on the early exit */
    return;  /* not blocked */

  case BlockedOnMVar:
    ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
    {
      StgTSO *last_tso = END_TSO_QUEUE;
      StgMVar *mvar = (StgMVar *)(tso->block_info.closure);

      last = &mvar->head;
      for (t = mvar->head; t != END_TSO_QUEUE;
           last = &t->link, last_tso = t, t = t->link) {
        if (t == tso) {
          *last = tso->link;
          if (mvar->tail == tso) {
            mvar->tail = last_tso;
          }
          goto done;
        }
      }
      barf("unblockThread (MVAR): TSO not found");
    }

  case BlockedOnBlackHole:
    ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
    {
      StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);

      last = &bq->blocking_queue;
      for (t = bq->blocking_queue; t != END_TSO_QUEUE;
           last = &t->link, t = t->link) {
        if (t == tso) {
          *last = tso->link;
          goto done;
        }
      }
      barf("unblockThread (BLACKHOLE): TSO not found");
    }

  case BlockedOnException:
    {
      StgTSO *target = tso->block_info.tso;

      ASSERT(get_itbl(target)->type == TSO);
      ASSERT(target->blocked_exceptions != NULL);

      last = &target->blocked_exceptions;
      for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
           last = &t->link, t = t->link) {
        ASSERT(get_itbl(t)->type == TSO);
        if (t == tso) {
          *last = tso->link;
          goto done;
        }
      }
      barf("unblockThread (Exception): TSO not found");
    }

  case BlockedOnDelay:
  case BlockedOnRead:
  case BlockedOnWrite:
    {
      StgTSO *prev = NULL;
      for (t = blocked_queue_hd; t != END_TSO_QUEUE;
           prev = t, t = t->link) {
        if (t == tso) {
          if (prev == NULL) {
            blocked_queue_hd = t->link;
            if (blocked_queue_tl == t) {
              blocked_queue_tl = END_TSO_QUEUE;
            }
          } else {
            prev->link = t->link;
            if (blocked_queue_tl == t) {
              blocked_queue_tl = prev;
            }
          }
          goto done;
        }
      }
      barf("unblockThread (I/O): TSO not found");
    }

  default:
    barf("unblockThread");
  }

 done:
  tso->link = END_TSO_QUEUE;
  tso->why_blocked = NotBlocked;
  tso->block_info.closure = NULL;
  PUSH_ON_RUN_QUEUE(tso);
  RELEASE_LOCK(&sched_mutex);
}
#endif
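
/* Illustrative sketch (not from the original source): the `**last` idiom
 * used in both unblockThread() variants above removes an element from a
 * singly-linked list with no special case for the head -- `last` always
 * points at whichever field currently holds the node under inspection.
 * The types and names here are hypothetical, for exposition only. */
typedef struct Node_ { struct Node_ *link; int id; } Node;

static void
removeNodeSketch(Node **head, Node *victim)
{
  Node *t, **last;
  for (last = head, t = *head; t != NULL; last = &t->link, t = t->link) {
    if (t == victim) {
      *last = t->link;   /* unlink without knowing our predecessor */
      return;
    }
  }
}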
/* -----------------------------------------------------------------------------
 * raiseAsync()
 *
 * The following function implements the magic for raising an
 * asynchronous exception in an existing thread.
 *
 * We first remove the thread from any queue on which it might be
 * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
 *
 * We strip the stack down to the innermost CATCH_FRAME, building
 * thunks in the heap for all the active computations, so they can
 * be restarted if necessary.  When we reach a CATCH_FRAME, we build
 * an application of the handler to the exception, and push it on
 * the top of the stack.
 *
 * How exactly do we save all the active computations?  We create an
 * AP_UPD for every UpdateFrame on the stack.  Entering one of these
 * AP_UPDs pushes everything from the corresponding update frame
 * upwards onto the stack.  (Actually, it pushes everything up to the
 * next update frame plus a pointer to the next AP_UPD object.
 * Entering the next AP_UPD object pushes more onto the stack until we
 * reach the last AP_UPD object - at which point the stack should look
 * exactly as it did when we killed the TSO and we can continue
 * execution by entering the closure on top of the stack.)
 *
 * We can also kill a thread entirely - this happens if either (a) the
 * exception passed to raiseAsync is NULL, or (b) there's no
 * CATCH_FRAME on the stack.  In either case, we strip the entire
 * stack and replace the thread with a zombie.
 *
 * -------------------------------------------------------------------------- */
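
/* Illustrative sketch (not from the original source): suppose the stack
 * carries two update frames when raiseAsync() hits it (stack grows
 * downwards, Sp at the right):
 *
 *     ... | UpdateFrame u2 | frames_2 | UpdateFrame u1 | frames_1 | <- Sp
 *
 * After the walk, u1's updatee is an indirection to AP_UPD#1 (capturing
 * frames_1), and u2's updatee is an indirection to AP_UPD#2 (capturing
 * frames_2 plus a pointer to AP_UPD#1).  Demanding u2's result enters
 * AP_UPD#2, which pushes frames_2 and AP_UPD#1 back onto the stack;
 * entering AP_UPD#1 then pushes frames_1, reconstructing the original
 * stack lazily, exactly as the comment above describes. */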
void
deleteThread(StgTSO *tso)
{
  raiseAsync(tso,NULL);
}

void
raiseAsync(StgTSO *tso, StgClosure *exception)
{
  StgUpdateFrame* su = tso->su;
  StgPtr          sp = tso->sp;

  /* Thread already dead? */
  if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
    return;
  }

  IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));

  /* Remove it from any blocking queues */
  unblockThread(tso);

  /* The stack freezing code assumes there's a closure pointer on
   * the top of the stack.  This isn't always the case with compiled
   * code, so we have to push a dummy closure on the top which just
   * returns to the next return address on the stack.
   */
  if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
    *(--sp) = (W_)&dummy_ret_closure;
  }

  while (1) {
    int words = ((P_)su - (P_)sp) - 1;
    nat i;
    StgAP_UPD * ap;

    /* If we find a CATCH_FRAME, and we've got an exception to raise,
     * then build PAP(handler,exception,realworld#), and leave it on
     * top of the stack ready to enter.
     */
    if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
      StgCatchFrame *cf = (StgCatchFrame *)su;
      /* we've got an exception to raise, so let's pass it to the
       * handler in this frame.
       */
      ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
      TICK_ALLOC_UPD_PAP(3,0);
      SET_HDR(ap,&PAP_info,cf->header.prof.ccs);

      ap->n_args = 2;
      ap->fun = cf->handler;       /* :: Exception -> IO a */
      ap->payload[0] = (P_)exception;
      ap->payload[1] = ARG_TAG(0); /* realworld token */

      /* throw away the stack from Sp up to and including the
       * CATCH_FRAME.
       */
      sp = (P_)su + sizeofW(StgCatchFrame) - 1;
      tso->su = cf->link;

      /* Restore the blocked/unblocked state for asynchronous exceptions
       * at the CATCH_FRAME.
       *
       * If exceptions were unblocked at the catch, arrange that they
       * are unblocked again after executing the handler by pushing an
       * unblockAsyncExceptions_ret stack frame.
       */
      if (!cf->exceptions_blocked) {
        *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
      }

      /* Ensure that async exceptions are blocked when running the handler.
       */
      if (tso->blocked_exceptions == NULL) {
        tso->blocked_exceptions = END_TSO_QUEUE;
      }

      /* Put the newly-built PAP on top of the stack, ready to execute
       * when the thread restarts.
       */
      sp[0] = (W_)ap;
      tso->sp = sp;
      tso->what_next = ThreadEnterGHC;
      return;
    }

    /* First build an AP_UPD consisting of the stack chunk above the
     * current update frame, with the top word on the stack as the
     * fun field.
     */
    ap = (StgAP_UPD *)allocate(AP_sizeW(words));
    ap->n_args = words;
    ap->fun    = (StgClosure *)sp[0];
    sp++;
    for(i=0; i < (nat)words; ++i) {
      ap->payload[i] = (P_)*sp++;
    }

    switch (get_itbl(su)->type) {

    case UPDATE_FRAME:
      SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */);
      TICK_ALLOC_UP_THK(words+1,0);
      IF_DEBUG(scheduler,
               fprintf(stderr, "scheduler: Updating ");
               printPtr((P_)su->updatee);
               fprintf(stderr, " with ");
               printObj((StgClosure *)ap);
               );
      /* Replace the updatee with an indirection - happily
       * this will also wake up any threads currently
       * waiting on the result.
       */
      UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
      su = su->link;
      sp += sizeofW(StgUpdateFrame) -1;
      sp[0] = (W_)ap; /* push onto stack */
      break;

    case CATCH_FRAME:
      {
        StgCatchFrame *cf = (StgCatchFrame *)su;
        StgClosure* o;

        /* We want a PAP, not an AP_UPD.  Fortunately, the
         * layout's the same.
         */
        SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
        TICK_ALLOC_UPD_PAP(words+1,0);

        /* now build o = FUN(catch,ap,handler) */
        o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
        TICK_ALLOC_FUN(2,0);
        SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
        o->payload[0] = (StgClosure *)ap;
        o->payload[1] = cf->handler;

        IF_DEBUG(scheduler,
                 fprintf(stderr, "scheduler: Built ");
                 printObj((StgClosure *)o);
                 );

        /* pop the old handler and put o on the stack */
        su = cf->link;
        sp += sizeofW(StgCatchFrame) - 1;
        sp[0] = (W_)o;
        break;
      }

    case SEQ_FRAME:
      {
        StgSeqFrame *sf = (StgSeqFrame *)su;
        StgClosure* o;

        SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
        TICK_ALLOC_UPD_PAP(words+1,0);

        /* now build o = FUN(seq,ap) */
        o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
        TICK_ALLOC_SE_THK(1,0);
        SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
        o->payload[0] = (StgClosure *)ap;

        IF_DEBUG(scheduler,
                 fprintf(stderr, "scheduler: Built ");
                 printObj((StgClosure *)o);
                 );

        /* pop the old handler and put o on the stack */
        su = sf->link;
        sp += sizeofW(StgSeqFrame) - 1;
        sp[0] = (W_)o;
        break;
      }

    case STOP_FRAME:
      /* We've stripped the entire stack, the thread is now dead. */
      sp += sizeofW(StgStopFrame) - 1;
      sp[0] = (W_)exception; /* save the exception */
      tso->what_next = ThreadKilled;
      tso->su = (StgUpdateFrame *)(sp+1);
      tso->sp = sp;
      return;

    default:
      barf("raiseAsync");
    }
  }
  barf("raiseAsync"); /* not reached */
}
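
/* Usage sketch (hypothetical, not in this excerpt): at shutdown the
 * scheduler can zombify every live thread by walking all_threads and
 * calling deleteThread() on each one, i.e. raiseAsync(t, NULL).  The
 * global_link must be read before the TSO's stack is stripped. */
static void
deleteAllThreadsSketch(void)
{
  StgTSO *t, *next;
  for (t = all_threads; t != END_TSO_QUEUE; t = next) {
    next = t->global_link;  /* raiseAsync may touch queues, so read first */
    deleteThread(t);        /* strip the stack, leave a zombie */
  }
}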
/* -----------------------------------------------------------------------------
   resurrectThreads is called after garbage collection on the list of
   threads found to be garbage.  Each of these threads will be woken
   up and sent a signal: BlockedOnDeadMVar if the thread was blocked
   on an MVar, or NonTermination if the thread was blocked on a Black
   Hole.
   -------------------------------------------------------------------------- */

void
resurrectThreads( StgTSO *threads )
{
  StgTSO *tso, *next;

  for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
    next = tso->global_link;
    tso->global_link = all_threads;
    all_threads = tso;
    IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));

    switch (tso->why_blocked) {
    case BlockedOnMVar:
    case BlockedOnException:
      raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
      break;
    case BlockedOnBlackHole:
      raiseAsync(tso,(StgClosure *)NonTermination_closure);
      break;
    case NotBlocked:
      /* This might happen if the thread was blocked on a black hole
       * belonging to a thread that we've just woken up (raiseAsync
       * can wake up threads, remember...).
       */
      continue;
    default:
      barf("resurrectThreads: thread blocked in a strange way");
    }
  }
}
//@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
//@subsection Debugging Routines

/* -----------------------------------------------------------------------------
   Debugging: why is a thread blocked
   -------------------------------------------------------------------------- */
static void
printThreadBlockage(StgTSO *tso)
{
  switch (tso->why_blocked) {
  case BlockedOnRead:
    fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
    break;
  case BlockedOnWrite:
    fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
    break;
  case BlockedOnDelay:
#if defined(HAVE_SETITIMER) || defined(mingw32_TARGET_OS)
    fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
#else
    fprintf(stderr,"blocked on delay of %d ms",
            tso->block_info.target - getourtimeofday());
#endif
    break;
  case BlockedOnMVar:
    fprintf(stderr,"blocked on an MVar");
    break;
  case BlockedOnException:
    fprintf(stderr,"blocked on delivering an exception to thread %d",
            tso->block_info.tso->id);
    break;
  case BlockedOnBlackHole:
    fprintf(stderr,"blocked on a black hole");
    break;
  case NotBlocked:
    fprintf(stderr,"not blocked");
    break;
#if defined(PAR)
  case BlockedOnGA:
    fprintf(stderr,"blocked on global address; local FM_BQ is %p (%s)",
            tso->block_info.closure, info_type(tso->block_info.closure));
    break;
  case BlockedOnGA_NoSend:
    fprintf(stderr,"blocked on global address (no send); local FM_BQ is %p (%s)",
            tso->block_info.closure, info_type(tso->block_info.closure));
    break;
#endif
  default:
    barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
         tso->why_blocked, tso->id, tso);
  }
}
void
printThreadStatus(StgTSO *tso)
{
  switch (tso->what_next) {
  case ThreadKilled:
    fprintf(stderr,"has been killed");
    break;
  case ThreadComplete:
    fprintf(stderr,"has completed");
    break;
  default:
    printThreadBlockage(tso);
  }
}

void
printAllThreads(void)
{
  StgTSO *t;

  sched_belch("all threads:");
  for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
    fprintf(stderr, "\tthread %d is ", t->id);
    printThreadStatus(t);
    fprintf(stderr,"\n");
  }
}
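
/* Usage note: these routines are handy from inside a debugger, e.g.
 *   (gdb) call printAllThreads()
 * dumps one status line per live thread to stderr. */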
/*
   Print a whole blocking queue attached to node (debugging only).
*/
#if defined(PAR)
void
print_bq (StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  rtsBool end;

  fprintf(stderr,"## BQ of closure %p (%s): ",
          node, info_type(node));

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
         get_itbl(node)->type == FETCH_ME_BQ ||
         get_itbl(node)->type == RBH);

  ASSERT(node!=(StgClosure*)NULL); // sanity check
  /*
    NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
  */
  for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
       bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);             // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||
           get_itbl(bqe)->type == BLOCKED_FETCH ||
           get_itbl(bqe)->type == CONSTR);
    /* only BQs of an RBH end with an RBH_Save closure */
    ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

    switch (get_itbl(bqe)->type) {
    case TSO:
      fprintf(stderr," TSO %d (%p),",
              ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
      break;
    case BLOCKED_FETCH:
      fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
              ((StgBlockedFetch *)bqe)->node,
              ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
              ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
              ((StgBlockedFetch *)bqe)->ga.weight);
      break;
    case CONSTR:
      fprintf(stderr," %s (IP %p),",
              (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
               get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
               get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
               "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue of %p (%s)",
           info_type(bqe), node, info_type(node));
    }
  }
  fputc('\n', stderr);
}
# elif defined(GRAN)
void
print_bq (StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  PEs node_loc, tso_loc;
  rtsBool end;

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
         get_itbl(node)->type == FETCH_ME_BQ ||
         get_itbl(node)->type == RBH);

  ASSERT(node!=(StgClosure*)NULL); // sanity check
  node_loc = where_is(node);

  fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
          node, info_type(node), node_loc);

  /*
    NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
  */
  for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
       bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);   // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||
           get_itbl(bqe)->type == CONSTR);
    /* only BQs of an RBH end with an RBH_Save closure */
    ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

    tso_loc = where_is((StgClosure *)bqe);
    switch (get_itbl(bqe)->type) {
    case TSO:
      fprintf(stderr," TSO %d (%p) on [PE %d],",
              ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
      break;
    case CONSTR:
      fprintf(stderr," %s (IP %p),",
              (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
               get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
               get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
               "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue of %p (%s)",
           info_type((StgClosure *)bqe), node, info_type(node));
    }
  }
  fputc('\n', stderr);
}
#else   /* !GRAN && !PAR */
/* Nice and easy: only TSOs on the blocking queue */
void
print_bq (StgClosure *node)
{
  StgTSO *tso;

  ASSERT(node!=(StgClosure*)NULL); // sanity check
  for (tso = ((StgBlockingQueue*)node)->blocking_queue;
       tso != END_TSO_QUEUE;
       tso = tso->link) {
    ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
    ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
    fprintf(stderr," TSO %d (%p),", tso->id, tso);
  }
  fputc('\n', stderr);
}
#endif
  for (i=0, tso=run_queue_hd;
       tso != END_TSO_QUEUE;
       i++, tso=tso->link)
void
sched_belch(char *s, ...)
{
  va_list ap;
  va_start(ap,s);
#ifdef SMP
  fprintf(stderr, "scheduler (task %ld): ", pthread_self());
#else
  fprintf(stderr, "scheduler: ");
#endif
  vfprintf(stderr, s, ap);
  fprintf(stderr, "\n");
  va_end(ap);
}
//@node Index, , Debugging Routines, Main scheduling code
//@subsection Index

//@index
//* MainRegTable::  @cindex\s-+MainRegTable
//* StgMainThread::  @cindex\s-+StgMainThread
//* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
//* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
//* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
//* context_switch::  @cindex\s-+context_switch
//* createThread::  @cindex\s-+createThread
//* free_capabilities::  @cindex\s-+free_capabilities
//* gc_pending_cond::  @cindex\s-+gc_pending_cond
//* initScheduler::  @cindex\s-+initScheduler
//* interrupted::  @cindex\s-+interrupted
//* n_free_capabilities::  @cindex\s-+n_free_capabilities
//* next_thread_id::  @cindex\s-+next_thread_id
//* print_bq::  @cindex\s-+print_bq
//* run_queue_hd::  @cindex\s-+run_queue_hd
//* run_queue_tl::  @cindex\s-+run_queue_tl
//* sched_mutex::  @cindex\s-+sched_mutex
//* schedule::  @cindex\s-+schedule
//* take_off_run_queue::  @cindex\s-+take_off_run_queue
//* task_ids::  @cindex\s-+task_ids
//* term_mutex::  @cindex\s-+term_mutex
//* thread_ready_cond::  @cindex\s-+thread_ready_cond