1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.69 2000/04/26 09:44:28 simonmar Exp $
4 * (c) The GHC Team, 1998-2000
8 * The main scheduling code in GranSim is quite different from that in std
9 * (concurrent) Haskell: while concurrent Haskell just iterates over the
10 * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11 * over the events in the global event queue. -- HWL
12 * --------------------------------------------------------------------------*/
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
17 /* Version with scheduler monitor support for SMPs.
19 This design provides a high-level API to create and schedule threads etc.
20 as documented in the SMP design document.
22 It uses a monitor design controlled by a single mutex to exercise control
23 over accesses to shared data structures, and builds on the Posix threads
26 The majority of state is shared. In order to keep essential per-task state,
27 there is a Capability structure, which contains all the information
28 needed to run a thread: its STG registers, a pointer to its TSO, a
29 nursery etc. During STG execution, a pointer to the capability is
30 kept in a register (BaseReg).
32 In a non-SMP build, there is one global capability, namely MainRegTable.
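   For illustration only, a sketch (using the free-list representation that
   appears later in this file) of how a task grabs and returns a capability
   while holding sched_mutex:

       cap = free_capabilities;          // take one off the free list
       free_capabilities = cap->link;
       n_free_capabilities--;
       ... run a thread using cap ...
       cap->link = free_capabilities;    // put it back when finished
       free_capabilities = cap;
       n_free_capabilities++;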
39 //* Variables and Data structures::
40 //* Main scheduling loop::
41 //* Suspend and Resume::
43 //* Garbage Collection Routines::
44 //* Blocking Queue Routines::
45 //* Exception Handling Routines::
46 //* Debugging Routines::
50 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
51 //@subsection Includes
59 #include "StgStartup.h"
63 #include "StgMiscClosures.h"
65 #include "Evaluator.h"
66 #include "Exception.h"
74 #if defined(GRAN) || defined(PAR)
75 # include "GranSimRts.h"
77 # include "ParallelRts.h"
78 # include "Parallel.h"
79 # include "ParallelDebug.h"
87 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
88 //@subsection Variables and Data structures
92 * These are the threads which clients have requested that we run.
94 * In an SMP build, we might have several concurrent clients all
95 * waiting for results, and each one will wait on a condition variable
96 * until the result is available.
98 * In non-SMP, clients are strictly nested: the first client calls
99 * into the RTS, which might call out again to C with a _ccall_GC, and
100 * eventually re-enter the RTS.
102 * Main thread information is kept in a linked list:
104 //@cindex StgMainThread
105 typedef struct StgMainThread_ {
107 SchedulerStatus stat;
110 pthread_cond_t wakeup;
112 struct StgMainThread_ *link;
115 /* Main thread queue.
116 * Locks required: sched_mutex.
118 static StgMainThread *main_threads;
121 * Locks required: sched_mutex.
125 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
126 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
129 In GranSim we have a runnable and a blocked queue for each processor.
130 In order to minimise code changes, new arrays run_queue_hds/tls
131 are created. run_queue_hd is then a shortcut (macro) for
132 run_queue_hds[CurrentProc] (see GranSim.h).
135 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
136 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
137 StgTSO *ccalling_threadss[MAX_PROC];
138 /* We use the same global list of threads (all_threads) in GranSim as in
139 the std RTS (i.e. we are cheating). However, we don't use this list in
140 the GranSim specific code at the moment (so we are only potentially
145 StgTSO *run_queue_hd, *run_queue_tl;
146 StgTSO *blocked_queue_hd, *blocked_queue_tl;
150 /* Linked list of all threads.
151 * Used for detecting garbage collected threads.
155 /* Threads suspended in _ccall_GC.
157 static StgTSO *suspended_ccalling_threads;
159 static void GetRoots(void);
160 static StgTSO *threadStackOverflow(StgTSO *tso);
162 /* KH: The following two flags are shared memory locations. There is no need
163 to lock them, since they are only unset at the end of a scheduler
167 /* flag set by signal handler to precipitate a context switch */
168 //@cindex context_switch
171 /* if this flag is set as well, give up execution */
172 //@cindex interrupted
175 /* Next thread ID to allocate.
176 * Locks required: sched_mutex
178 //@cindex next_thread_id
179 StgThreadID next_thread_id = 1;
182 * Pointers to the state of the current thread.
183 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
184 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
187 /* The smallest stack size that makes any sense is:
188 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
189 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
190 * + 1 (the realworld token for an IO thread)
191 * + 1 (the closure to enter)
193 * A thread with this stack will bomb immediately with a stack
194 * overflow, which will increase its stack size.
197 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
199 /* Free capability list.
200 * Locks required: sched_mutex.
203 //@cindex free_capabilities
204 //@cindex n_free_capabilities
205 Capability *free_capabilities; /* Available capabilities for running threads */
206 nat n_free_capabilities; /* total number of available capabilities */
208 //@cindex MainRegTable
209 Capability MainRegTable; /* for non-SMP, we have one global capability */
218 /* All our current task ids, saved in case we need to kill them later.
225 void addToBlockedQueue ( StgTSO *tso );
227 static void schedule ( void );
228 void interruptStgRts ( void );
230 static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri );
232 static StgTSO * createThread_ ( nat size, rtsBool have_lock );
236 static void sched_belch(char *s, ...);
240 //@cindex sched_mutex
242 //@cindex thread_ready_cond
243 //@cindex gc_pending_cond
244 pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;
245 pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER;
246 pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER;
247 pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER;
254 rtsTime TimeOfLastYield;
258 char *whatNext_strs[] = {
266 char *threadReturnCode_strs[] = {
267 "HeapOverflow", /* might also be StackOverflow */
276 * The thread state for the main thread.
277 // ToDo: check whether this is still needed
281 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
282 //@subsection Main scheduling loop
284 /* ---------------------------------------------------------------------------
285 Main scheduling loop.
287 We use round-robin scheduling, each thread returning to the
288 scheduler loop when one of these conditions is detected:
291 * timer expires (thread yields)
296 Locking notes: we acquire the scheduler lock once at the beginning
297 of the scheduler loop, and release it when
299 * running a thread, or
300 * waiting for work, or
301 * waiting for a GC to complete.
304 In a GranSim setup this loop iterates over the global event queue;
305 that queue determines what to do next. Therefore, it's more
306 complicated than either the concurrent or the parallel (GUM) setup.
310 GUM iterates over incoming messages.
311 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
312 and sends out a fish whenever it has nothing to do; in-between
313 doing the actual reductions (shared code below) it processes the
314 incoming messages and deals with delayed operations
315 (see PendingFetches).
316 This is not the ugliest code you could imagine, but it's bloody close.
318 ------------------------------------------------------------------------ */
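/* In outline, one iteration of the standard (non-GRAN, non-PAR) loop looks
 * roughly like this (a sketch only; the code below has the details):
 *
 *     ACQUIRE_LOCK(&sched_mutex);
 *     ... wake up finished main threads, handle signals, detect deadlock,
 *         wait until the run queue is non-empty ...
 *     t = POP_RUN_QUEUE();
 *     RELEASE_LOCK(&sched_mutex);
 *     ret = StgRun(...);               // run the thread
 *     ACQUIRE_LOCK(&sched_mutex);
 *     switch (ret) { ... }             // HeapOverflow, StackOverflow,
 *                                      // ThreadYielding, ThreadBlocked
 *                                      // or ThreadFinished
 */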
325 StgThreadReturnCode ret;
334 rtsBool was_interrupted = rtsFalse;
336 ACQUIRE_LOCK(&sched_mutex);
340 /* set up first event to get things going */
341 /* ToDo: assign costs for system setup and init MainTSO ! */
342 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
344 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
347 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
348 G_TSO(CurrentTSO, 5));
350 if (RtsFlags.GranFlags.Light) {
351 /* Save current time; GranSim Light only */
352 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
355 event = get_next_event();
357 while (event!=(rtsEvent*)NULL) {
358 /* Choose the processor with the next event */
359 CurrentProc = event->proc;
360 CurrentTSO = event->tso;
364 while (!GlobalStopPending) { /* GlobalStopPending set in par_exit */
372 IF_DEBUG(scheduler, printAllThreads());
374 /* If we're interrupted (the user pressed ^C, or some other
375 * termination condition occurred), kill all the currently running
379 IF_DEBUG(scheduler, sched_belch("interrupted"));
380 for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
383 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
386 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
387 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
388 interrupted = rtsFalse;
389 was_interrupted = rtsTrue;
392 /* Go through the list of main threads and wake up any
393 * clients whose computations have finished. ToDo: this
394 * should be done more efficiently without a linear scan
395 * of the main threads list, somehow...
399 StgMainThread *m, **prev;
400 prev = &main_threads;
401 for (m = main_threads; m != NULL; m = m->link) {
402 switch (m->tso->what_next) {
405 *(m->ret) = (StgClosure *)m->tso->sp[0];
409 pthread_cond_broadcast(&m->wakeup);
413 if (was_interrupted) {
414 m->stat = Interrupted;
418 pthread_cond_broadcast(&m->wakeup);
428 /* in GUM do this only on the Main PE */
431 /* If our main thread has finished or been killed, return.
434 StgMainThread *m = main_threads;
435 if (m->tso->what_next == ThreadComplete
436 || m->tso->what_next == ThreadKilled) {
437 main_threads = main_threads->link;
438 if (m->tso->what_next == ThreadComplete) {
439 /* we finished successfully, fill in the return value */
440 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
444 if (was_interrupted) {
445 m->stat = Interrupted;
455 /* Top up the run queue from our spark pool. We try to make the
456 * number of threads in the run queue equal to the number of
461 nat n = n_free_capabilities;
462 StgTSO *tso = run_queue_hd;
464 /* Count the run queue */
465 while (n > 0 && tso != END_TSO_QUEUE) {
474 break; /* no more sparks in the pool */
476 /* I'd prefer this to be done in activateSpark -- HWL */
477 /* tricky - it needs to hold the scheduler lock and
478 * not try to re-acquire it -- SDM */
480 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
481 pushClosure(tso,spark);
482 PUSH_ON_RUN_QUEUE(tso);
484 advisory_thread_count++;
488 sched_belch("turning spark of closure %p into a thread",
489 (StgClosure *)spark));
492 /* We need to wake up the other tasks if we just created some
495 if (n_free_capabilities - n > 1) {
496 pthread_cond_signal(&thread_ready_cond);
501 /* Check whether any waiting threads need to be woken up. If the
502 * run queue is empty, and there are no other tasks running, we
503 * can wait indefinitely for something to happen.
504 * ToDo: what if another client comes along & requests another
507 if (blocked_queue_hd != END_TSO_QUEUE) {
509 (run_queue_hd == END_TSO_QUEUE)
511 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
516 /* check for signals each time around the scheduler */
518 if (signals_pending()) {
519 start_signal_handlers();
523 /* Detect deadlock: when we have no threads to run, there are
524 * no threads waiting on I/O or sleeping, and all the other
525 * tasks are waiting for work, we must have a deadlock. Inform
526 * all the main threads.
529 if (blocked_queue_hd == END_TSO_QUEUE
530 && run_queue_hd == END_TSO_QUEUE
531 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
534 for (m = main_threads; m != NULL; m = m->link) {
537 pthread_cond_broadcast(&m->wakeup);
542 if (blocked_queue_hd == END_TSO_QUEUE
543 && run_queue_hd == END_TSO_QUEUE) {
544 StgMainThread *m = main_threads;
547 main_threads = m->link;
553 /* If there's a GC pending, don't do anything until it has
557 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
558 pthread_cond_wait(&gc_pending_cond, &sched_mutex);
561 /* block until we've got a thread on the run queue and a free
564 while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
565 IF_DEBUG(scheduler, sched_belch("waiting for work"));
566 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
567 IF_DEBUG(scheduler, sched_belch("work now available"));
573 if (RtsFlags.GranFlags.Light)
574 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
576 /* adjust time based on time-stamp */
577 if (event->time > CurrentTime[CurrentProc] &&
578 event->evttype != ContinueThread)
579 CurrentTime[CurrentProc] = event->time;
581 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
582 if (!RtsFlags.GranFlags.Light)
585 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"))
587 /* main event dispatcher in GranSim */
588 switch (event->evttype) {
589 /* Should just be continuing execution */
591 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
592 /* ToDo: check assertion
593 ASSERT(run_queue_hd != (StgTSO*)NULL &&
594 run_queue_hd != END_TSO_QUEUE);
596 /* Ignore ContinueThreads for fetching threads (with synchronous communication) */
597 if (!RtsFlags.GranFlags.DoAsyncFetch &&
598 procStatus[CurrentProc]==Fetching) {
599 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
600 CurrentTSO->id, CurrentTSO, CurrentProc);
603 /* Ignore ContinueThreads for completed threads */
604 if (CurrentTSO->what_next == ThreadComplete) {
605 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
606 CurrentTSO->id, CurrentTSO, CurrentProc);
609 /* Ignore ContinueThreads for threads that are being migrated */
610 if (PROCS(CurrentTSO)==Nowhere) {
611 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
612 CurrentTSO->id, CurrentTSO, CurrentProc);
615 /* The thread should be at the beginning of the run queue */
616 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
617 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
618 CurrentTSO->id, CurrentTSO, CurrentProc);
619 break; // run the thread anyway
622 new_event(proc, proc, CurrentTime[proc],
624 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
626 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
627 break; // now actually run the thread; DaH Qu'vam yImuHbej
630 do_the_fetchnode(event);
631 goto next_thread; /* handle next event in event queue */
634 do_the_globalblock(event);
635 goto next_thread; /* handle next event in event queue */
638 do_the_fetchreply(event);
639 goto next_thread; /* handle next event in event queue */
641 case UnblockThread: /* Move from the blocked queue to the tail of */
642 do_the_unblock(event);
643 goto next_thread; /* handle next event in event queue */
645 case ResumeThread: /* Move from the blocked queue to the tail of */
646 /* the runnable queue ( i.e. Qu' SImqa'lu') */
647 event->tso->gran.blocktime +=
648 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
649 do_the_startthread(event);
650 goto next_thread; /* handle next event in event queue */
653 do_the_startthread(event);
654 goto next_thread; /* handle next event in event queue */
657 do_the_movethread(event);
658 goto next_thread; /* handle next event in event queue */
661 do_the_movespark(event);
662 goto next_thread; /* handle next event in event queue */
665 do_the_findwork(event);
666 goto next_thread; /* handle next event in event queue */
669 barf("Illegal event type %u\n", event->evttype);
672 /* This point was scheduler_loop in the old RTS */
674 IF_DEBUG(gran, belch("GRAN: after main switch"));
676 TimeOfLastEvent = CurrentTime[CurrentProc];
677 TimeOfNextEvent = get_time_of_next_event();
678 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
679 // CurrentTSO = ThreadQueueHd;
681 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
684 if (RtsFlags.GranFlags.Light)
685 GranSimLight_leave_system(event, &ActiveTSO);
687 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
690 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
692 /* in a GranSim setup the TSO stays on the run queue */
694 /* Take a thread from the run queue. */
695 t = POP_RUN_QUEUE(); // take_off_run_queue(t);
698 fprintf(stderr, "GRAN: About to run current thread, which is\n");
701 context_switch = 0; // turned on via GranYield, checking events and time slice
704 DumpGranEvent(GR_SCHEDULE, t));
706 procStatus[CurrentProc] = Busy;
710 if (PendingFetches != END_BF_QUEUE) {
714 /* ToDo: perhaps merge with spark activation above */
715 /* check whether we have local work and send requests if we have none */
716 if (run_queue_hd == END_TSO_QUEUE) { /* no runnable threads */
717 /* :-[ no local threads => look out for local sparks */
718 /* the spark pool for the current PE */
719 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
720 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
721 pool->hd < pool->tl) {
723 * ToDo: add GC code check that we really have enough heap afterwards!!
725 * If we're here (no runnable threads) and we have pending
726 * sparks, we must have a space problem. Get enough space
727 * to turn one of those pending sparks into a
731 spark = findSpark(); /* get a spark */
732 if (spark != (rtsSpark) NULL) {
733 tso = activateSpark(spark); /* turn the spark into a thread */
734 IF_PAR_DEBUG(schedule,
735 belch("==== schedule: Created TSO %d (%p); %d threads active",
736 tso->id, tso, advisory_thread_count));
738 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
739 belch("==^^ failed to activate spark");
741 } /* otherwise fall through & pick-up new tso */
743 IF_PAR_DEBUG(verbose,
744 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
745 spark_queue_len(pool)));
749 /* =8-[ no local sparks => look for work on other PEs */
752 * We really have absolutely no work. Send out a fish
753 * (there may be some out there already), and wait for
754 * something to arrive. We clearly can't run any threads
755 * until a SCHEDULE or RESUME arrives, and so that's what
756 * we're hoping to see. (Of course, we still have to
757 * respond to other types of messages.)
760 outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
761 // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
762 /* fishing set in sendFish, processFish;
763 avoid flooding system with fishes via delay */
765 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
773 } else if (PacketsWaiting()) { /* Look for incoming messages */
777 /* Now we are sure that we have some work available */
778 ASSERT(run_queue_hd != END_TSO_QUEUE);
779 /* Take a thread from the run queue, if we have work */
780 t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
782 /* ToDo: write something to the log-file
783 if (RTSflags.ParFlags.granSimStats && !sameThread)
784 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
788 /* the spark pool for the current PE */
789 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
791 IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; base=%x, lim=%x)",
792 spark_queue_len(pool),
794 pool->hd, pool->tl, pool->base, pool->lim));
796 IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)",
797 run_queue_len(), CURRENT_PROC,
798 run_queue_hd, run_queue_tl));
803 we are running a different TSO, so write a schedule event to the log file
804 NB: If we use fair scheduling we also have to write a deschedule
805 event for LastTSO; with unfair scheduling we know that the
806 previous tso has blocked whenever we switch to another tso, so
807 we don't need it in GUM for now
809 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
810 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
814 #else /* !GRAN && !PAR */
816 /* grab a thread from the run queue
818 ASSERT(run_queue_hd != END_TSO_QUEUE);
820 IF_DEBUG(sanity,checkTSO(t));
827 cap = free_capabilities;
828 free_capabilities = cap->link;
829 n_free_capabilities--;
834 cap->rCurrentTSO = t;
836 /* set the context_switch flag
838 if (run_queue_hd == END_TSO_QUEUE)
843 RELEASE_LOCK(&sched_mutex);
845 IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...",
846 t->id, t, whatNext_strs[t->what_next]));
848 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
849 /* Run the current thread
851 switch (cap->rCurrentTSO->what_next) {
854 /* Thread already finished, return to scheduler. */
855 ret = ThreadFinished;
858 ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
861 ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
863 case ThreadEnterHugs:
867 IF_DEBUG(scheduler,sched_belch("entering Hugs"));
868 c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
869 cap->rCurrentTSO->sp += 1;
874 barf("Panic: entered a BCO but no bytecode interpreter in this build");
877 barf("schedule: invalid what_next field");
879 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
881 /* Costs for the scheduler are assigned to CCS_SYSTEM */
886 ACQUIRE_LOCK(&sched_mutex);
889 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
890 #elif !defined(GRAN) && !defined(PAR)
891 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
893 t = cap->rCurrentTSO;
896 /* HACK 675: if the last thread didn't yield, make sure to print a
897 SCHEDULE event to the log file when StgRunning the next thread, even
898 if it is the same one as before */
899 LastTSO = t; //(ret == ThreadBlocked) ? END_TSO_QUEUE : t;
900 TimeOfLastYield = CURRENT_TIME;
905 /* make all the running tasks block on a condition variable,
906 * maybe set context_switch and wait till they all pile in,
907 * then have them wait on a GC condition variable.
909 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow",
910 t->id, t, whatNext_strs[t->what_next]));
913 ASSERT(!is_on_queue(t,CurrentProc));
916 ready_to_gc = rtsTrue;
917 context_switch = 1; /* stop other threads ASAP */
918 PUSH_ON_RUN_QUEUE(t);
919 /* actual GC is done at the end of the while loop */
923 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow",
924 t->id, t, whatNext_strs[t->what_next]));
925 /* just adjust the stack for this thread, then pop it back
931 /* enlarge the stack */
932 StgTSO *new_t = threadStackOverflow(t);
934 /* This TSO has moved, so update any pointers to it from the
935 * main thread stack. It better not be on any other queues...
938 for (m = main_threads; m != NULL; m = m->link) {
944 PUSH_ON_RUN_QUEUE(new_t);
951 DumpGranEvent(GR_DESCHEDULE, t));
952 globalGranStats.tot_yields++;
955 DumpGranEvent(GR_DESCHEDULE, t));
957 /* put the thread back on the run queue. Then, if we're ready to
958 * GC, check whether this is the last task to stop. If so, wake
959 * up the GC thread. getThread will block during a GC until the
963 if (t->what_next == ThreadEnterHugs) {
964 /* ToDo: or maybe a timer expired when we were in Hugs?
965 * or maybe someone hit ctrl-C
967 belch("--<< thread %ld (%p; %s) stopped to switch to Hugs",
968 t->id, t, whatNext_strs[t->what_next]);
970 belch("--<< thread %ld (%p; %s) stopped, yielding",
971 t->id, t, whatNext_strs[t->what_next]);
978 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
980 ASSERT(t->link == END_TSO_QUEUE);
982 ASSERT(!is_on_queue(t,CurrentProc));
985 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
986 checkThreadQsSanity(rtsTrue));
988 APPEND_TO_RUN_QUEUE(t);
990 /* add a ContinueThread event to actually process the thread */
991 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
993 t, (StgClosure*)NULL, (rtsSpark*)NULL);
995 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1004 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1005 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1006 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1008 // ??? needed; should emit block before
1010 DumpGranEvent(GR_DESCHEDULE, t));
1011 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1014 ASSERT(procStatus[CurrentProc]==Busy ||
1015 ((procStatus[CurrentProc]==Fetching) &&
1016 (t->block_info.closure!=(StgClosure*)NULL)));
1017 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1018 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1019 procStatus[CurrentProc]==Fetching))
1020 procStatus[CurrentProc] = Idle;
1024 DumpGranEvent(GR_DESCHEDULE, t));
1026 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1030 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1031 t->id, t, whatNext_strs[t->what_next], t->block_info.closure);
1032 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1035 /* don't need to do anything. Either the thread is blocked on
1036 * I/O, in which case we'll have called addToBlockedQueue
1037 * previously, or it's blocked on an MVar or Blackhole, in which
1038 * case it'll be on the relevant queue already.
1041 fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1042 printThreadBlockage(t);
1043 fprintf(stderr, "\n"));
1045 /* Only for dumping event to log file
1046 ToDo: do I need this in GranSim, too?
1053 case ThreadFinished:
1054 /* Need to check whether this was a main thread, and if so, signal
1055 * the task that started it with the return value. If we have no
1056 * more main threads, we probably need to stop all the tasks until
1059 /* We also end up here if the thread kills itself with an
1060 * uncaught exception, see Exception.hc.
1062 IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1064 endThread(t, CurrentProc); // clean-up the thread
1066 advisory_thread_count--;
1067 if (RtsFlags.ParFlags.ParStats.Full)
1068 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1073 barf("schedule: invalid thread return code %d", (int)ret);
1077 cap->link = free_capabilities;
1078 free_capabilities = cap;
1079 n_free_capabilities++;
1083 if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes)
1088 /* everybody back, start the GC.
1089 * Could do it in this thread, or signal a condition var
1090 * to do it in another thread. Either way, we need to
1091 * broadcast on gc_pending_cond afterward.
1094 IF_DEBUG(scheduler,sched_belch("doing GC"));
1096 GarbageCollect(GetRoots,rtsFalse);
1097 ready_to_gc = rtsFalse;
1099 pthread_cond_broadcast(&gc_pending_cond);
1102 /* add a ContinueThread event to continue execution of current thread */
1103 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1105 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1107 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1114 IF_GRAN_DEBUG(unused,
1115 print_eventq(EventHd));
1117 event = get_next_event();
1121 /* ToDo: wait for next message to arrive rather than busy wait */
1126 t = take_off_run_queue(END_TSO_QUEUE);
1129 } /* end of while(1) */
1132 /* A hack for Hugs concurrency support. Needs sanitisation (?) */
1133 void deleteAllThreads ( void )
1136 IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
1137 for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1140 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1143 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1144 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1147 /* startThread and insertThread are now in GranSim.c -- HWL */
1149 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1150 //@subsection Suspend and Resume
1152 /* ---------------------------------------------------------------------------
1153 * Suspending & resuming Haskell threads.
1155 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1156 * its capability before calling the C function. This allows another
1157 * task to pick up the capability and carry on running Haskell
1158 * threads. It also means that if the C call blocks, it won't lock
1161 * The Haskell thread making the C call is put to sleep for the
1162 duration of the call, on the suspended_ccalling_threads queue. We
1163 * give out a token to the task, which it can use to resume the thread
1164 * on return from the C function.
1165 * ------------------------------------------------------------------------- */
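/* A sketch of the intended calling sequence (illustrative only; the real
 * wrapper is generated for each "safe" foreign call, and some_c_fn is a
 * hypothetical example):
 *
 *     StgInt tok;
 *     tok = suspendThread(cap);     // give up our capability
 *     r   = some_c_fn(arg);         // may block, or call back into the RTS
 *     cap = resumeThread(tok);      // reclaim a capability and carry on
 */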
1168 suspendThread( Capability *cap )
1172 ACQUIRE_LOCK(&sched_mutex);
1175 sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
1177 threadPaused(cap->rCurrentTSO);
1178 cap->rCurrentTSO->link = suspended_ccalling_threads;
1179 suspended_ccalling_threads = cap->rCurrentTSO;
1181 /* Use the thread ID as the token; it should be unique */
1182 tok = cap->rCurrentTSO->id;
1185 cap->link = free_capabilities;
1186 free_capabilities = cap;
1187 n_free_capabilities++;
1190 RELEASE_LOCK(&sched_mutex);
1195 resumeThread( StgInt tok )
1197 StgTSO *tso, **prev;
1200 ACQUIRE_LOCK(&sched_mutex);
1202 prev = &suspended_ccalling_threads;
1203 for (tso = suspended_ccalling_threads;
1204 tso != END_TSO_QUEUE;
1205 prev = &tso->link, tso = tso->link) {
1206 if (tso->id == (StgThreadID)tok) {
1211 if (tso == END_TSO_QUEUE) {
1212 barf("resumeThread: thread not found");
1216 while (free_capabilities == NULL) {
1217 IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1218 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1219 IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1221 cap = free_capabilities;
1222 free_capabilities = cap->link;
1223 n_free_capabilities--;
1225 cap = &MainRegTable;
1228 cap->rCurrentTSO = tso;
1230 RELEASE_LOCK(&sched_mutex);
1235 /* ---------------------------------------------------------------------------
1237 * ------------------------------------------------------------------------ */
1238 static void unblockThread(StgTSO *tso);
1240 /* ---------------------------------------------------------------------------
1241 * Comparing Thread ids.
1243 * This is used from STG land in the implementation of the
1244 * instances of Eq/Ord for ThreadIds.
1245 * ------------------------------------------------------------------------ */
1247 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
1249 StgThreadID id1 = tso1->id;
1250 StgThreadID id2 = tso2->id;
1252 if (id1 < id2) return (-1);
1253 if (id1 > id2) return 1;
1257 /* ---------------------------------------------------------------------------
1258 Create a new thread.
1260 The new thread starts with the given stack size. Before the
1261 scheduler can run it, however, this thread needs to have a closure
1262 (and possibly some arguments) pushed on its stack. See
1263 pushClosure() in Schedule.h.
1265 createGenThread() and createIOThread() (in SchedAPI.h) are
1266 convenient packaged versions of this function.
1268 currently pri (priority) is only used in a GRAN setup -- HWL
1269 ------------------------------------------------------------------------ */
1270 //@cindex createThread
1272 /* currently pri (priority) is only used in a GRAN setup -- HWL */
1274 createThread(nat stack_size, StgInt pri)
1276 return createThread_(stack_size, rtsFalse, pri);
1280 createThread_(nat size, rtsBool have_lock, StgInt pri)
1284 createThread(nat stack_size)
1286 return createThread_(stack_size, rtsFalse);
1290 createThread_(nat size, rtsBool have_lock)
1297 /* First check whether we should create a thread at all */
1299 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1300 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1302 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1303 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1304 return END_TSO_QUEUE;
1310 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1313 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1315 /* catch ridiculously small stack sizes */
1316 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1317 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1320 stack_size = size - TSO_STRUCT_SIZEW;
1322 tso = (StgTSO *)allocate(size);
1323 TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1325 SET_HDR(tso, &TSO_info, CCS_SYSTEM);
1327 SET_GRAN_HDR(tso, ThisPE);
1329 tso->what_next = ThreadEnterGHC;
1331 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1332 * protect the increment operation on next_thread_id.
1333 * In future, we could use an atomic increment instead.
1335 if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1336 tso->id = next_thread_id++;
1337 if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1339 tso->why_blocked = NotBlocked;
1340 tso->blocked_exceptions = NULL;
1342 tso->splim = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
1343 tso->stack_size = stack_size;
1344 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
1346 tso->sp = (P_)&(tso->stack) + stack_size;
1349 tso->prof.CCCS = CCS_MAIN;
1352 /* put a stop frame on the stack */
1353 tso->sp -= sizeofW(StgStopFrame);
1354 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1355 tso->su = (StgUpdateFrame*)tso->sp;
1359 tso->link = END_TSO_QUEUE;
1360 /* uses more flexible routine in GranSim */
1361 insertThread(tso, CurrentProc);
1363 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1368 #if defined(GRAN) || defined(PAR)
1369 DumpGranEvent(GR_START,tso);
1372 /* Link the new thread on the global thread list.
1374 tso->global_link = all_threads;
1378 tso->gran.pri = pri;
1380 tso->gran.magic = TSO_MAGIC; // debugging only
1382 tso->gran.sparkname = 0;
1383 tso->gran.startedat = CURRENT_TIME;
1384 tso->gran.exported = 0;
1385 tso->gran.basicblocks = 0;
1386 tso->gran.allocs = 0;
1387 tso->gran.exectime = 0;
1388 tso->gran.fetchtime = 0;
1389 tso->gran.fetchcount = 0;
1390 tso->gran.blocktime = 0;
1391 tso->gran.blockcount = 0;
1392 tso->gran.blockedat = 0;
1393 tso->gran.globalsparks = 0;
1394 tso->gran.localsparks = 0;
1395 if (RtsFlags.GranFlags.Light)
1396 tso->gran.clock = Now; /* local clock */
1398 tso->gran.clock = 0;
1400 IF_DEBUG(gran,printTSO(tso));
1403 tso->par.magic = TSO_MAGIC; // debugging only
1405 tso->par.sparkname = 0;
1406 tso->par.startedat = CURRENT_TIME;
1407 tso->par.exported = 0;
1408 tso->par.basicblocks = 0;
1409 tso->par.allocs = 0;
1410 tso->par.exectime = 0;
1411 tso->par.fetchtime = 0;
1412 tso->par.fetchcount = 0;
1413 tso->par.blocktime = 0;
1414 tso->par.blockcount = 0;
1415 tso->par.blockedat = 0;
1416 tso->par.globalsparks = 0;
1417 tso->par.localsparks = 0;
1421 globalGranStats.tot_threads_created++;
1422 globalGranStats.threads_created_on_PE[CurrentProc]++;
1423 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1424 globalGranStats.tot_sq_probes++;
1429 belch("==__ schedule: Created TSO %d (%p) on PE %d;",
1430 tso->id, tso, CurrentProc));
1432 IF_PAR_DEBUG(verbose,
1433 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1434 tso->id, tso, advisory_thread_count));
1436 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1437 tso->id, tso->stack_size));
1443 Turn a spark into a thread.
1444 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1447 //@cindex activateSpark
1449 activateSpark (rtsSpark spark)
1453 ASSERT(spark != (rtsSpark)NULL);
1454 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1455 if (tso!=END_TSO_QUEUE) {
1456 pushClosure(tso,spark);
1457 PUSH_ON_RUN_QUEUE(tso);
1458 advisory_thread_count++;
1460 if (RtsFlags.ParFlags.ParStats.Full) {
1461 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1462 IF_PAR_DEBUG(verbose,
1463 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1464 (StgClosure *)spark, info_type((StgClosure *)spark)));
1467 barf("activateSpark: Cannot create TSO");
1469 // ToDo: fwd info on local/global spark to thread -- HWL
1470 // tso->gran.exported = spark->exported;
1471 // tso->gran.locked = !spark->global;
1472 // tso->gran.sparkname = spark->name;
1478 /* ---------------------------------------------------------------------------
1481 * scheduleThread puts a thread on the head of the runnable queue.
1482 * This will usually be done immediately after a thread is created.
1483 * The caller of scheduleThread must create the thread using e.g.
1484 * createThread and push an appropriate closure
1485 * on this thread's stack before the scheduler is invoked.
1486 * ------------------------------------------------------------------------ */
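/* For example (a sketch only; createGenThread/createIOThread in SchedAPI.h
 * package up the create-and-push steps, and `closure' stands for whatever
 * the new thread should evaluate):
 *
 *     StgTSO *tso = createThread(RtsFlags.GcFlags.initialStkSize);
 *     pushClosure(tso, closure);    // what to run
 *     scheduleThread(tso);          // put it on the run queue
 */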
1489 scheduleThread(StgTSO *tso)
1491 if (tso==END_TSO_QUEUE){
1496 ACQUIRE_LOCK(&sched_mutex);
1498 /* Put the new thread on the head of the runnable queue. The caller
1499 * better push an appropriate closure on this thread's stack
1500 * beforehand. In the SMP case, the thread may start running as
1501 * soon as we release the scheduler lock below.
1503 PUSH_ON_RUN_QUEUE(tso);
1507 IF_DEBUG(scheduler,printTSO(tso));
1509 RELEASE_LOCK(&sched_mutex);
1512 /* ---------------------------------------------------------------------------
1515 * Start up Posix threads to run each of the scheduler tasks.
1516 * I believe the task ids are not needed in the system as defined.
1518 * ------------------------------------------------------------------------ */
1520 #if defined(PAR) || defined(SMP)
1522 taskStart( void *arg STG_UNUSED )
1524 rts_evalNothing(NULL);
1528 /* ---------------------------------------------------------------------------
1531 * Initialise the scheduler. This resets all the queues - if the
1532 * queues contained any threads, they'll be garbage collected at the
1535 * This now calls startTasks(), so should only be called once! KH @ 25/10/99
1536 * ------------------------------------------------------------------------ */
1540 term_handler(int sig STG_UNUSED)
1543 ACQUIRE_LOCK(&term_mutex);
1545 RELEASE_LOCK(&term_mutex);
1550 //@cindex initScheduler
1557 for (i=0; i<=MAX_PROC; i++) {
1558 run_queue_hds[i] = END_TSO_QUEUE;
1559 run_queue_tls[i] = END_TSO_QUEUE;
1560 blocked_queue_hds[i] = END_TSO_QUEUE;
1561 blocked_queue_tls[i] = END_TSO_QUEUE;
1562 ccalling_threadss[i] = END_TSO_QUEUE;
1565 run_queue_hd = END_TSO_QUEUE;
1566 run_queue_tl = END_TSO_QUEUE;
1567 blocked_queue_hd = END_TSO_QUEUE;
1568 blocked_queue_tl = END_TSO_QUEUE;
1571 suspended_ccalling_threads = END_TSO_QUEUE;
1573 main_threads = NULL;
1574 all_threads = END_TSO_QUEUE;
1580 ecafList = END_ECAF_LIST;
1584 /* Install the SIGTERM handler */
1587 struct sigaction action,oact;
1589 action.sa_handler = term_handler;
1590 sigemptyset(&action.sa_mask);
1591 action.sa_flags = 0;
1592 if (sigaction(SIGTERM, &action, &oact) != 0) {
1593 barf("can't install TERM handler");
1599 /* Allocate N Capabilities */
1602 Capability *cap, *prev;
1605 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1606 cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1610 free_capabilities = cap;
1611 n_free_capabilities = RtsFlags.ParFlags.nNodes;
1613 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1614 n_free_capabilities););
1617 #if defined(SMP) || defined(PAR)
1630 /* make some space for saving all the thread ids */
1631 task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1632 "initScheduler:task_ids");
1634 /* and create all the threads */
1635 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1636 r = pthread_create(&tid,NULL,taskStart,NULL);
1638 barf("startTasks: Can't create new Posix thread");
1640 task_ids[i].id = tid;
1641 task_ids[i].mut_time = 0.0;
1642 task_ids[i].mut_etime = 0.0;
1643 task_ids[i].gc_time = 0.0;
1644 task_ids[i].gc_etime = 0.0;
1645 task_ids[i].elapsedtimestart = elapsedtime();
1646 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1652 exitScheduler( void )
1657 /* Don't want to use pthread_cancel, since we'd have to install
1658 * these silly exception handlers (pthread_cleanup_{push,pop}) around
1662 /* Cancel all our tasks */
1663 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1664 pthread_cancel(task_ids[i].id);
1667 /* Wait for all the tasks to terminate */
1668 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1669 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n",
1671 pthread_join(task_ids[i].id, NULL);
1675 /* Send 'em all a SIGTERM. That should shut 'em up.
1677 await_death = RtsFlags.ParFlags.nNodes;
1678 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1679 pthread_kill(task_ids[i].id,SIGTERM);
1681 while (await_death > 0) {
1687 /* -----------------------------------------------------------------------------
1688 Managing the per-task allocation areas.
1690 Each capability comes with an allocation area. These are
1691 fixed-length block lists into which allocation can be done.
1693 ToDo: no support for two-space collection at the moment???
1694 -------------------------------------------------------------------------- */
1696 /* -----------------------------------------------------------------------------
1697 * waitThread is the external interface for running a new computation
1698 * and waiting for the result.
1700 * In the non-SMP case, we create a new main thread, push it on the
1701 * main-thread stack, and invoke the scheduler to run it. The
1702 * scheduler will return when the top main thread on the stack has
1703 * completed or died, and fill in the necessary fields of the
1704 * main_thread structure.
1706 * In the SMP case, we create a main thread as before, but we then
1707 * create a new condition variable and sleep on it. When our new
1708 * main thread has completed, we'll be woken up and the status/result
1709 * will be in the main_thread struct.
1710 * -------------------------------------------------------------------------- */
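/* Continuing the scheduleThread sketch above, a client collects the result
 * roughly like this (a sketch only; stat is Success on normal completion):
 *
 *     StgClosure *result;
 *     SchedulerStatus stat = waitThread(tso, &result);
 *     if (stat == Success) { ... use result ... }
 */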
1713 howManyThreadsAvail ( void )
1717 for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1719 for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1725 finishAllThreads ( void )
1728 while (run_queue_hd != END_TSO_QUEUE) {
1729 waitThread ( run_queue_hd, NULL );
1731 while (blocked_queue_hd != END_TSO_QUEUE) {
1732 waitThread ( blocked_queue_hd, NULL );
1735 (blocked_queue_hd != END_TSO_QUEUE ||
1736 run_queue_hd != END_TSO_QUEUE);
1740 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1743 SchedulerStatus stat;
1745 ACQUIRE_LOCK(&sched_mutex);
1747 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1753 pthread_cond_init(&m->wakeup, NULL);
1756 m->link = main_threads;
1759 IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n",
1764 pthread_cond_wait(&m->wakeup, &sched_mutex);
1765 } while (m->stat == NoStatus);
1767 /* GranSim specific init */
1768 CurrentTSO = m->tso; // the TSO to run
1769 procStatus[MainProc] = Busy; // status of main PE
1770 CurrentProc = MainProc; // PE to run it on
1775 ASSERT(m->stat != NoStatus);
1781 pthread_cond_destroy(&m->wakeup);
1784 IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n",
1788 RELEASE_LOCK(&sched_mutex);
1793 //@node Run queue code, Garbage Collection Routines, Suspend and Resume, Main scheduling code
1794 //@subsection Run queue code
1798 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1799 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1800 implicit global variable that has to be correct when calling these
1804 /* Put the new thread on the head of the runnable queue.
1805 * The caller of createThread better push an appropriate closure
1806 * on this thread's stack before the scheduler is invoked.
1808 static /* inline */ void
1809 add_to_run_queue(tso)
1812 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1813 tso->link = run_queue_hd;
1815 if (run_queue_tl == END_TSO_QUEUE) {
1820 /* Put the new thread at the end of the runnable queue. */
1821 static /* inline */ void
1822 push_on_run_queue(tso)
1825 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1826 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1827 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1828 if (run_queue_hd == END_TSO_QUEUE) {
1831 run_queue_tl->link = tso;
1837 Should be inlined because it's used very often in schedule. The tso
1838 argument is actually only needed in GranSim, where we want to have the
1839 possibility to schedule *any* TSO on the run queue, irrespective of the
1840 actual ordering. Therefore, if tso is not the nil TSO then we traverse
1841 the run queue and dequeue the tso, adjusting the links in the queue.
1843 //@cindex take_off_run_queue
1844 static /* inline */ StgTSO*
1845 take_off_run_queue(StgTSO *tso) {
1849 qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1851 if tso is specified, unlink that tso from the run_queue (doesn't have
1852 to be at the beginning of the queue); GranSim only
1854 if (tso!=END_TSO_QUEUE) {
1855 /* find tso in queue */
1856 for (t=run_queue_hd, prev=END_TSO_QUEUE;
1857 t!=END_TSO_QUEUE && t!=tso;
1861 /* now actually dequeue the tso */
1862 if (prev!=END_TSO_QUEUE) {
1863 ASSERT(run_queue_hd!=t);
1864 prev->link = t->link;
1866 /* t is at beginning of thread queue */
1867 ASSERT(run_queue_hd==t);
1868 run_queue_hd = t->link;
1870 /* t is at end of thread queue */
1871 if (t->link==END_TSO_QUEUE) {
1872 ASSERT(t==run_queue_tl);
1873 run_queue_tl = prev;
1875 ASSERT(run_queue_tl!=t);
1877 t->link = END_TSO_QUEUE;
1879 /* take tso from the beginning of the queue; std concurrent code */
1881 if (t != END_TSO_QUEUE) {
1882 run_queue_hd = t->link;
1883 t->link = END_TSO_QUEUE;
1884 if (run_queue_hd == END_TSO_QUEUE) {
1885 run_queue_tl = END_TSO_QUEUE;
1894 //@node Garbage Collection Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1895 //@subsection Garbage Collection Routines
1897 /* ---------------------------------------------------------------------------
1898 Where are the roots that we know about?
1900 - all the threads on the runnable queue
1901 - all the threads on the blocked queue
1902 - all the threads currently executing a _ccall_GC
1903 - all the "main threads"
1905 ------------------------------------------------------------------------ */
1907 /* This has to be protected either by the scheduler monitor, or by the
1908 garbage collection monitor (probably the latter).
1912 static void GetRoots(void)
1919 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1920 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1921 run_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1922 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1923 run_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1925 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1926 blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1927 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1928 blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1929 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1930 ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
1937 if (run_queue_hd != END_TSO_QUEUE) {
1938 ASSERT(run_queue_tl != END_TSO_QUEUE);
1939 run_queue_hd = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1940 run_queue_tl = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1943 if (blocked_queue_hd != END_TSO_QUEUE) {
1944 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
1945 blocked_queue_hd = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1946 blocked_queue_tl = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1950 for (m = main_threads; m != NULL; m = m->link) {
1951 m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1953 if (suspended_ccalling_threads != END_TSO_QUEUE)
1954 suspended_ccalling_threads =
1955 (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
1957 #if defined(SMP) || defined(PAR) || defined(GRAN)
1962 /* -----------------------------------------------------------------------------
1965 This is the interface to the garbage collector from Haskell land.
1966 We provide this so that external C code can allocate and garbage
1967 collect when called from Haskell via _ccall_GC.
1969 It might be useful to provide an interface whereby the programmer
1970 can specify more roots (ToDo).
1972 This needs to be protected by the GC condition variable above. KH.
1973 -------------------------------------------------------------------------- */
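/* For example, external C code holding its own roots might do something
 * like the following (a sketch; my_roots and my_closure are hypothetical):
 *
 *     static StgClosure *my_closure;
 *     static void my_roots(void) {
 *         my_closure = MarkRoot(my_closure);
 *     }
 *     ...
 *     performGCWithRoots(my_roots);
 */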
1975 void (*extra_roots)(void);
1980 GarbageCollect(GetRoots,rtsFalse);
1984 performMajorGC(void)
1986 GarbageCollect(GetRoots,rtsTrue);
1992 GetRoots(); /* the scheduler's roots */
1993 extra_roots(); /* the user's roots */
1997 performGCWithRoots(void (*get_roots)(void))
1999 extra_roots = get_roots;
2001 GarbageCollect(AllRoots,rtsFalse);
2004 /* -----------------------------------------------------------------------------
2007 If the thread has reached its maximum stack size, then raise the
2008 StackOverflow exception in the offending thread. Otherwise
2009 relocate the TSO into a larger chunk of memory and adjust its stack
2011 -------------------------------------------------------------------------- */
2014 threadStackOverflow(StgTSO *tso)
2016 nat new_stack_size, new_tso_size, diff, stack_words;
2020 IF_DEBUG(sanity,checkTSO(tso));
2021 if (tso->stack_size >= tso->max_stack_size) {
2024 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2025 tso->id, tso, tso->stack_size, tso->max_stack_size);
2026 /* If we're debugging, just print out the top of the stack */
2027 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2031 fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
2034 /* Send this thread the StackOverflow exception */
2035 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2040 /* Try to double the current stack size. If that takes us over the
2041 * maximum stack size for this thread, then use the maximum instead.
2042 * Finally round up so the TSO ends up as a whole number of blocks.
2044 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2045 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2046 TSO_STRUCT_SIZE)/sizeof(W_);
2047 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2048 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2050 IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2052 dest = (StgTSO *)allocate(new_tso_size);
2053 TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2055 /* copy the TSO block and the old stack into the new area */
2056 memcpy(dest,tso,TSO_STRUCT_SIZE);
2057 stack_words = tso->stack + tso->stack_size - tso->sp;
2058 new_sp = (P_)dest + new_tso_size - stack_words;
2059 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2061 /* relocate the stack pointers... */
2062 diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2063 dest->su = (StgUpdateFrame *) ((P_)dest->su + diff);
2065 dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso);
2066 dest->stack_size = new_stack_size;
2068 /* and relocate the update frame list */
2069 relocate_TSO(tso, dest);
2071 /* Mark the old TSO as relocated. We have to check for relocated
2072 * TSOs in the garbage collector and any primops that deal with TSOs.
2074 * It's important to set the sp and su values to just beyond the end
2075 * of the stack, so we don't attempt to scavenge any part of the
2078 tso->what_next = ThreadRelocated;
2080 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2081 tso->su = (StgUpdateFrame *)tso->sp;
2082 tso->why_blocked = NotBlocked;
2083 dest->mut_link = NULL;
2085 IF_PAR_DEBUG(verbose,
2086 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2087 tso->id, tso, tso->stack_size);
2088 /* If we're debugging, just print out the top of the stack */
2089 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2092 IF_DEBUG(sanity,checkTSO(tso));
2094 IF_DEBUG(scheduler,printTSO(dest));
2100 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collection Routines, Main scheduling code
2101 //@subsection Blocking Queue Routines
2103 /* ---------------------------------------------------------------------------
2104 Wake up a queue that was blocked on some resource.
2105 ------------------------------------------------------------------------ */
2107 /* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */
2111 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2116 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2118 /* write RESUME events to the log file and
2119 update blocked and fetch time (depending on the type of the original closure) */
2120 if (RtsFlags.ParFlags.ParStats.Full) {
2121 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2122 GR_RESUME, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2123 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2125 switch (get_itbl(node)->type) {
2127 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2132 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2135 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2142 static StgBlockingQueueElement *
2143 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2146 PEs node_loc, tso_loc;
2148 node_loc = where_is(node); // should be lifted out of loop
2149 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2150 tso_loc = where_is((StgClosure *)tso);
2151 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2152 /* !fake_fetch => TSO is on CurrentProc, i.e. the same as IS_LOCAL_TO */
2153 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2154 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2155 // insertThread(tso, node_loc);
2156 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2158 tso, node, (rtsSpark*)NULL);
2159 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2162 } else { // TSO is remote (actually should be FMBQ)
2163 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2164 RtsFlags.GranFlags.Costs.gunblocktime +
2165 RtsFlags.GranFlags.Costs.latency;
2166 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2168 tso, node, (rtsSpark*)NULL);
2169 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2172 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2174 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2175 (node_loc==tso_loc ? "Local" : "Global"),
2176 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2177 tso->block_info.closure = NULL;
2178 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
2182 static StgBlockingQueueElement *
2183 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2185 StgBlockingQueueElement *next;
2187 switch (get_itbl(bqe)->type) {
2189 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2190 /* if it's a TSO just push it onto the run_queue */
2192 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2193 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
2195 unblockCount(bqe, node);
2196 /* reset blocking status after dumping event */
2197 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2201 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2203 bqe->link = PendingFetches;
2204 PendingFetches = bqe;
2208 /* can ignore this case in a non-debugging setup;
2209 see comments on RBHSave closures above */
2211 /* check that the closure is an RBHSave closure */
2212 ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
2213 get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
2214 get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
2218 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2219 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2223 // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2227 #else /* !GRAN && !PAR */
2229 unblockOneLocked(StgTSO *tso)
2233 ASSERT(get_itbl(tso)->type == TSO);
2234 ASSERT(tso->why_blocked != NotBlocked);
2235 tso->why_blocked = NotBlocked;
2237 PUSH_ON_RUN_QUEUE(tso);
2239 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2244 #if defined(GRAN) || defined(PAR)
2245 inline StgBlockingQueueElement *
2246 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2248 ACQUIRE_LOCK(&sched_mutex);
2249 bqe = unblockOneLocked(bqe, node);
2250 RELEASE_LOCK(&sched_mutex);
2255 unblockOne(StgTSO *tso)
2257 ACQUIRE_LOCK(&sched_mutex);
2258 tso = unblockOneLocked(tso);
2259 RELEASE_LOCK(&sched_mutex);
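/* The *Locked/plain split lets awakenBlockedQueue() (below) wake an entire
   queue while taking sched_mutex only once; one-off callers use unblockOne(),
   which acquires and releases the mutex itself. */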
2266 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2268 StgBlockingQueueElement *bqe;
2273 belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2274 node, CurrentProc, CurrentTime[CurrentProc],
2275 CurrentTSO->id, CurrentTSO));
2277 node_loc = where_is(node);
2279 ASSERT(get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2280 get_itbl(q)->type == CONSTR); // closure (type constructor)
2281 ASSERT(is_unique(node));
2283 /* FAKE FETCH: magically copy the node to the tso's proc;
2284 no Fetch necessary because in reality the node should not have been
2285 moved to the other PE in the first place */
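/* Worked example (illustrative; assuming PE_NUMBER(p) is the one-bit mask
   1<<p): if the node's procs bitmask is 0x5 (local to PEs 0 and 2) and
   CurrentProc is 1, the assignment below widens the mask to 0x5 | 0x2 = 0x7,
   i.e. the node is henceforth also treated as local to PE 1. */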
2287 if (CurrentProc!=node_loc) {
2289 belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2290 node, node_loc, CurrentProc, CurrentTSO->id,
2291 // CurrentTSO, where_is(CurrentTSO),
2292 node->header.gran.procs));
2293 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2295 belch("## new bitmask of node %p is %#x",
2296 node, node->header.gran.procs));
2297 if (RtsFlags.GranFlags.GranSimStats.Global) {
2298 globalGranStats.tot_fake_fetches++;
2303 // ToDo: check: ASSERT(CurrentProc==node_loc);
2304 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2307 /* bqe points to the current element in the queue;
2308    next points to the next element in the queue */
2310 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2311 //tso_loc = where_is(tso);
2313 bqe = unblockOneLocked(bqe, node);
2316 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2317 the closure to make room for the anchor of the BQ */
2318 if (bqe!=END_BQ_QUEUE) {
2319 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2321 ASSERT((info_ptr==&RBH_Save_0_info) ||
2322 (info_ptr==&RBH_Save_1_info) ||
2323 (info_ptr==&RBH_Save_2_info));
2325 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2326 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2327 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2330 belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2331 node, info_type(node)));
2334 /* statistics gathering */
2335 if (RtsFlags.GranFlags.GranSimStats.Global) {
2336 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2337 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2338 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2339 globalGranStats.tot_awbq++; // total no. of bqs awakened
2342 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2343 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2347 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2349 StgBlockingQueueElement *bqe, *next;
2351 ACQUIRE_LOCK(&sched_mutex);
2353 IF_PAR_DEBUG(verbose,
2354 belch("## AwBQ for node %p on [%x]: ",
2357 ASSERT(get_itbl(q)->type == TSO ||
2358 get_itbl(q)->type == BLOCKED_FETCH ||
2359 get_itbl(q)->type == CONSTR);
2362 while (get_itbl(bqe)->type==TSO ||
2363 get_itbl(bqe)->type==BLOCKED_FETCH) {
2364 bqe = unblockOneLocked(bqe, node);
2366 RELEASE_LOCK(&sched_mutex);
2369 #else /* !GRAN && !PAR */
2371 awakenBlockedQueue(StgTSO *tso)
2373 ACQUIRE_LOCK(&sched_mutex);
2374 while (tso != END_TSO_QUEUE) {
2375 tso = unblockOneLocked(tso);
2377 RELEASE_LOCK(&sched_mutex);
2381 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2382 //@subsection Exception Handling Routines
2384 /* ---------------------------------------------------------------------------
2386 - usually called inside a signal handler so it mustn't do anything fancy.
2387 ------------------------------------------------------------------------ */
2390 interruptStgRts(void)
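/* A minimal sketch of what this needs to do (it may only touch
   async-signal-safe state, i.e. plain global flags):

       interrupted    = 1;   // scheduler returns Interrupted at the next opportunity
       context_switch = 1;   // force the running thread back into schedule()
*/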
2396 /* -----------------------------------------------------------------------------
2399 This is for use when we raise an exception in another thread, which may be blocked.
2401 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2402 -------------------------------------------------------------------------- */
2404 #if defined(GRAN) || defined(PAR)
2406 /* NB: only the type of the blocking queue is different in GranSim and GUM;
2407    the operations on the queue elements are the same --
2408    long live polymorphism! */
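/* The removal cases below all use the same unlinking idiom: `last' points at
   whichever link field currently points at `t', so deleting `t' is simply
   "*last = t->link", plus a fix-up of the queue's tail pointer if `t' was the
   last element.  A minimal sketch of the pattern (illustrative only):

       last = &queue_head;
       for (t = queue_head; t != END; last = &t->link, t = t->link) {
         if (t == tso) { *last = t->link; break; }
       }
*/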
2411 unblockThread(StgTSO *tso)
2413 StgBlockingQueueElement *t, **last;
2415 ACQUIRE_LOCK(&sched_mutex);
2416 switch (tso->why_blocked) {
2418 case NotBlocked:
2419 return; /* not blocked */
2421 case BlockedOnMVar:
2422 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2424 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2425 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2427 last = (StgBlockingQueueElement **)&mvar->head;
2428 for (t = (StgBlockingQueueElement *)mvar->head;
2430 last = &t->link, last_tso = t, t = t->link) {
2431 if (t == (StgBlockingQueueElement *)tso) {
2432 *last = (StgBlockingQueueElement *)tso->link;
2433 if (mvar->tail == tso) {
2434 mvar->tail = (StgTSO *)last_tso;
2439 barf("unblockThread (MVAR): TSO not found");
2442 case BlockedOnBlackHole:
2443 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2445 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2447 last = &bq->blocking_queue;
2448 for (t = bq->blocking_queue;
2450 last = &t->link, t = t->link) {
2451 if (t == (StgBlockingQueueElement *)tso) {
2452 *last = (StgBlockingQueueElement *)tso->link;
2456 barf("unblockThread (BLACKHOLE): TSO not found");
2459 case BlockedOnException:
2461 StgTSO *target = tso->block_info.tso;
2463 ASSERT(get_itbl(target)->type == TSO);
2464 ASSERT(target->blocked_exceptions != NULL);
2466 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2467 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2469 last = &t->link, t = t->link) {
2470 ASSERT(get_itbl(t)->type == TSO);
2471 if (t == (StgBlockingQueueElement *)tso) {
2472 *last = (StgBlockingQueueElement *)tso->link;
2476 barf("unblockThread (Exception): TSO not found");
2479 case BlockedOnDelay:
2480 case BlockedOnRead:
2481 case BlockedOnWrite:
2483 StgBlockingQueueElement *prev = NULL;
2484 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2485 prev = t, t = t->link) {
2486 if (t == (StgBlockingQueueElement *)tso) {
2488 blocked_queue_hd = (StgTSO *)t->link;
2489 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2490 blocked_queue_tl = END_TSO_QUEUE;
2493 prev->link = t->link;
2494 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2495 blocked_queue_tl = (StgTSO *)prev;
2501 barf("unblockThread (I/O): TSO not found");
2505 barf("unblockThread");
2509 tso->link = END_TSO_QUEUE;
2510 tso->why_blocked = NotBlocked;
2511 tso->block_info.closure = NULL;
2512 PUSH_ON_RUN_QUEUE(tso);
2513 RELEASE_LOCK(&sched_mutex);
2517 unblockThread(StgTSO *tso)
2521 ACQUIRE_LOCK(&sched_mutex);
2522 switch (tso->why_blocked) {
2524 case NotBlocked:
2525 return; /* not blocked */
2527 case BlockedOnMVar:
2528 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2530 StgTSO *last_tso = END_TSO_QUEUE;
2531 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2534 for (t = mvar->head; t != END_TSO_QUEUE;
2535 last = &t->link, last_tso = t, t = t->link) {
2538 if (mvar->tail == tso) {
2539 mvar->tail = last_tso;
2544 barf("unblockThread (MVAR): TSO not found");
2547 case BlockedOnBlackHole:
2548 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2550 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2552 last = &bq->blocking_queue;
2553 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2554 last = &t->link, t = t->link) {
2560 barf("unblockThread (BLACKHOLE): TSO not found");
2563 case BlockedOnException:
2565 StgTSO *target = tso->block_info.tso;
2567 ASSERT(get_itbl(target)->type == TSO);
2568 ASSERT(target->blocked_exceptions != NULL);
2570 last = &target->blocked_exceptions;
2571 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2572 last = &t->link, t = t->link) {
2573 ASSERT(get_itbl(t)->type == TSO);
2579 barf("unblockThread (Exception): TSO not found");
2582 case BlockedOnDelay:
2583 case BlockedOnRead:
2584 case BlockedOnWrite:
2586 StgTSO *prev = NULL;
2587 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2588 prev = t, t = t->link) {
2591 blocked_queue_hd = t->link;
2592 if (blocked_queue_tl == t) {
2593 blocked_queue_tl = END_TSO_QUEUE;
2596 prev->link = t->link;
2597 if (blocked_queue_tl == t) {
2598 blocked_queue_tl = prev;
2604 barf("unblockThread (I/O): TSO not found");
2608 barf("unblockThread");
2612 tso->link = END_TSO_QUEUE;
2613 tso->why_blocked = NotBlocked;
2614 tso->block_info.closure = NULL;
2615 PUSH_ON_RUN_QUEUE(tso);
2616 RELEASE_LOCK(&sched_mutex);
2620 /* -----------------------------------------------------------------------------
2623 * The following function implements the magic for raising an
2624 * asynchronous exception in an existing thread.
2626 * We first remove the thread from any queue on which it might be
2627 * blocked. The possible blockages are MVARs, BLACKHOLE_BQs, blocked-exception
2627 * queues, and the blocked-on-I/O/delay queue (see unblockThread() above).
2629 * We strip the stack down to the innermost CATCH_FRAME, building
2630 * thunks in the heap for all the active computations, so they can
2631 * be restarted if necessary. When we reach a CATCH_FRAME, we build
2632 * an application of the handler to the exception, and push it on
2633 * the top of the stack.
2635 * How exactly do we save all the active computations? We create an
2636 * AP_UPD for every UpdateFrame on the stack. Entering one of these
2637 * AP_UPDs pushes everything from the corresponding update frame
2638 * upwards onto the stack. (Actually, it pushes everything up to the
2639 * next update frame plus a pointer to the next AP_UPD object.
2640 * Entering the next AP_UPD object pushes more onto the stack until we
2641 * reach the last AP_UPD object - at which point the stack should look
2642 * exactly as it did when we killed the TSO and we can continue
2643 * execution by entering the closure on top of the stack.)
2645 * We can also kill a thread entirely - this happens if either (a) the
2646 * exception passed to raiseAsync is NULL, or (b) there's no
2647 * CATCH_FRAME on the stack. In either case, we strip the entire
2648 * stack and replace the thread with a zombie.
2650 * -------------------------------------------------------------------------- */
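/* Illustrative example (a sketch, not taken from the code below): suppose the
   target thread's stack looks like, innermost first,

       sp -> e0 ... UPDATE_FRAME(u1) e1 ... UPDATE_FRAME(u2) e2 ... CATCH_FRAME(h) ...

   Raising exception `exc' in it then
     1. freezes e0 into ap0 = AP_UPD(e0) and updates u1's updatee with ap0,
     2. freezes ap0 together with e1 into ap1 and updates u2's updatee with it,
        and so on outwards,
     3. on reaching the CATCH_FRAME builds PAP(h, exc, realworld#) and leaves it
        on top of the stack, to be entered when the thread next runs.
   If exc is NULL (deleteThread) the CATCH_FRAMEs are frozen too and stripping
   continues to the STOP_FRAME, leaving the thread dead. */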
2653 deleteThread(StgTSO *tso)
2655 raiseAsync(tso,NULL);
2659 raiseAsync(StgTSO *tso, StgClosure *exception)
2661 StgUpdateFrame* su = tso->su;
2662 StgPtr sp = tso->sp;
2664 /* Thread already dead? */
2665 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2669 IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2671 /* Remove it from any blocking queues */
2674 /* The stack freezing code assumes there's a closure pointer on
2675 * the top of the stack. This isn't always the case with compiled
2676 * code, so we have to push a dummy closure on the top which just
2677 * returns to the next return address on the stack. */
2679 if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2680 *(--sp) = (W_)&dummy_ret_closure;
2684 int words = ((P_)su - (P_)sp) - 1;
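/* (the "- 1" excludes the word at sp[0]: that word becomes ap->fun below,
   and only the remaining `words' words are copied into ap->payload) */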
2688 /* If we find a CATCH_FRAME, and we've got an exception to raise,
2689 * then build PAP(handler,exception,realworld#), and leave it on
2690 * top of the stack ready to enter. */
2692 if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2693 StgCatchFrame *cf = (StgCatchFrame *)su;
2694 /* we've got an exception to raise, so let's pass it to the
2695 * handler in this frame. */
2697 ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2698 TICK_ALLOC_UPD_PAP(3,0);
2699 SET_HDR(ap,&PAP_info,cf->header.prof.ccs);
2702 ap->fun = cf->handler; /* :: Exception -> IO a */
2703 ap->payload[0] = (P_)exception;
2704 ap->payload[1] = ARG_TAG(0); /* realworld token */
2706 /* throw away the stack from Sp up to and including the current CATCH_FRAME */
2709 sp = (P_)su + sizeofW(StgCatchFrame) - 1;
2712 /* Restore the blocked/unblocked state for asynchronous exceptions
2713 * at the CATCH_FRAME.
2715 * If exceptions were unblocked at the catch, arrange that they
2716 * are unblocked again after executing the handler by pushing an
2717 * unblockAsyncExceptions_ret stack frame. */
2719 if (!cf->exceptions_blocked) {
2720 *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
2723 /* Ensure that async exceptions are blocked when running the handler. */
2725 if (tso->blocked_exceptions == NULL) {
2726 tso->blocked_exceptions = END_TSO_QUEUE;
2729 /* Put the newly-built PAP on top of the stack, ready to execute
2730 * when the thread restarts. */
2734 tso->what_next = ThreadEnterGHC;
2738 /* First build an AP_UPD consisting of the stack chunk above the
2739 * current update frame, with the top word on the stack as the fun field. */
2742 ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2747 ap->fun = (StgClosure *)sp[0];
2749 for(i=0; i < (nat)words; ++i) {
2750 ap->payload[i] = (P_)*sp++;
2753 switch (get_itbl(su)->type) {
2757 SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */);
2758 TICK_ALLOC_UP_THK(words+1,0);
2761 fprintf(stderr, "scheduler: Updating ");
2762 printPtr((P_)su->updatee);
2763 fprintf(stderr, " with ");
2764 printObj((StgClosure *)ap);
2767 /* Replace the updatee with an indirection - happily
2768 * this will also wake up any threads currently
2769 * waiting on the result. */
2771 UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */
2773 sp += sizeofW(StgUpdateFrame) -1;
2774 sp[0] = (W_)ap; /* push onto stack */
2780 StgCatchFrame *cf = (StgCatchFrame *)su;
2783 /* We want a PAP, not an AP_UPD. Fortunately, the
2784 * layout's the same. */
2786 SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2787 TICK_ALLOC_UPD_PAP(words+1,0);
2789 /* now build o = FUN(catch,ap,handler) */
2790 o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
2791 TICK_ALLOC_FUN(2,0);
2792 SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
2793 o->payload[0] = (StgClosure *)ap;
2794 o->payload[1] = cf->handler;
2797 fprintf(stderr, "scheduler: Built ");
2798 printObj((StgClosure *)o);
2801 /* pop the old handler and put o on the stack */
2803 sp += sizeofW(StgCatchFrame) - 1;
2810 StgSeqFrame *sf = (StgSeqFrame *)su;
2813 SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2814 TICK_ALLOC_UPD_PAP(words+1,0);
2816 /* now build o = FUN(seq,ap) */
2817 o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
2818 TICK_ALLOC_SE_THK(1,0);
2819 SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
2820 o->payload[0] = (StgClosure *)ap;
2823 fprintf(stderr, "scheduler: Built ");
2824 printObj((StgClosure *)o);
2827 /* pop the old handler and put o on the stack */
2829 sp += sizeofW(StgSeqFrame) - 1;
2835 /* We've stripped the entire stack, the thread is now dead. */
2836 sp += sizeofW(StgStopFrame) - 1;
2837 sp[0] = (W_)exception; /* save the exception */
2838 tso->what_next = ThreadKilled;
2839 tso->su = (StgUpdateFrame *)(sp+1);
2850 /* -----------------------------------------------------------------------------
2851 resurrectThreads is called after garbage collection on the list of
2852 threads found to be garbage. Each of these threads will be woken
2853 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2854 on an MVar, or NonTermination if the thread was blocked on a Black Hole.
2856 -------------------------------------------------------------------------- */
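/* Example (informal): if the only reference to an MVar is held by the single
   thread blocked on it, a GC finds that thread unreachable; it then arrives
   here on `threads' and is revived with BlockedOnDeadMVar, so the deadlock
   shows up as an exception in the blocked thread rather than a silent hang. */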
2859 resurrectThreads( StgTSO *threads )
2863 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2864 next = tso->global_link;
2865 tso->global_link = all_threads;
2867 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
2869 switch (tso->why_blocked) {
2870 case BlockedOnMVar:
2871 case BlockedOnException:
2872 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
2874 case BlockedOnBlackHole:
2875 raiseAsync(tso,(StgClosure *)NonTermination_closure);
2877 case NotBlocked:
2878 /* This might happen if the thread was blocked on a black hole
2879 * belonging to a thread that we've just woken up (raiseAsync
2880 * can wake up threads, remember...).
2883 default:
2884 barf("resurrectThreads: thread blocked in a strange way");
2889 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
2890 //@subsection Debugging Routines
2892 /* -----------------------------------------------------------------------------
2893 Debugging: why is a thread blocked
2894 -------------------------------------------------------------------------- */
2899 printThreadBlockage(StgTSO *tso)
2901 switch (tso->why_blocked) {
2902 case BlockedOnRead:
2903 fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
2905 case BlockedOnWrite:
2906 fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
2908 case BlockedOnDelay:
2909 #if defined(HAVE_SETITIMER) || defined(mingw32_TARGET_OS)
2910 fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
2912 fprintf(stderr,"blocked on delay of %d ms",
2913 tso->block_info.target - getourtimeofday());
2916 case BlockedOnMVar:
2917 fprintf(stderr,"blocked on an MVar");
2919 case BlockedOnException:
2920 fprintf(stderr,"blocked on delivering an exception to thread %d",
2921 tso->block_info.tso->id);
2923 case BlockedOnBlackHole:
2924 fprintf(stderr,"blocked on a black hole");
2926 case NotBlocked:
2927 fprintf(stderr,"not blocked");
2930 case BlockedOnGA:
2931 fprintf(stderr,"blocked on global address; local FM_BQ is %p (%s)",
2932 tso->block_info.closure, info_type(tso->block_info.closure));
2934 case BlockedOnGA_NoSend:
2935 fprintf(stderr,"blocked on global address (no send); local FM_BQ is %p (%s)",
2936 tso->block_info.closure, info_type(tso->block_info.closure));
2940 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
2941 tso->why_blocked, tso->id, tso);
2946 printThreadStatus(StgTSO *tso)
2948 switch (tso->what_next) {
2949 case ThreadKilled:
2950 fprintf(stderr,"has been killed");
2952 case ThreadComplete:
2953 fprintf(stderr,"has completed");
2955 default:
2956 printThreadBlockage(tso);
2961 printAllThreads(void)
2965 sched_belch("all threads:");
2966 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
2967 fprintf(stderr, "\tthread %d is ", t->id);
2968 printThreadStatus(t);
2969 fprintf(stderr,"\n");
2974 /* Print a whole blocking queue attached to node (debugging only). */
2979 print_bq (StgClosure *node)
2981 StgBlockingQueueElement *bqe;
2985 fprintf(stderr,"## BQ of closure %p (%s): ",
2986 node, info_type(node));
2988 /* should cover all closures that may have a blocking queue */
2989 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2990 get_itbl(node)->type == FETCH_ME_BQ ||
2991 get_itbl(node)->type == RBH);
2993 ASSERT(node!=(StgClosure*)NULL); // sanity check
2995 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2997 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2998 !end; // iterate until bqe points to a CONSTR
2999 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3000 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3001 ASSERT(bqe != (StgTSO*)NULL); // sanity check
3002 /* types of closures that may appear in a blocking queue */
3003 ASSERT(get_itbl(bqe)->type == TSO ||
3004 get_itbl(bqe)->type == BLOCKED_FETCH ||
3005 get_itbl(bqe)->type == CONSTR);
3006 /* only BQs of an RBH end with an RBH_Save closure */
3007 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3009 switch (get_itbl(bqe)->type) {
3010 case TSO:
3011 fprintf(stderr," TSO %d (%x),",
3012 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3014 case BLOCKED_FETCH:
3015 fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3016 ((StgBlockedFetch *)bqe)->node,
3017 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3018 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3019 ((StgBlockedFetch *)bqe)->ga.weight);
3021 case CONSTR:
3022 fprintf(stderr," %s (IP %p),",
3023 (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
3024 get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
3025 get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
3026 "RBH_Save_?"), get_itbl(bqe));
3028 default:
3029 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3030 info_type(bqe), node, info_type(node));
3034 fputc('\n', stderr);
3036 # elif defined(GRAN)
3038 print_bq (StgClosure *node)
3040 StgBlockingQueueElement *bqe;
3041 PEs node_loc, tso_loc;
3044 /* should cover all closures that may have a blocking queue */
3045 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3046 get_itbl(node)->type == FETCH_ME_BQ ||
3047 get_itbl(node)->type == RBH);
3049 ASSERT(node!=(StgClosure*)NULL); // sanity check
3050 node_loc = where_is(node);
3052 fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3053 node, info_type(node), node_loc);
3056 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3058 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3059 !end; // iterate until bqe points to a CONSTR
3060 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3061 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3062 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3063 /* types of closures that may appear in a blocking queue */
3064 ASSERT(get_itbl(bqe)->type == TSO ||
3065 get_itbl(bqe)->type == CONSTR);
3066 /* only BQs of an RBH end with an RBH_Save closure */
3067 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3069 tso_loc = where_is((StgClosure *)bqe);
3070 switch (get_itbl(bqe)->type) {
3071 case TSO:
3072 fprintf(stderr," TSO %d (%p) on [PE %d],",
3073 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3075 case CONSTR:
3076 fprintf(stderr," %s (IP %p),",
3077 (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
3078 get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
3079 get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
3080 "RBH_Save_?"), get_itbl(bqe));
3082 default:
3083 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3084 info_type((StgClosure *)bqe), node, info_type(node));
3088 fputc('\n', stderr);
3092 /* Nice and easy: only TSOs on the blocking queue */
3095 print_bq (StgClosure *node)
3099 ASSERT(node!=(StgClosure*)NULL); // sanity check
3100 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3101 tso != END_TSO_QUEUE;
3103 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3104 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3105 fprintf(stderr," TSO %d (%p),", tso->id, tso);
3107 fputc('\n', stderr);
3118 for (i=0, tso=run_queue_hd;
3119 tso != END_TSO_QUEUE;
3128 sched_belch(char *s, ...)
3133 fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3135 fprintf(stderr, "scheduler: ");
3137 vfprintf(stderr, s, ap);
3138 fprintf(stderr, "\n");
3144 //@node Index, , Debugging Routines, Main scheduling code
3148 //* MainRegTable:: @cindex\s-+MainRegTable
3149 //* StgMainThread:: @cindex\s-+StgMainThread
3150 //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue
3151 //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd
3152 //* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
3153 //* context_switch:: @cindex\s-+context_switch
3154 //* createThread:: @cindex\s-+createThread
3155 //* free_capabilities:: @cindex\s-+free_capabilities
3156 //* gc_pending_cond:: @cindex\s-+gc_pending_cond
3157 //* initScheduler:: @cindex\s-+initScheduler
3158 //* interrupted:: @cindex\s-+interrupted
3159 //* n_free_capabilities:: @cindex\s-+n_free_capabilities
3160 //* next_thread_id:: @cindex\s-+next_thread_id
3161 //* print_bq:: @cindex\s-+print_bq
3162 //* run_queue_hd:: @cindex\s-+run_queue_hd
3163 //* run_queue_tl:: @cindex\s-+run_queue_tl
3164 //* sched_mutex:: @cindex\s-+sched_mutex
3165 //* schedule:: @cindex\s-+schedule
3166 //* take_off_run_queue:: @cindex\s-+take_off_run_queue
3167 //* task_ids:: @cindex\s-+task_ids
3168 //* term_mutex:: @cindex\s-+term_mutex
3169 //* thread_ready_cond:: @cindex\s-+thread_ready_cond