1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.66 2000/04/11 16:36:53 sewardj Exp $
4 * (c) The GHC Team, 1998-2000
8 * The main scheduling code in GranSim is quite different from that in std
9 * (concurrent) Haskell: while concurrent Haskell just iterates over the
10 * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11 * over the events in the global event queue. -- HWL
12 * --------------------------------------------------------------------------*/
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
17 /* Version with scheduler monitor support for SMPs.
19 This design provides a high-level API to create and schedule threads etc.
20 as documented in the SMP design document.
22 It uses a monitor design controlled by a single mutex to exercise control
23 over accesses to shared data structures, and builds on the Posix threads
26 The majority of state is shared. In order to keep essential per-task state,
27 there is a Capability structure, which contains all the information
28 needed to run a thread: its STG registers, a pointer to its TSO, a
29 nursery etc. During STG execution, a pointer to the capability is
30 kept in a register (BaseReg).
32 In a non-SMP build, there is one global capability, namely MainRegTable.
39 //* Variables and Data structures::
40 //* Main scheduling loop::
41 //* Suspend and Resume::
43 //* Garbage Collextion Routines::
44 //* Blocking Queue Routines::
45 //* Exception Handling Routines::
46 //* Debugging Routines::
50 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
51 //@subsection Includes
59 #include "StgStartup.h"
63 #include "StgMiscClosures.h"
65 #include "Evaluator.h"
66 #include "Exception.h"
74 #if defined(GRAN) || defined(PAR)
75 # include "GranSimRts.h"
77 # include "ParallelRts.h"
78 # include "Parallel.h"
79 # include "ParallelDebug.h"
87 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
88 //@subsection Variables and Data structures
92 * These are the threads which clients have requested that we run.
94 * In an SMP build, we might have several concurrent clients all
95 * waiting for results, and each one will wait on a condition variable
96 * until the result is available.
98 * In non-SMP, clients are strictly nested: the first client calls
99 * into the RTS, which might call out again to C with a _ccall_GC, and
100 * eventually re-enter the RTS.
102 * Main threads information is kept in a linked list:
104 //@cindex StgMainThread
105 typedef struct StgMainThread_ {
107 SchedulerStatus stat; /* outcome reported to the client; the scheduler fills it in (e.g. Interrupted) */
110 pthread_cond_t wakeup; /* the scheduler broadcasts on this to wake the client waiting for the result */
112 struct StgMainThread_ *link; /* next entry in the main_threads linked list */
115 /* Main thread queue.
116 * Locks required: sched_mutex.
118 static StgMainThread *main_threads;
121 * Locks required: sched_mutex.
125 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
126 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
129 In GranSim we have a runnable and a blocked queue for each processor.
130 In order to minimise code changes, new arrays run_queue_hds/tls
131 are created. run_queue_hd is then a shortcut (macro) for
132 run_queue_hds[CurrentProc] (see GranSim.h).
135 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC]; /* run queue head and tail, one pair per processor */
136 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC]; /* blocked queue head and tail, one pair per processor */
137 StgTSO *ccalling_threadss[MAX_PROC]; /* NOTE(review): presumably the per-processor counterpart of suspended_ccalling_threads -- confirm */
138 /* We use the same global list of threads (all_threads) in GranSim as in
139 the std RTS (i.e. we are cheating). However, we don't use this list in
140 the GranSim specific code at the moment (so we are only potentially
145 StgTSO *run_queue_hd, *run_queue_tl;
146 StgTSO *blocked_queue_hd, *blocked_queue_tl;
150 /* Linked list of all threads.
151 * Used for detecting garbage collected threads.
155 /* Threads suspended in _ccall_GC.
157 static StgTSO *suspended_ccalling_threads;
159 static void GetRoots(void);
160 static StgTSO *threadStackOverflow(StgTSO *tso);
162 /* KH: The following two flags are shared memory locations. There is no need
163 to lock them, since they are only unset at the end of a scheduler
167 /* flag set by signal handler to precipitate a context switch */
168 //@cindex context_switch
171 /* if this flag is set as well, give up execution */
172 //@cindex interrupted
175 /* Next thread ID to allocate.
176 * Locks required: sched_mutex
178 //@cindex next_thread_id
179 StgThreadID next_thread_id = 1;
182 * Pointers to the state of the current thread.
183 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
184 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
187 /* The smallest stack size that makes any sense is:
188 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
189 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
190 * + 1 (the realworld token for an IO thread)
191 * + 1 (the closure to enter)
193 * A thread with this stack will bomb immediately with a stack
194 * overflow, which will increase its stack size.
197 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
199 /* Free capability list.
200 * Locks required: sched_mutex.
203 //@cindex free_capabilities
204 //@cindex n_free_capabilities
205 Capability *free_capabilities; /* Available capabilities for running threads */
206 nat n_free_capabilities; /* total number of available capabilities */
208 //@cindex MainRegTable
209 Capability MainRegTable; /* for non-SMP, we have one global capability */
218 /* All our current task ids, saved in case we need to kill them later.
225 void addToBlockedQueue ( StgTSO *tso );
227 static void schedule ( void );
228 void interruptStgRts ( void );
230 static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri ); /* variant taking a priority; pri is only used in a GRAN setup */
232 static StgTSO * createThread_ ( nat size, rtsBool have_lock ); /* have_lock: rtsTrue when the caller already holds sched_mutex, so it is not re-acquired */
236 static void sched_belch(char *s, ...);
240 //@cindex sched_mutex
242 //@cindex thread_ready_cond
243 //@cindex gc_pending_cond
244 pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER; /* the scheduler lock; taken with ACQUIRE_LOCK(&sched_mutex) */
245 pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER;
246 pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER; /* waited on (with sched_mutex) until there is a runnable thread and a free capability */
247 pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER; /* waited on while a GC is pending; broadcast once GarbageCollect() has run */
254 rtsTime TimeOfLastYield;
258 char *whatNext_strs[] = {
266 char *threadReturnCode_strs[] = {
267 "HeapOverflow", /* might also be StackOverflow */
276 * The thread state for the main thread.
277 // ToDo: check whether this is still needed
281 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
282 //@subsection Main scheduling loop
284 /* ---------------------------------------------------------------------------
285 Main scheduling loop.
287 We use round-robin scheduling, each thread returning to the
288 scheduler loop when one of these conditions is detected:
291 * timer expires (thread yields)
296 Locking notes: we acquire the scheduler lock once at the beginning
297 of the scheduler loop, and release it when
299 * running a thread, or
300 * waiting for work, or
301 * waiting for a GC to complete.
304 In a GranSim setup this loop iterates over the global event queue,
305 which determines what to do next. Because every step is driven by
306 that queue, the loop is more complicated than in either the
307 concurrent or the parallel (GUM) setup.
310 GUM iterates over incoming messages.
311 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
312 and sends out a fish whenever it has nothing to do; in-between
313 doing the actual reductions (shared code below) it processes the
314 incoming messages and deals with delayed operations
315 (see PendingFetches).
316 This is not the ugliest code you could imagine, but it's bloody close.
318 ------------------------------------------------------------------------ */
325 StgThreadReturnCode ret;
334 rtsBool was_interrupted = rtsFalse;
336 ACQUIRE_LOCK(&sched_mutex);
340 /* set up first event to get things going */
341 /* ToDo: assign costs for system setup and init MainTSO ! */
342 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
344 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
347 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
348 G_TSO(CurrentTSO, 5));
350 if (RtsFlags.GranFlags.Light) {
351 /* Save current time; GranSim Light only */
352 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
355 event = get_next_event();
357 while (event!=(rtsEvent*)NULL) {
358 /* Choose the processor with the next event */
359 CurrentProc = event->proc;
360 CurrentTSO = event->tso;
364 while (!GlobalStopPending) { /* GlobalStopPending set in par_exit */
372 IF_DEBUG(scheduler, printAllThreads());
374 /* If we're interrupted (the user pressed ^C, or some other
375 * termination condition occurred), kill all the currently running
379 IF_DEBUG(scheduler, sched_belch("interrupted"));
380 for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
383 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
386 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
387 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
388 interrupted = rtsFalse;
389 was_interrupted = rtsTrue;
392 /* Go through the list of main threads and wake up any
393 * clients whose computations have finished. ToDo: this
394 * should be done more efficiently without a linear scan
395 * of the main threads list, somehow...
399 StgMainThread *m, **prev;
400 prev = &main_threads;
401 for (m = main_threads; m != NULL; m = m->link) {
402 switch (m->tso->what_next) {
405 *(m->ret) = (StgClosure *)m->tso->sp[0];
409 pthread_cond_broadcast(&m->wakeup);
413 if (was_interrupted) {
414 m->stat = Interrupted;
418 pthread_cond_broadcast(&m->wakeup);
428 /* in GUM do this only on the Main PE */
431 /* If our main thread has finished or been killed, return.
434 StgMainThread *m = main_threads;
435 if (m->tso->what_next == ThreadComplete
436 || m->tso->what_next == ThreadKilled) {
437 main_threads = main_threads->link;
438 if (m->tso->what_next == ThreadComplete) {
439 /* we finished successfully, fill in the return value */
440 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
444 if (was_interrupted) {
445 m->stat = Interrupted;
455 /* Top up the run queue from our spark pool. We try to make the
456 * number of threads in the run queue equal to the number of
461 nat n = n_free_capabilities;
462 StgTSO *tso = run_queue_hd;
464 /* Count the run queue */
465 while (n > 0 && tso != END_TSO_QUEUE) {
474 break; /* no more sparks in the pool */
476 /* I'd prefer this to be done in activateSpark -- HWL */
477 /* tricky - it needs to hold the scheduler lock and
478 * not try to re-acquire it -- SDM */
480 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
481 pushClosure(tso,spark);
482 PUSH_ON_RUN_QUEUE(tso);
484 advisory_thread_count++;
488 sched_belch("turning spark of closure %p into a thread",
489 (StgClosure *)spark));
492 /* We need to wake up the other tasks if we just created some
495 if (n_free_capabilities - n > 1) {
496 pthread_cond_signal(&thread_ready_cond);
501 /* Check whether any waiting threads need to be woken up. If the
502 * run queue is empty, and there are no other tasks running, we
503 * can wait indefinitely for something to happen.
504 * ToDo: what if another client comes along & requests another
507 if (blocked_queue_hd != END_TSO_QUEUE) {
509 (run_queue_hd == END_TSO_QUEUE)
511 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
516 /* check for signals each time around the scheduler */
518 if (signals_pending()) {
519 start_signal_handlers();
523 /* Detect deadlock: when we have no threads to run, there are
524 * no threads waiting on I/O or sleeping, and all the other
525 * tasks are waiting for work, we must have a deadlock. Inform
526 * all the main threads.
529 if (blocked_queue_hd == END_TSO_QUEUE
530 && run_queue_hd == END_TSO_QUEUE
531 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
534 for (m = main_threads; m != NULL; m = m->link) {
537 pthread_cond_broadcast(&m->wakeup);
542 if (blocked_queue_hd == END_TSO_QUEUE
543 && run_queue_hd == END_TSO_QUEUE) {
544 StgMainThread *m = main_threads;
547 main_threads = m->link;
553 /* If there's a GC pending, don't do anything until it has
557 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
558 pthread_cond_wait(&gc_pending_cond, &sched_mutex);
561 /* block until we've got a thread on the run queue and a free
564 while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
565 IF_DEBUG(scheduler, sched_belch("waiting for work"));
566 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
567 IF_DEBUG(scheduler, sched_belch("work now available"));
573 if (RtsFlags.GranFlags.Light)
574 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
576 /* adjust time based on time-stamp */
577 if (event->time > CurrentTime[CurrentProc] &&
578 event->evttype != ContinueThread)
579 CurrentTime[CurrentProc] = event->time;
581 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
582 if (!RtsFlags.GranFlags.Light)
585 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"))
587 /* main event dispatcher in GranSim */
588 switch (event->evttype) {
589 /* Should just be continuing execution */
591 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
592 /* ToDo: check assertion
593 ASSERT(run_queue_hd != (StgTSO*)NULL &&
594 run_queue_hd != END_TSO_QUEUE);
596 /* Ignore ContinueThreads for fetching threads (if communication is synchronous) */
597 if (!RtsFlags.GranFlags.DoAsyncFetch &&
598 procStatus[CurrentProc]==Fetching) {
599 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
600 CurrentTSO->id, CurrentTSO, CurrentProc);
603 /* Ignore ContinueThreads for completed threads */
604 if (CurrentTSO->what_next == ThreadComplete) {
605 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
606 CurrentTSO->id, CurrentTSO, CurrentProc);
609 /* Ignore ContinueThreads for threads that are being migrated */
610 if (PROCS(CurrentTSO)==Nowhere) {
611 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
612 CurrentTSO->id, CurrentTSO, CurrentProc);
615 /* The thread should be at the beginning of the run queue */
616 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
617 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
618 CurrentTSO->id, CurrentTSO, CurrentProc);
619 break; // run the thread anyway
622 new_event(proc, proc, CurrentTime[proc],
624 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
626 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
627 break; // now actually run the thread; DaH Qu'vam yImuHbej
630 do_the_fetchnode(event);
631 goto next_thread; /* handle next event in event queue */
634 do_the_globalblock(event);
635 goto next_thread; /* handle next event in event queue */
638 do_the_fetchreply(event);
639 goto next_thread; /* handle next event in event queue */
641 case UnblockThread: /* Move from the blocked queue to the tail of */
642 do_the_unblock(event);
643 goto next_thread; /* handle next event in event queue */
645 case ResumeThread: /* Move from the blocked queue to the tail of */
646 /* the runnable queue ( i.e. Qu' SImqa'lu') */
647 event->tso->gran.blocktime +=
648 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
649 do_the_startthread(event);
650 goto next_thread; /* handle next event in event queue */
653 do_the_startthread(event);
654 goto next_thread; /* handle next event in event queue */
657 do_the_movethread(event);
658 goto next_thread; /* handle next event in event queue */
661 do_the_movespark(event);
662 goto next_thread; /* handle next event in event queue */
665 do_the_findwork(event);
666 goto next_thread; /* handle next event in event queue */
669 barf("Illegal event type %u\n", event->evttype);
672 /* This point was scheduler_loop in the old RTS */
674 IF_DEBUG(gran, belch("GRAN: after main switch"));
676 TimeOfLastEvent = CurrentTime[CurrentProc];
677 TimeOfNextEvent = get_time_of_next_event();
678 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
679 // CurrentTSO = ThreadQueueHd;
681 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
684 if (RtsFlags.GranFlags.Light)
685 GranSimLight_leave_system(event, &ActiveTSO);
687 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
690 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
692 /* in a GranSim setup the TSO stays on the run queue */
694 /* Take a thread from the run queue. */
695 t = POP_RUN_QUEUE(); // take_off_run_queue(t);
698 fprintf(stderr, "GRAN: About to run current thread, which is\n");
701 context_switch = 0; // turned on via GranYield, checking events and time slice
704 DumpGranEvent(GR_SCHEDULE, t));
706 procStatus[CurrentProc] = Busy;
710 if (PendingFetches != END_BF_QUEUE) {
714 /* ToDo: perhaps merge with spark activation above */
715 /* check whether we have local work and send requests if we have none */
716 if (run_queue_hd == END_TSO_QUEUE) { /* no runnable threads */
717 /* :-[ no local threads => look out for local sparks */
718 /* the spark pool for the current PE */
719 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
720 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
721 pool->hd < pool->tl) {
723 * ToDo: add GC code check that we really have enough heap afterwards!!
725 * If we're here (no runnable threads) and we have pending
726 * sparks, we must have a space problem. Get enough space
727 * to turn one of those pending sparks into a
731 spark = findSpark(); /* get a spark */
732 if (spark != (rtsSpark) NULL) {
733 tso = activateSpark(spark); /* turn the spark into a thread */
734 IF_PAR_DEBUG(schedule,
735 belch("==== schedule: Created TSO %d (%p); %d threads active",
736 tso->id, tso, advisory_thread_count));
738 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
739 belch("==^^ failed to activate spark");
741 } /* otherwise fall through & pick-up new tso */
743 IF_PAR_DEBUG(verbose,
744 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
745 spark_queue_len(pool)));
749 /* =8-[ no local sparks => look for work on other PEs */
752 * We really have absolutely no work. Send out a fish
753 * (there may be some out there already), and wait for
754 * something to arrive. We clearly can't run any threads
755 * until a SCHEDULE or RESUME arrives, and so that's what
756 * we're hoping to see. (Of course, we still have to
757 * respond to other types of messages.)
760 outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
761 // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
762 /* fishing set in sendFish, processFish;
763 avoid flooding system with fishes via delay */
765 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
773 } else if (PacketsWaiting()) { /* Look for incoming messages */
777 /* Now we are sure that we have some work available */
778 ASSERT(run_queue_hd != END_TSO_QUEUE);
779 /* Take a thread from the run queue, if we have work */
780 t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
782 /* ToDo: write something to the log-file
783 if (RTSflags.ParFlags.granSimStats && !sameThread)
784 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
788 /* the spark pool for the current PE */
789 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
791 IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; base=%x, lim=%x)",
792 spark_queue_len(pool),
794 pool->hd, pool->tl, pool->base, pool->lim));
796 IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)",
797 run_queue_len(), CURRENT_PROC,
798 run_queue_hd, run_queue_tl));
803 we are running a different TSO, so write a schedule event to log file
804 NB: If we use fair scheduling we also have to write a deschedule
805 event for LastTSO; with unfair scheduling we know that the
806 previous tso has blocked whenever we switch to another tso, so
807 we don't need it in GUM for now
809 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
810 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
814 #else /* !GRAN && !PAR */
816 /* grab a thread from the run queue
818 ASSERT(run_queue_hd != END_TSO_QUEUE);
820 IF_DEBUG(sanity,checkTSO(t));
827 cap = free_capabilities;
828 free_capabilities = cap->link;
829 n_free_capabilities--;
834 cap->rCurrentTSO = t;
836 /* set the context_switch flag
838 if (run_queue_hd == END_TSO_QUEUE)
843 RELEASE_LOCK(&sched_mutex);
845 IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...",
846 t->id, t, whatNext_strs[t->what_next]));
848 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
849 /* Run the current thread
851 switch (cap->rCurrentTSO->what_next) {
854 /* Thread already finished, return to scheduler. */
855 ret = ThreadFinished;
858 ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
861 ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
863 case ThreadEnterHugs:
867 IF_DEBUG(scheduler,sched_belch("entering Hugs"));
868 c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
869 cap->rCurrentTSO->sp += 1;
874 barf("Panic: entered a BCO but no bytecode interpreter in this build");
877 barf("schedule: invalid what_next field");
879 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
881 /* Costs for the scheduler are assigned to CCS_SYSTEM */
886 ACQUIRE_LOCK(&sched_mutex);
889 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
890 #elif !defined(GRAN) && !defined(PAR)
891 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
893 t = cap->rCurrentTSO;
896 /* HACK 675: if the last thread didn't yield, make sure to print a
897 SCHEDULE event to the log file when StgRunning the next thread, even
898 if it is the same one as before */
899 LastTSO = t; //(ret == ThreadBlocked) ? END_TSO_QUEUE : t;
900 TimeOfLastYield = CURRENT_TIME;
905 /* make all the running tasks block on a condition variable,
906 * maybe set context_switch and wait till they all pile in,
907 * then have them wait on a GC condition variable.
909 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow",
910 t->id, t, whatNext_strs[t->what_next]));
913 ASSERT(!is_on_queue(t,CurrentProc));
916 ready_to_gc = rtsTrue;
917 context_switch = 1; /* stop other threads ASAP */
918 PUSH_ON_RUN_QUEUE(t);
919 /* actual GC is done at the end of the while loop */
923 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow",
924 t->id, t, whatNext_strs[t->what_next]));
925 /* just adjust the stack for this thread, then pop it back
931 /* enlarge the stack */
932 StgTSO *new_t = threadStackOverflow(t);
934 /* This TSO has moved, so update any pointers to it from the
935 * main thread stack. It better not be on any other queues...
938 for (m = main_threads; m != NULL; m = m->link) {
944 PUSH_ON_RUN_QUEUE(new_t);
951 DumpGranEvent(GR_DESCHEDULE, t));
952 globalGranStats.tot_yields++;
955 DumpGranEvent(GR_DESCHEDULE, t));
957 /* put the thread back on the run queue. Then, if we're ready to
958 * GC, check whether this is the last task to stop. If so, wake
959 * up the GC thread. getThread will block during a GC until the
963 if (t->what_next == ThreadEnterHugs) {
964 /* ToDo: or maybe a timer expired when we were in Hugs?
965 * or maybe someone hit ctrl-C
967 belch("--<< thread %ld (%p; %s) stopped to switch to Hugs",
968 t->id, t, whatNext_strs[t->what_next]);
970 belch("--<< thread %ld (%p; %s) stopped, yielding",
971 t->id, t, whatNext_strs[t->what_next]);
976 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
978 ASSERT(t->link == END_TSO_QUEUE);
980 ASSERT(!is_on_queue(t,CurrentProc));
983 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
984 checkThreadQsSanity(rtsTrue));
986 APPEND_TO_RUN_QUEUE(t);
988 /* add a ContinueThread event to actually process the thread */
989 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
991 t, (StgClosure*)NULL, (rtsSpark*)NULL);
993 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1002 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1003 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1004 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1006 // ??? needed; should emit block before
1008 DumpGranEvent(GR_DESCHEDULE, t));
1009 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1012 ASSERT(procStatus[CurrentProc]==Busy ||
1013 ((procStatus[CurrentProc]==Fetching) &&
1014 (t->block_info.closure!=(StgClosure*)NULL)));
1015 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1016 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1017 procStatus[CurrentProc]==Fetching))
1018 procStatus[CurrentProc] = Idle;
1022 DumpGranEvent(GR_DESCHEDULE, t));
1024 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1028 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1029 t->id, t, whatNext_strs[t->what_next], t->block_info.closure);
1030 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1033 /* don't need to do anything. Either the thread is blocked on
1034 * I/O, in which case we'll have called addToBlockedQueue
1035 * previously, or it's blocked on an MVar or Blackhole, in which
1036 * case it'll be on the relevant queue already.
1039 fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1040 printThreadBlockage(t);
1041 fprintf(stderr, "\n"));
1043 /* Only for dumping event to log file
1044 ToDo: do I need this in GranSim, too?
1051 case ThreadFinished:
1052 /* Need to check whether this was a main thread, and if so, signal
1053 * the task that started it with the return value. If we have no
1054 * more main threads, we probably need to stop all the tasks until
1057 IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1058 t->what_next = ThreadComplete;
1060 endThread(t, CurrentProc); // clean-up the thread
1062 advisory_thread_count--;
1063 if (RtsFlags.ParFlags.ParStats.Full)
1064 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1069 barf("doneThread: invalid thread return code");
1073 cap->link = free_capabilities;
1074 free_capabilities = cap;
1075 n_free_capabilities++;
1079 if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes)
1084 /* everybody back, start the GC.
1085 * Could do it in this thread, or signal a condition var
1086 * to do it in another thread. Either way, we need to
1087 * broadcast on gc_pending_cond afterward.
1090 IF_DEBUG(scheduler,sched_belch("doing GC"));
1092 GarbageCollect(GetRoots,rtsFalse);
1093 ready_to_gc = rtsFalse;
1095 pthread_cond_broadcast(&gc_pending_cond);
1098 /* add a ContinueThread event to continue execution of current thread */
1099 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1101 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1103 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1110 IF_GRAN_DEBUG(unused,
1111 print_eventq(EventHd));
1113 event = get_next_event();
1117 /* ToDo: wait for next message to arrive rather than busy wait */
1122 t = take_off_run_queue(END_TSO_QUEUE);
1125 } /* end of while(1) */
1128 /* A hack for Hugs concurrency support. Needs sanitisation (?) */
1129 void deleteAllThreads ( void )
1132 IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
1133 for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) { /* walk every thread on the run queue */
1136 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) { /* walk every thread on the blocked queue */
1139 run_queue_hd = run_queue_tl = END_TSO_QUEUE; /* leave the run queue empty */
1140 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE; /* leave the blocked queue empty */
1143 /* startThread and insertThread are now in GranSim.c -- HWL */
1145 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1146 //@subsection Suspend and Resume
1148 /* ---------------------------------------------------------------------------
1149 * Suspending & resuming Haskell threads.
1151 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1152 * its capability before calling the C function. This allows another
1153 * task to pick up the capability and carry on running Haskell
1154 * threads. It also means that if the C call blocks, it won't lock
1157 * The Haskell thread making the C call is put to sleep for the
1158 * duration of the call, on the suspended_ccalling_threads queue. We
1159 * give out a token to the task, which it can use to resume the thread
1160 * on return from the C function.
1161 * ------------------------------------------------------------------------- */
1164 suspendThread( Capability *cap )
1168 ACQUIRE_LOCK(&sched_mutex); /* the suspended-thread list and the free-capability list are both modified below */
1171 sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
1173 threadPaused(cap->rCurrentTSO);
1174 cap->rCurrentTSO->link = suspended_ccalling_threads; /* push the calling thread onto the front of suspended_ccalling_threads */
1175 suspended_ccalling_threads = cap->rCurrentTSO;
1177 /* Use the thread ID as the token; it should be unique. resumeThread() finds the thread again by comparing this id. */
1178 tok = cap->rCurrentTSO->id;
1181 cap->link = free_capabilities; /* give the capability back: push it onto the free list */
1182 free_capabilities = cap;
1183 n_free_capabilities++;
1186 RELEASE_LOCK(&sched_mutex);
1191 resumeThread( StgInt tok )
1193 StgTSO *tso, **prev;
1196 ACQUIRE_LOCK(&sched_mutex);
1198 prev = &suspended_ccalling_threads; /* prev tracks the link field that points at tso */
1199 for (tso = suspended_ccalling_threads;
1200 tso != END_TSO_QUEUE;
1201 prev = &tso->link, tso = tso->link) {
1202 if (tso->id == (StgThreadID)tok) { /* tok is the thread id handed out by suspendThread() */
1207 if (tso == END_TSO_QUEUE) { /* reached the end of the list: no suspended thread has this id */
1208 barf("resumeThread: thread not found");
1212 while (free_capabilities == NULL) { /* wait on thread_ready_cond until a capability is on the free list */
1213 IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1214 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1215 IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1217 cap = free_capabilities; /* pop a capability off the free list */
1218 free_capabilities = cap->link;
1219 n_free_capabilities--;
1221 cap = &MainRegTable; /* NOTE(review): presumably the non-SMP alternative, using the single global capability -- the guarding preprocessor lines are not visible; confirm */
1224 cap->rCurrentTSO = tso; /* bind the resumed thread to the capability */
1226 RELEASE_LOCK(&sched_mutex);
1231 /* ---------------------------------------------------------------------------
1233 * ------------------------------------------------------------------------ */
1234 static void unblockThread(StgTSO *tso);
1236 /* ---------------------------------------------------------------------------
1237 * Comparing Thread ids.
1239 * This is used from STG land in the implementation of the
1240 * instances of Eq/Ord for ThreadIds.
1241 * ------------------------------------------------------------------------ */
1243 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
1245 StgThreadID id1 = tso1->id; /* only the thread ids are compared, not the TSO addresses */
1246 StgThreadID id2 = tso2->id;
1248 if (id1 < id2) return (-1); /* tso1 has the smaller id */
1249 if (id1 > id2) return 1; /* tso1 has the larger id */
1253 /* ---------------------------------------------------------------------------
1254 Create a new thread.
1256 The new thread starts with the given stack size. Before the
1257 scheduler can run, however, this thread needs to have a closure
1258 (and possibly some arguments) pushed on its stack. See
1259 pushClosure() in Schedule.h.
1261 createGenThread() and createIOThread() (in SchedAPI.h) are
1262 convenient packaged versions of this function.
1264 currently pri (priority) is only used in a GRAN setup -- HWL
1265 ------------------------------------------------------------------------ */
1266 //@cindex createThread
1268 /* currently pri (priority) is only used in a GRAN setup -- HWL */
// Thread creation entry points. Two signature pairs appear below (with and
// without a priority argument); the preprocessor conditionals that select
// between them are not visible in this listing.
// createThread() simply forwards to createThread_() with have_lock = rtsFalse.
1270 createThread(nat stack_size, StgInt pri)
1272 return createThread_(stack_size, rtsFalse, pri);
1276 createThread_(nat size, rtsBool have_lock, StgInt pri)
1280 createThread(nat stack_size)
1282 return createThread_(stack_size, rtsFalse);
// createThread_: allocate a TSO of `size` words (clamped from below to
// MIN_STACK_WORDS + TSO_STRUCT_SIZEW) and initialise its header, id, stack
// pointers and statistics fields.
//   size      - requested size in words, including the TSO structure itself
//   have_lock - when false, sched_mutex is taken around the id increment
// Returns END_TSO_QUEUE when the maxThreads check below refuses creation.
1286 createThread_(nat size, rtsBool have_lock)
1293 /* First check whether we should create a thread at all */
1295 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1296 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1298 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1299 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1300 return END_TSO_QUEUE;
1306 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1309 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1311 /* catch ridiculously small stack sizes */
1312 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1313 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
// stack_size is the number of words left for the stack after the TSO header
1316 stack_size = size - TSO_STRUCT_SIZEW;
1318 tso = (StgTSO *)allocate(size);
1319 TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1321 SET_HDR(tso, &TSO_info, CCS_SYSTEM);
1323 SET_GRAN_HDR(tso, ThisPE);
1325 tso->what_next = ThreadEnterGHC;
1327 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1328 * protect the increment operation on next_thread_id.
1329 * In future, we could use an atomic increment instead.
1331 if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1332 tso->id = next_thread_id++;
1333 if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1335 tso->why_blocked = NotBlocked;
1336 tso->blocked_exceptions = NULL;
1338 tso->splim = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
1339 tso->stack_size = stack_size;
1340 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
// sp starts one past the last stack word; frames are pushed by decrementing it
1342 tso->sp = (P_)&(tso->stack) + stack_size;
1345 tso->prof.CCCS = CCS_MAIN;
1348 /* put a stop frame on the stack */
1349 tso->sp -= sizeofW(StgStopFrame);
1350 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1351 tso->su = (StgUpdateFrame*)tso->sp;
1355 tso->link = END_TSO_QUEUE;
1356 /* uses more flexible routine in GranSim */
1357 insertThread(tso, CurrentProc);
1359 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1364 #if defined(GRAN) || defined(PAR)
1365 DumpGranEvent(GR_START,tso);
1368 /* Link the new thread on the global thread list.
1370 tso->global_link = all_threads;
1374 tso->gran.pri = pri;
1376 tso->gran.magic = TSO_MAGIC; // debugging only
1378 tso->gran.sparkname = 0;
1379 tso->gran.startedat = CURRENT_TIME;
1380 tso->gran.exported = 0;
1381 tso->gran.basicblocks = 0;
1382 tso->gran.allocs = 0;
1383 tso->gran.exectime = 0;
1384 tso->gran.fetchtime = 0;
1385 tso->gran.fetchcount = 0;
1386 tso->gran.blocktime = 0;
1387 tso->gran.blockcount = 0;
1388 tso->gran.blockedat = 0;
1389 tso->gran.globalsparks = 0;
1390 tso->gran.localsparks = 0;
1391 if (RtsFlags.GranFlags.Light)
1392 tso->gran.clock = Now; /* local clock */
1394 tso->gran.clock = 0;
1396 IF_DEBUG(gran,printTSO(tso));
1399 tso->par.magic = TSO_MAGIC; // debugging only
1401 tso->par.sparkname = 0;
1402 tso->par.startedat = CURRENT_TIME;
1403 tso->par.exported = 0;
1404 tso->par.basicblocks = 0;
1405 tso->par.allocs = 0;
1406 tso->par.exectime = 0;
1407 tso->par.fetchtime = 0;
1408 tso->par.fetchcount = 0;
1409 tso->par.blocktime = 0;
1410 tso->par.blockcount = 0;
1411 tso->par.blockedat = 0;
1412 tso->par.globalsparks = 0;
1413 tso->par.localsparks = 0;
1417 globalGranStats.tot_threads_created++;
1418 globalGranStats.threads_created_on_PE[CurrentProc]++;
1419 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1420 globalGranStats.tot_sq_probes++;
// NOTE(review): the format below has two conversions (%d, %p) but three
// arguments follow, so CurrentProc is what gets printed as the TSO number
// and tso->id is never printed -- looks like a format/argument mismatch.
1425 belch("==__ schedule: Created TSO %d (%p);",
1426 CurrentProc, tso, tso->id));
1428 IF_PAR_DEBUG(verbose,
1429 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1430 tso->id, tso, advisory_thread_count));
1432 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1433 tso->id, tso->stack_size));
1439 Turn a spark into a thread.
1440 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1443 //@cindex activateSpark
// Turn a spark into a thread: create a TSO with the initial stack size,
// push the spark closure onto it, put it on the run queue and increment
// advisory_thread_count. Calls barf() when no TSO could be created.
// createThread_ is called with have_lock = rtsTrue, so it will not take
// sched_mutex itself -- presumably the caller is expected to hold it
// (cf. the SMP ToDo in the comment preceding this function); confirm.
1445 activateSpark (rtsSpark spark)
1449 ASSERT(spark != (rtsSpark)NULL);
1450 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
// END_TSO_QUEUE is createThread_'s "refused to create a thread" result
1451 if (tso!=END_TSO_QUEUE) {
1452 pushClosure(tso,spark);
1453 PUSH_ON_RUN_QUEUE(tso);
1454 advisory_thread_count++;
1456 if (RtsFlags.ParFlags.ParStats.Full) {
1457 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1458 IF_PAR_DEBUG(verbose,
1459 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1460 (StgClosure *)spark, info_type((StgClosure *)spark)));
1463 barf("activateSpark: Cannot create TSO");
1465 // ToDo: fwd info on local/global spark to thread -- HWL
1466 // tso->gran.exported = spark->exported;
1467 // tso->gran.locked = !spark->global;
1468 // tso->gran.sparkname = spark->name;
1474 /* ---------------------------------------------------------------------------
1477 * scheduleThread puts a thread on the head of the runnable queue.
1478 * This will usually be done immediately after a thread is created.
1479 * The caller of scheduleThread must create the thread using e.g.
1480 * createThread and push an appropriate closure
1481 * on this thread's stack before the scheduler is invoked.
1482 * ------------------------------------------------------------------------ */
// Make tso runnable: with sched_mutex held, hand it to PUSH_ON_RUN_QUEUE.
// An END_TSO_QUEUE argument is special-cased first; the body of that branch
// is not visible in this listing.
1485 scheduleThread(StgTSO *tso)
1487 if (tso==END_TSO_QUEUE){
1492 ACQUIRE_LOCK(&sched_mutex);
1494 /* Put the new thread on the head of the runnable queue. The caller
1495 * better push an appropriate closure on this thread's stack
1496 * beforehand. In the SMP case, the thread may start running as
1497 * soon as we release the scheduler lock below.
1499 PUSH_ON_RUN_QUEUE(tso);
1503 IF_DEBUG(scheduler,printTSO(tso));
1505 RELEASE_LOCK(&sched_mutex);
1508 /* ---------------------------------------------------------------------------
1511 * Start up Posix threads to run each of the scheduler tasks.
1512 * I believe the task ids are not needed in the system as defined.
1514 * ------------------------------------------------------------------------ */
1516 #if defined(PAR) || defined(SMP)
// Start routine passed to pthread_create() by the scheduler initialisation
// code. The argument is unused; the task just calls rts_evalNothing(NULL).
1518 taskStart( void *arg STG_UNUSED )
1520 rts_evalNothing(NULL);
1524 /* ---------------------------------------------------------------------------
1527 * Initialise the scheduler. This resets all the queues - if the
1528 * queues contained any threads, they'll be garbage collected at the
1531 * This now calls startTasks(), so should only be called once! KH @ 25/10/99
1532 * ------------------------------------------------------------------------ */
// Signal handler; the scheduler initialisation code installs it for SIGTERM
// via sigaction(). The signal number is unused. Whatever it does is done
// while holding term_mutex (the guarded statement is not visible here).
1536 term_handler(int sig STG_UNUSED)
1539 ACQUIRE_LOCK(&term_mutex);
1541 RELEASE_LOCK(&term_mutex);
1546 //@cindex initScheduler
// Body of the scheduler initialisation routine (its signature line is not
// visible in this listing). It empties every scheduler queue, installs the
// SIGTERM handler, allocates the capabilities and starts one Posix thread
// per node, recording each in task_ids.
// NOTE(review): the loop bound below is inclusive (i<=MAX_PROC) -- confirm
// the per-PE arrays have MAX_PROC+1 entries.
1553 for (i=0; i<=MAX_PROC; i++) {
1554 run_queue_hds[i] = END_TSO_QUEUE;
1555 run_queue_tls[i] = END_TSO_QUEUE;
1556 blocked_queue_hds[i] = END_TSO_QUEUE;
1557 blocked_queue_tls[i] = END_TSO_QUEUE;
1558 ccalling_threadss[i] = END_TSO_QUEUE;
1561 run_queue_hd = END_TSO_QUEUE;
1562 run_queue_tl = END_TSO_QUEUE;
1563 blocked_queue_hd = END_TSO_QUEUE;
1564 blocked_queue_tl = END_TSO_QUEUE;
1567 suspended_ccalling_threads = END_TSO_QUEUE;
1569 main_threads = NULL;
1570 all_threads = END_TSO_QUEUE;
1575 enteredCAFs = END_CAF_LIST;
1577 /* Install the SIGTERM handler */
1580 struct sigaction action,oact;
1582 action.sa_handler = term_handler;
1583 sigemptyset(&action.sa_mask);
1584 action.sa_flags = 0;
1585 if (sigaction(SIGTERM, &action, &oact) != 0) {
1586 barf("can't install TERM handler");
1592 /* Allocate N Capabilities */
1595 Capability *cap, *prev;
1598 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1599 cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1603 free_capabilities = cap;
1604 n_free_capabilities = RtsFlags.ParFlags.nNodes;
1606 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1607 n_free_capabilities););
1610 #if defined(SMP) || defined(PAR)
1623 /* make some space for saving all the thread ids */
1624 task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1625 "initScheduler:task_ids");
1627 /* and create all the threads */
1628 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1629 r = pthread_create(&tid,NULL,taskStart,NULL);
1631 barf("startTasks: Can't create new Posix thread");
// record the new task and zero its timing statistics
1633 task_ids[i].id = tid;
1634 task_ids[i].mut_time = 0.0;
1635 task_ids[i].mut_etime = 0.0;
1636 task_ids[i].gc_time = 0.0;
1637 task_ids[i].gc_etime = 0.0;
1638 task_ids[i].elapsedtimestart = elapsedtime();
1639 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
// Shut down the tasks recorded in task_ids. Two strategies are listed:
// pthread_cancel() followed by pthread_join() on every task, and sending
// every task SIGTERM and then waiting for await_death to reach zero.
// The conditionals choosing between them are not visible in this listing.
1645 exitScheduler( void )
1650 /* Don't want to use pthread_cancel, since we'd have to install
1651 * these silly exception handlers (pthread_cleanup_{push,pop}) around
1655 /* Cancel all our tasks */
1656 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1657 pthread_cancel(task_ids[i].id);
1660 /* Wait for all the tasks to terminate */
1661 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1662 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n",
1664 pthread_join(task_ids[i].id, NULL);
1668 /* Send 'em all a SIGTERM. That should shut 'em up.
1670 await_death = RtsFlags.ParFlags.nNodes;
1671 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1672 pthread_kill(task_ids[i].id,SIGTERM);
1674 while (await_death > 0) {
1680 /* -----------------------------------------------------------------------------
1681 Managing the per-task allocation areas.
1683 Each capability comes with an allocation area. These are
1684 fixed-length block lists into which allocation can be done.
1686 ToDo: no support for two-space collection at the moment???
1687 -------------------------------------------------------------------------- */
1689 /* -----------------------------------------------------------------------------
1690 * waitThread is the external interface for running a new computation
1691 * and waiting for the result.
1693 * In the non-SMP case, we create a new main thread, push it on the
1694 * main-thread stack, and invoke the scheduler to run it. The
1695 * scheduler will return when the top main thread on the stack has
1696 * completed or died, and fill in the necessary fields of the
1697 * main_thread structure.
1699 * In the SMP case, we create a main thread as before, but we then
1700 * create a new condition variable and sleep on it. When our new
1701 * main thread has completed, we'll be woken up and the status/result
1702 * will be in the main_thread struct.
1703 * -------------------------------------------------------------------------- */
// Run tso as a new main thread and wait for it to finish.
//   tso - the thread to wait for
//   ret - out parameter (marked so in the signature)
// Under sched_mutex a StgMainThread record is allocated and linked onto
// main_threads. In the condition-variable variant the caller then sleeps on
// m->wakeup until m->stat is no longer NoStatus; the GranSim lines instead
// set up CurrentTSO/CurrentProc for the main PE.
1706 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1709 SchedulerStatus stat;
1711 ACQUIRE_LOCK(&sched_mutex);
1713 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1719 pthread_cond_init(&m->wakeup, NULL);
1722 m->link = main_threads;
1725 IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n",
// pthread_cond_wait releases sched_mutex while sleeping and re-takes it
// before returning; the loop re-checks the status after every wakeup
1730 pthread_cond_wait(&m->wakeup, &sched_mutex);
1731 } while (m->stat == NoStatus);
1733 /* GranSim specific init */
1734 CurrentTSO = m->tso; // the TSO to run
1735 procStatus[MainProc] = Busy; // status of main PE
1736 CurrentProc = MainProc; // PE to run it on
1741 ASSERT(m->stat != NoStatus);
1747 pthread_cond_destroy(&m->wakeup);
1750 IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n",
1754 RELEASE_LOCK(&sched_mutex);
1759 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1760 //@subsection Run queue code
1764 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1765 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1766 implicit global variable that has to be correct when calling these
1770 /* Put the new thread on the head of the runnable queue.
1771 * The caller of createThread better push an appropriate closure
1772 * on this thread's stack before the scheduler is invoked.
// Insert tso at the front of the run queue: its link is pointed at the
// current head. The empty-queue case (run_queue_tl == END_TSO_QUEUE) is
// tested afterwards; the statements that update head and tail are not
// visible in this listing.
1774 static /* inline */ void
1775 add_to_run_queue(tso)
1778 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1779 tso->link = run_queue_hd;
1781 if (run_queue_tl == END_TSO_QUEUE) {
1786 /* Put the new thread at the end of the runnable queue. */
// Append tso to the run queue. Asserts that tso really is a TSO, that the
// queue pointers are initialised, and that tso is not already the head or
// the tail. An empty queue is tested for explicitly; otherwise the current
// tail's link is pointed at tso.
1787 static /* inline */ void
1788 push_on_run_queue(tso)
1791 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1792 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1793 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1794 if (run_queue_hd == END_TSO_QUEUE) {
1797 run_queue_tl->link = tso;
1803 Should be inlined because it's used very often in schedule. The tso
1804 argument is actually only needed in GranSim, where we want to have the
1805 possibility to schedule *any* TSO on the run queue, irrespective of the
1806 actual ordering. Therefore, if tso is not the nil TSO then we traverse
1807 the run queue and dequeue the tso, adjusting the links in the queue.
1809 //@cindex take_off_run_queue
// Remove a thread from the run queue.
//   tso - END_TSO_QUEUE to take the thread at the head of the queue, or a
//         specific TSO to search for and unlink from wherever it sits.
// In both paths the removed thread's link is reset to END_TSO_QUEUE and
// run_queue_hd / run_queue_tl are adjusted when the removed thread was the
// first or the last element.
1810 static /* inline */ StgTSO*
1811 take_off_run_queue(StgTSO *tso) {
1815 qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1817 if tso is specified, unlink that tso from the run_queue (doesn't have
1818 to be at the beginning of the queue); GranSim only
1820 if (tso!=END_TSO_QUEUE) {
1821 /* find tso in queue */
1822 for (t=run_queue_hd, prev=END_TSO_QUEUE;
1823 t!=END_TSO_QUEUE && t!=tso;
1827 /* now actually dequeue the tso */
1828 if (prev!=END_TSO_QUEUE) {
1829 ASSERT(run_queue_hd!=t);
1830 prev->link = t->link;
1832 /* t is at beginning of thread queue */
1833 ASSERT(run_queue_hd==t);
1834 run_queue_hd = t->link;
1836 /* t is at end of thread queue */
1837 if (t->link==END_TSO_QUEUE) {
1838 ASSERT(t==run_queue_tl);
1839 run_queue_tl = prev;
1841 ASSERT(run_queue_tl!=t);
1843 t->link = END_TSO_QUEUE;
1845 /* take tso from the beginning of the queue; std concurrent code */
1847 if (t != END_TSO_QUEUE) {
1848 run_queue_hd = t->link;
1849 t->link = END_TSO_QUEUE;
// removing the only element leaves the queue empty, so reset the tail too
1850 if (run_queue_hd == END_TSO_QUEUE) {
1851 run_queue_tl = END_TSO_QUEUE;
1860 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1861 //@subsection Garbage Collextion Routines
1863 /* ---------------------------------------------------------------------------
1864 Where are the roots that we know about?
1866 - all the threads on the runnable queue
1867 - all the threads on the blocked queue
1868 - all the threads currently executing a _ccall_GC
1869 - all the "main threads"
1871 ------------------------------------------------------------------------ */
1873 /* This has to be protected either by the scheduler monitor, or by the
1874 garbage collection monitor (probably the latter).
// Report the scheduler's roots to the garbage collector. Each root pointer
// is passed through MarkRoot() and overwritten with the value it returns.
// Covered here: the run and blocked queue heads/tails (per-PE arrays in the
// first loop, single queues below it), every main thread's tso, and the
// suspended_ccalling_threads list.
// NOTE(review): the per-PE loop bound is inclusive
// (i<=RtsFlags.GranFlags.proc) -- confirm this matches the array sizes.
1878 static void GetRoots(void)
1885 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1886 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1887 run_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1888 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1889 run_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1891 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1892 blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1893 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1894 blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1895 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1896 ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
// a non-empty queue must have a tail as well as a head; both are marked
1903 if (run_queue_hd != END_TSO_QUEUE) {
1904 ASSERT(run_queue_tl != END_TSO_QUEUE);
1905 run_queue_hd = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1906 run_queue_tl = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1909 if (blocked_queue_hd != END_TSO_QUEUE) {
1910 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
1911 blocked_queue_hd = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1912 blocked_queue_tl = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1916 for (m = main_threads; m != NULL; m = m->link) {
1917 m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1919 if (suspended_ccalling_threads != END_TSO_QUEUE)
1920 suspended_ccalling_threads =
1921 (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
1923 #if defined(SMP) || defined(PAR) || defined(GRAN)
1928 /* -----------------------------------------------------------------------------
1931 This is the interface to the garbage collector from Haskell land.
1932 We provide this so that external C code can allocate and garbage
1933 collect when called from Haskell via _ccall_GC.
1935 It might be useful to provide an interface whereby the programmer
1936 can specify more roots (ToDo).
1938 This needs to be protected by the GC condition variable above. KH.
1939 -------------------------------------------------------------------------- */
// Garbage-collection entry points. Several signature lines are not visible
// in this listing, so the ownership of the two orphaned bodies below is not
// asserted here.
// extra_roots holds the caller-supplied root-marking callback that
// performGCWithRoots() stores before collecting.
1941 void (*extra_roots)(void);
// collection using only the scheduler's roots, second argument rtsFalse
1946 GarbageCollect(GetRoots,rtsFalse);
// performMajorGC: same root function, but second argument rtsTrue
1950 performMajorGC(void)
1952 GarbageCollect(GetRoots,rtsTrue);
// combined root function: scheduler roots first, then the user's callback
1958 GetRoots(); /* the scheduler's roots */
1959 extra_roots(); /* the user's roots */
// performGCWithRoots: remember get_roots in extra_roots, then collect with
// AllRoots as the root function so both sets of roots are visited
1963 performGCWithRoots(void (*get_roots)(void))
1965 extra_roots = get_roots;
1967 GarbageCollect(AllRoots,rtsFalse);
1970 /* -----------------------------------------------------------------------------
1973 If the thread has reached its maximum stack size, then raise the
1974 StackOverflow exception in the offending thread. Otherwise
1975 relocate the TSO into a larger chunk of memory and adjust its stack
1977 -------------------------------------------------------------------------- */
// Handle a stack overflow in tso.
// If the stack has already reached max_stack_size, the StackOverflow
// exception is raised in the thread via raiseAsync(). Otherwise a larger
// TSO is allocated, the TSO structure and the live part of the stack are
// copied into it, the stack pointers are relocated, and the old TSO is
// marked ThreadRelocated with sp/su parked just past the end of its stack.
1980 threadStackOverflow(StgTSO *tso)
1982 nat new_stack_size, new_tso_size, diff, stack_words;
1986 IF_DEBUG(sanity,checkTSO(tso));
1987 if (tso->stack_size >= tso->max_stack_size) {
// NOTE(review): the message below opens a parenthesis it never closes
1990 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
1991 tso->id, tso, tso->stack_size, tso->max_stack_size);
1992 /* If we're debugging, just print out the top of the stack */
1993 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
1997 fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
2000 /* Send this thread the StackOverflow exception */
2001 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2006 /* Try to double the current stack size. If that takes us over the
2007 * maximum stack size for this thread, then use the maximum instead.
2008 * Finally round up so the TSO ends up as a whole number of blocks.
2010 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2011 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2012 TSO_STRUCT_SIZE)/sizeof(W_);
2013 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
// the rounding may have grown the object, so recompute the usable stack size
2014 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2016 IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2018 dest = (StgTSO *)allocate(new_tso_size);
2019 TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2021 /* copy the TSO block and the old stack into the new area */
2022 memcpy(dest,tso,TSO_STRUCT_SIZE);
// stack_words = words in use, from sp up to the end of the old stack
2023 stack_words = tso->stack + tso->stack_size - tso->sp;
// the live stack is placed at the very end of the new object
2024 new_sp = (P_)dest + new_tso_size - stack_words;
2025 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2027 /* relocate the stack pointers... */
2028 diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2029 dest->su = (StgUpdateFrame *) ((P_)dest->su + diff);
// splim moves by the distance between the two TSO objects, not by diff
2031 dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso);
2032 dest->stack_size = new_stack_size;
2034 /* and relocate the update frame list */
2035 relocate_TSO(tso, dest);
2037 /* Mark the old TSO as relocated. We have to check for relocated
2038 * TSOs in the garbage collector and any primops that deal with TSOs.
2040 * It's important to set the sp and su values to just beyond the end
2041 * of the stack, so we don't attempt to scavenge any part of the
2044 tso->what_next = ThreadRelocated;
2046 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2047 tso->su = (StgUpdateFrame *)tso->sp;
2048 tso->why_blocked = NotBlocked;
2049 dest->mut_link = NULL;
// NOTE(review): the message says "now at" / "increased to" but the arguments
// are taken from tso (the old object), not dest -- confirm which is intended.
2051 IF_PAR_DEBUG(verbose,
2052 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2053 tso->id, tso, tso->stack_size);
2054 /* If we're debugging, just print out the top of the stack */
2055 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2058 IF_DEBUG(sanity,checkTSO(tso));
2060 IF_DEBUG(scheduler,printTSO(dest));
2066 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2067 //@subsection Blocking Queue Routines
2069 /* ---------------------------------------------------------------------------
2070 Wake up a queue that was blocked on some resource.
2071 ------------------------------------------------------------------------ */
2073 /* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */
// Bookkeeping for a thread that is being woken from a blocking queue.
// The signature is listed twice; the conditionals separating the two
// variants are not visible in this listing.
//   bqe  - the queue element being woken (treated as a TSO below)
//   node - the closure the element was blocked on
// When full parallel statistics are enabled a GR_RESUME event is dumped,
// and depending on the closure type of node the time since par.blockedat is
// added to either par.fetchtime or par.blocktime.
2077 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2082 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2084 /* write RESUME events to log file and
2085 update blocked and fetch time (depending on type of the orig closure) */
2086 if (RtsFlags.ParFlags.ParStats.Full) {
2087 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2088 GR_RESUME, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2089 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2091 switch (get_itbl(node)->type) {
2093 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2098 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2101 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
// GranSim variant: wake one element of node's blocking queue.
// The element is treated as a TSO. Depending on whether that TSO is local
// to node's processors, the simulated clock of CurrentProc is advanced by
// the local unblock cost or by pack + global unblock + latency costs, and a
// new event is scheduled for the TSO's processor. In both cases the TSO's
// link and block_info.closure are cleared.
2108 static StgBlockingQueueElement *
2109 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2112 PEs node_loc, tso_loc;
2114 node_loc = where_is(node); // should be lifted out of loop
2115 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2116 tso_loc = where_is((StgClosure *)tso);
2117 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2118 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2119 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2120 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2121 // insertThread(tso, node_loc);
2122 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2124 tso, node, (rtsSpark*)NULL);
2125 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2128 } else { // TSO is remote (actually should be FMBQ)
2129 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2130 RtsFlags.GranFlags.Costs.gunblocktime +
2131 RtsFlags.GranFlags.Costs.latency;
2132 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2134 tso, node, (rtsSpark*)NULL);
2135 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2138 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2140 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2141 (node_loc==tso_loc ? "Local" : "Global"),
2142 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2143 tso->block_info.closure = NULL;
2144 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
// Variant that dispatches on the closure type of the queue element.
// A TSO is pushed on the run queue, accounted for via unblockCount() and
// then marked NotBlocked; a blocked fetch is linked onto PendingFetches;
// RBHSave closures are only checked by an assertion; anything else is
// reported through barf(). The case labels themselves are not visible in
// this listing.
2148 static StgBlockingQueueElement *
2149 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2151 StgBlockingQueueElement *next;
2153 switch (get_itbl(bqe)->type) {
2155 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2156 /* if it's a TSO just push it onto the run_queue */
2158 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2159 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
2161 unblockCount(bqe, node);
2162 /* reset blocking status after dumping event */
2163 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2167 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2169 bqe->link = PendingFetches;
2170 PendingFetches = bqe;
2174 /* can ignore this case in a non-debugging setup;
2175 see comments on RBHSave closures above */
2177 /* check that the closure is an RBHSave closure */
2178 ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
2179 get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
2180 get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
2184 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2185 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2189 // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2193 #else /* !GRAN && !PAR */
// Plain (non-GranSim, non-parallel) variant: mark a blocked TSO as
// NotBlocked and push it on the run queue. The name suggests the caller
// already holds sched_mutex, and the unblockOne() wrapper does take it
// before calling here -- no locking is done in the visible body.
2195 unblockOneLocked(StgTSO *tso)
2199 ASSERT(get_itbl(tso)->type == TSO);
2200 ASSERT(tso->why_blocked != NotBlocked);
2201 tso->why_blocked = NotBlocked;
2203 PUSH_ON_RUN_QUEUE(tso);
2205 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2210 #if defined(GRAN) || defined(PAR)
// Locking wrapper: takes sched_mutex, delegates to unblockOneLocked() and
// releases the mutex again. bqe is overwritten with the callee's result.
2211 inline StgBlockingQueueElement *
2212 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2214 ACQUIRE_LOCK(&sched_mutex);
2215 bqe = unblockOneLocked(bqe, node);
2216 RELEASE_LOCK(&sched_mutex);
// Locking wrapper for the plain build: takes sched_mutex, delegates to
// unblockOneLocked() and releases the mutex again. tso is overwritten with
// the callee's result.
2221 unblockOne(StgTSO *tso)
2223 ACQUIRE_LOCK(&sched_mutex);
2224 tso = unblockOneLocked(tso);
2225 RELEASE_LOCK(&sched_mutex);
// GranSim variant: wake every TSO on the blocking queue q of closure node.
// If node is not on CurrentProc a "fake fetch" is assumed and CurrentProc
// is added to the node's processor bitmask. The queue is then walked with
// unblockOneLocked() for as long as the elements are TSOs. A non-empty
// remainder is taken to be the RBHSave closure of an RBH, whose saved
// payload words are written back into node. Finally global statistics are
// updated when enabled.
// No sched_mutex acquisition is visible in this variant.
2232 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2234 StgBlockingQueueElement *bqe;
2239 belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2240 node, CurrentProc, CurrentTime[CurrentProc],
2241 CurrentTSO->id, CurrentTSO));
2243 node_loc = where_is(node);
2245 ASSERT(get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2246 get_itbl(q)->type == CONSTR); // closure (type constructor)
2247 ASSERT(is_unique(node));
2249 /* FAKE FETCH: magically copy the node to the tso's proc;
2250 no Fetch necessary because in reality the node should not have been
2251 moved to the other PE in the first place
2253 if (CurrentProc!=node_loc) {
2255 belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2256 node, node_loc, CurrentProc, CurrentTSO->id,
2257 // CurrentTSO, where_is(CurrentTSO),
2258 node->header.gran.procs));
2259 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2261 belch("## new bitmask of node %p is %#x",
2262 node, node->header.gran.procs));
2263 if (RtsFlags.GranFlags.GranSimStats.Global) {
2264 globalGranStats.tot_fake_fetches++;
2269 // ToDo: check: ASSERT(CurrentProc==node_loc);
2270 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2273 bqe points to the current element in the queue
2274 next points to the next element in the queue
2276 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2277 //tso_loc = where_is(tso);
2279 bqe = unblockOneLocked(bqe, node);
2282 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2283 the closure to make room for the anchor of the BQ */
2284 if (bqe!=END_BQ_QUEUE) {
2285 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2287 ASSERT((info_ptr==&RBH_Save_0_info) ||
2288 (info_ptr==&RBH_Save_1_info) ||
2289 (info_ptr==&RBH_Save_2_info));
2291 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2292 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2293 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2296 belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2297 node, info_type(node)));
2300 /* statistics gathering */
2301 if (RtsFlags.GranFlags.GranSimStats.Global) {
2302 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2303 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2304 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2305 globalGranStats.tot_awbq++; // total no. of bqs awakened
2308 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2309 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
// Variant for queues that may hold TSOs and blocked fetches: with
// sched_mutex held, every leading TSO or BLOCKED_FETCH element is handed to
// unblockOneLocked(), whose result becomes the next element to examine.
// The walk stops at the first element of any other type.
2313 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2315 StgBlockingQueueElement *bqe, *next;
2317 ACQUIRE_LOCK(&sched_mutex);
2319 IF_PAR_DEBUG(verbose,
2320 belch("## AwBQ for node %p on [%x]: ",
2323 ASSERT(get_itbl(q)->type == TSO ||
2324 get_itbl(q)->type == BLOCKED_FETCH ||
2325 get_itbl(q)->type == CONSTR);
2328 while (get_itbl(bqe)->type==TSO ||
2329 get_itbl(bqe)->type==BLOCKED_FETCH) {
2330 bqe = unblockOneLocked(bqe, node);
2332 RELEASE_LOCK(&sched_mutex);
2335 #else /* !GRAN && !PAR */
// Plain variant: with sched_mutex held, wake every thread on the queue that
// starts at tso. unblockOneLocked() supplies the next thread to process;
// the loop ends at END_TSO_QUEUE.
2337 awakenBlockedQueue(StgTSO *tso)
2339 ACQUIRE_LOCK(&sched_mutex);
2340 while (tso != END_TSO_QUEUE) {
2341 tso = unblockOneLocked(tso);
2343 RELEASE_LOCK(&sched_mutex);
2347 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2348 //@subsection Exception Handling Routines
2350 /* ---------------------------------------------------------------------------
2352 - usually called inside a signal handler so it mustn't do anything fancy.
2353 ------------------------------------------------------------------------ */
// Takes no arguments. Per the header comment it is usually called from a
// signal handler, so it must stay simple. The body is not visible in this
// listing.
2356 interruptStgRts(void)
2362 /* -----------------------------------------------------------------------------
2365 This is for use when we raise an exception in another thread, which
2367 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2368 -------------------------------------------------------------------------- */
2370 #if defined(GRAN) || defined(PAR)
2372 NB: only the type of the blocking queue is different in GranSim and GUM
2373 the operations on the queue-elements are the same
2374 long live polymorphism!
// Blocking-queue-element variant: remove tso from whatever queue it is
// blocked on and make it runnable again.
// Dispatches on tso->why_blocked:
//   - MVar:       unlink from the MVar's queue, fixing mvar->tail if needed
//   - black hole: unlink from the BLACKHOLE_BQ's blocking queue
//   - exception:  unlink from the target thread's blocked_exceptions list
//   - delay/I/O:  unlink from the global blocked queue, fixing head and tail
// Each search ends in barf() if tso is not found. Afterwards tso's link,
// why_blocked and block_info are reset and it is pushed on the run queue.
// NOTE(review): the early return for a thread that is not blocked comes
// after ACQUIRE_LOCK with no visible RELEASE_LOCK before it -- it looks as
// if sched_mutex stays held on that path; confirm against the full file.
2377 unblockThread(StgTSO *tso)
2379 StgBlockingQueueElement *t, **last;
2381 ACQUIRE_LOCK(&sched_mutex);
2382 switch (tso->why_blocked) {
2385 return; /* not blocked */
2388 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2390 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2391 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
// last always points at the link field that refers to t, so storing through
// it unlinks t without special-casing the head of the list
2393 last = (StgBlockingQueueElement **)&mvar->head;
2394 for (t = (StgBlockingQueueElement *)mvar->head;
2396 last = &t->link, last_tso = t, t = t->link) {
2397 if (t == (StgBlockingQueueElement *)tso) {
2398 *last = (StgBlockingQueueElement *)tso->link;
2399 if (mvar->tail == tso) {
2400 mvar->tail = (StgTSO *)last_tso;
2405 barf("unblockThread (MVAR): TSO not found");
2408 case BlockedOnBlackHole:
2409 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2411 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2413 last = &bq->blocking_queue;
2414 for (t = bq->blocking_queue;
2416 last = &t->link, t = t->link) {
2417 if (t == (StgBlockingQueueElement *)tso) {
2418 *last = (StgBlockingQueueElement *)tso->link;
2422 barf("unblockThread (BLACKHOLE): TSO not found");
2425 case BlockedOnException:
2427 StgTSO *target = tso->block_info.tso;
2429 ASSERT(get_itbl(target)->type == TSO);
2430 ASSERT(target->blocked_exceptions != NULL);
2432 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2433 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2435 last = &t->link, t = t->link) {
2436 ASSERT(get_itbl(t)->type == TSO);
2437 if (t == (StgBlockingQueueElement *)tso) {
2438 *last = (StgBlockingQueueElement *)tso->link;
2442 barf("unblockThread (Exception): TSO not found");
2445 case BlockedOnDelay:
2447 case BlockedOnWrite:
2449 StgBlockingQueueElement *prev = NULL;
2450 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2451 prev = t, t = t->link) {
2452 if (t == (StgBlockingQueueElement *)tso) {
2454 blocked_queue_hd = (StgTSO *)t->link;
2455 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2456 blocked_queue_tl = END_TSO_QUEUE;
2459 prev->link = t->link;
2460 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2461 blocked_queue_tl = (StgTSO *)prev;
2467 barf("unblockThread (I/O): TSO not found");
2471 barf("unblockThread");
2475 tso->link = END_TSO_QUEUE;
2476 tso->why_blocked = NotBlocked;
2477 tso->block_info.closure = NULL;
2478 PUSH_ON_RUN_QUEUE(tso);
2479 RELEASE_LOCK(&sched_mutex);
// Plain (StgTSO queue) variant: remove tso from whatever queue it is
// blocked on and make it runnable again.
// Dispatches on tso->why_blocked and searches the MVar queue, the
// BLACKHOLE_BQ blocking queue, the target thread's blocked_exceptions list
// or the global blocked queue; each search ends in barf() if tso is not
// found. Afterwards tso's link, why_blocked and block_info are reset and it
// is pushed on the run queue.
// NOTE(review): the early return for a thread that is not blocked comes
// after ACQUIRE_LOCK with no visible RELEASE_LOCK before it -- it looks as
// if sched_mutex stays held on that path; confirm against the full file.
2483 unblockThread(StgTSO *tso)
2487 ACQUIRE_LOCK(&sched_mutex);
2488 switch (tso->why_blocked) {
2491 return; /* not blocked */
2494 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2496 StgTSO *last_tso = END_TSO_QUEUE;
2497 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
// last_tso trails t by one element so the MVar's tail can be moved back
// when the removed thread was the last one queued
2500 for (t = mvar->head; t != END_TSO_QUEUE;
2501 last = &t->link, last_tso = t, t = t->link) {
2504 if (mvar->tail == tso) {
2505 mvar->tail = last_tso;
2510 barf("unblockThread (MVAR): TSO not found");
2513 case BlockedOnBlackHole:
2514 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2516 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2518 last = &bq->blocking_queue;
2519 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2520 last = &t->link, t = t->link) {
2526 barf("unblockThread (BLACKHOLE): TSO not found");
2529 case BlockedOnException:
2531 StgTSO *target = tso->block_info.tso;
2533 ASSERT(get_itbl(target)->type == TSO);
2534 ASSERT(target->blocked_exceptions != NULL);
2536 last = &target->blocked_exceptions;
2537 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2538 last = &t->link, t = t->link) {
2539 ASSERT(get_itbl(t)->type == TSO);
2545 barf("unblockThread (Exception): TSO not found");
2548 case BlockedOnDelay:
2550 case BlockedOnWrite:
2552 StgTSO *prev = NULL;
2553 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2554 prev = t, t = t->link) {
2557 blocked_queue_hd = t->link;
2558 if (blocked_queue_tl == t) {
2559 blocked_queue_tl = END_TSO_QUEUE;
2562 prev->link = t->link;
2563 if (blocked_queue_tl == t) {
2564 blocked_queue_tl = prev;
2570 barf("unblockThread (I/O): TSO not found");
2574 barf("unblockThread");
2578 tso->link = END_TSO_QUEUE;
2579 tso->why_blocked = NotBlocked;
2580 tso->block_info.closure = NULL;
2581 PUSH_ON_RUN_QUEUE(tso);
2582 RELEASE_LOCK(&sched_mutex);
2586 /* -----------------------------------------------------------------------------
2589 * The following function implements the magic for raising an
2590 * asynchronous exception in an existing thread.
2592 * We first remove the thread from any queue on which it might be
2593 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
2595 * We strip the stack down to the innermost CATCH_FRAME, building
2596 * thunks in the heap for all the active computations, so they can
2597 * be restarted if necessary. When we reach a CATCH_FRAME, we build
2598 * an application of the handler to the exception, and push it on
2599 * the top of the stack.
2601 * How exactly do we save all the active computations? We create an
2602 * AP_UPD for every UpdateFrame on the stack. Entering one of these
2603 * AP_UPDs pushes everything from the corresponding update frame
2604 * upwards onto the stack. (Actually, it pushes everything up to the
2605 * next update frame plus a pointer to the next AP_UPD object.
2606 * Entering the next AP_UPD object pushes more onto the stack until we
2607 * reach the last AP_UPD object - at which point the stack should look
2608 * exactly as it did when we killed the TSO and we can continue
2609 * execution by entering the closure on top of the stack.)
2611 * We can also kill a thread entirely - this happens if either (a) the
2612 * exception passed to raiseAsync is NULL, or (b) there's no
2613 * CATCH_FRAME on the stack. In either case, we strip the entire
2614 * stack and replace the thread with a zombie.
2616 * -------------------------------------------------------------------------- */
2619 deleteThread(StgTSO *tso)
2621 raiseAsync(tso,NULL);
/* raiseAsync: raise `exception` asynchronously in thread `tso`; a NULL
 * `exception` asks for the thread to be killed (deleteThread calls it so).
 *
 * What the visible statements do, in order:
 *  - test whether the thread's what_next is already ThreadComplete or
 *    ThreadKilled (the body of that test is not visible in this extract;
 *    presumably an early return - confirm);
 *  - if the word on top of the stack looks like a GHC info pointer, push
 *    &dummy_ret_closure so there is a closure pointer on top;
 *  - compute `words`, the number of stack words between sp and the
 *    current frame su, less one;
 *  - if su is a CATCH_FRAME and `exception` is non-NULL: allocate a PAP
 *    holding the frame's handler, the exception and a realworld token,
 *    move sp to the last word of the catch frame, push
 *    unblockAsyncExceptionszh_ret_info when exceptions were not blocked at
 *    the catch, make sure tso->blocked_exceptions is non-NULL, and set
 *    what_next to ThreadEnterGHC;
 *  - otherwise allocate an AP_UPD of AP_sizeW(words), take its function
 *    from sp[0] and copy `words` stack words into its payload, then act on
 *    the frame type: an update frame has its updatee overwritten with an
 *    indirection to the AP_UPD (UPD_IND_NOLOCK); catch and seq frames are
 *    rebuilt as catch_info / seq_info closures wrapping the AP (given a
 *    PAP header); at the stop frame the exception is saved on the stack
 *    and the thread is marked ThreadKilled.
 *
 * NOTE(review): the case labels, the enclosing loop and the stores back to
 * tso->sp are not visible in this extract; the frame kinds above are
 * inferred from the casts (StgCatchFrame, StgSeqFrame) and from
 * sizeofW(StgStopFrame).
 */
2625 raiseAsync(StgTSO *tso, StgClosure *exception)
2627 StgUpdateFrame* su = tso->su;
2628 StgPtr sp = tso->sp;
2630 /* Thread already dead? */
2631 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
/* NOTE(review): tso->id is printed with %ld here but with %d in
 * resurrectThreads and printAllThreads; one of the two does not match the
 * field's type - confirm against the StgThreadID definition.
 */
2635 IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2637 /* Remove it from any blocking queues */
2640 /* The stack freezing code assumes there's a closure pointer on
2641 * the top of the stack. This isn't always the case with compiled
2642 * code, so we have to push a dummy closure on the top which just
2643 * returns to the next return address on the stack.
2645 if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2646 *(--sp) = (W_)&dummy_ret_closure;
2650 int words = ((P_)su - (P_)sp) - 1;
2654 /* If we find a CATCH_FRAME, and we've got an exception to raise,
2655 * then build PAP(handler,exception,realworld#), and leave it on
2656 * top of the stack ready to enter.
2658 if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2659 StgCatchFrame *cf = (StgCatchFrame *)su;
2660 /* we've got an exception to raise, so let's pass it to the
2661 * handler in this frame.
2663 ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2664 TICK_ALLOC_UPD_PAP(3,0);
2665 SET_HDR(ap,&PAP_info,cf->header.prof.ccs);
2668 ap->fun = cf->handler; /* :: Exception -> IO a */
2669 ap->payload[0] = (P_)exception;
2670 ap->payload[1] = ARG_TAG(0); /* realworld token */
2672 /* throw away the stack from Sp up to and including the
2675 sp = (P_)su + sizeofW(StgCatchFrame) - 1;
2678 /* Restore the blocked/unblocked state for asynchronous exceptions
2679 * at the CATCH_FRAME.
2681 * If exceptions were unblocked at the catch, arrange that they
2682 * are unblocked again after executing the handler by pushing an
2683 * unblockAsyncExceptions_ret stack frame.
2685 if (!cf->exceptions_blocked) {
2686 *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
2689 /* Ensure that async exceptions are blocked when running the handler.
2691 if (tso->blocked_exceptions == NULL) {
2692 tso->blocked_exceptions = END_TSO_QUEUE;
2695 /* Put the newly-built PAP on top of the stack, ready to execute
2696 * when the thread restarts.
2700 tso->what_next = ThreadEnterGHC;
2704 /* First build an AP_UPD consisting of the stack chunk above the
2705 * current update frame, with the top word on the stack as the
2708 ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2713 ap->fun = (StgClosure *)sp[0];
2715 for(i=0; i < (nat)words; ++i) {
2716 ap->payload[i] = (P_)*sp++;
2719 switch (get_itbl(su)->type) {
2723 SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */);
2724 TICK_ALLOC_UP_THK(words+1,0);
2727 fprintf(stderr, "scheduler: Updating ");
2728 printPtr((P_)su->updatee);
2729 fprintf(stderr, " with ");
2730 printObj((StgClosure *)ap);
2733 /* Replace the updatee with an indirection - happily
2734 * this will also wake up any threads currently
2735 * waiting on the result.
2737 UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */
2739 sp += sizeofW(StgUpdateFrame) -1;
2740 sp[0] = (W_)ap; /* push onto stack */
2746 StgCatchFrame *cf = (StgCatchFrame *)su;
2749 /* We want a PAP, not an AP_UPD. Fortunately, the
2750 * layout's the same.
2752 SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2753 TICK_ALLOC_UPD_PAP(words+1,0);
2755 /* now build o = FUN(catch,ap,handler) */
2756 o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
2757 TICK_ALLOC_FUN(2,0);
2758 SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
2759 o->payload[0] = (StgClosure *)ap;
2760 o->payload[1] = cf->handler;
2763 fprintf(stderr, "scheduler: Built ");
2764 printObj((StgClosure *)o);
2767 /* pop the old handler and put o on the stack */
2769 sp += sizeofW(StgCatchFrame) - 1;
2776 StgSeqFrame *sf = (StgSeqFrame *)su;
2779 SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2780 TICK_ALLOC_UPD_PAP(words+1,0);
2782 /* now build o = FUN(seq,ap) */
2783 o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
2784 TICK_ALLOC_SE_THK(1,0);
2785 SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
2786 o->payload[0] = (StgClosure *)ap;
2789 fprintf(stderr, "scheduler: Built ");
2790 printObj((StgClosure *)o);
2793 /* pop the old handler and put o on the stack */
2795 sp += sizeofW(StgSeqFrame) - 1;
2801 /* We've stripped the entire stack, the thread is now dead. */
2802 sp += sizeofW(StgStopFrame) - 1;
2803 sp[0] = (W_)exception; /* save the exception */
2804 tso->what_next = ThreadKilled;
2805 tso->su = (StgUpdateFrame *)(sp+1);
2816 /* -----------------------------------------------------------------------------
2817 resurrectThreads is called after garbage collection on the list of
2818 threads found to be garbage. Each of these threads will be woken
2819 up and sent an asynchronous exception: BlockedOnDeadMVar if the thread was
2820 blocked on an MVar, or NonTermination if the thread was blocked on a Black Hole.
2822 -------------------------------------------------------------------------- */
2825 resurrectThreads( StgTSO *threads )
2829 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2830 next = tso->global_link;
2831 tso->global_link = all_threads;
2833 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
2835 switch (tso->why_blocked) {
2837 case BlockedOnException:
2838 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
2840 case BlockedOnBlackHole:
2841 raiseAsync(tso,(StgClosure *)NonTermination_closure);
2844 /* This might happen if the thread was blocked on a black hole
2845 * belonging to a thread that we've just woken up (raiseAsync
2846 * can wake up threads, remember...).
2850 barf("resurrectThreads: thread blocked in a strange way");
2855 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
2856 //@subsection Debugging Routines
2858 /* -----------------------------------------------------------------------------
2859 Debugging: why is a thread blocked
2860 -------------------------------------------------------------------------- */
2865 printThreadBlockage(StgTSO *tso)
2867 switch (tso->why_blocked) {
2869 fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
2871 case BlockedOnWrite:
2872 fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
2874 case BlockedOnDelay:
2875 #if defined(HAVE_SETITIMER) || defined(mingw32_TARGET_OS)
2876 fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
2878 fprintf(stderr,"blocked on delay of %d ms",
2879 tso->block_info.target - getourtimeofday());
2883 fprintf(stderr,"blocked on an MVar");
2885 case BlockedOnException:
2886 fprintf(stderr,"blocked on delivering an exception to thread %d",
2887 tso->block_info.tso->id);
2889 case BlockedOnBlackHole:
2890 fprintf(stderr,"blocked on a black hole");
2893 fprintf(stderr,"not blocked");
2897 fprintf(stderr,"blocked on global address; local FM_BQ is %p (%s)",
2898 tso->block_info.closure, info_type(tso->block_info.closure));
2900 case BlockedOnGA_NoSend:
2901 fprintf(stderr,"blocked on global address (no send); local FM_BQ is %p (%s)",
2902 tso->block_info.closure, info_type(tso->block_info.closure));
2906 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
2907 tso->why_blocked, tso->id, tso);
2912 printThreadStatus(StgTSO *tso)
2914 switch (tso->what_next) {
2916 fprintf(stderr,"has been killed");
2918 case ThreadComplete:
2919 fprintf(stderr,"has completed");
2922 printThreadBlockage(tso);
2927 printAllThreads(void)
2931 sched_belch("all threads:");
2932 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
2933 fprintf(stderr, "\tthread %d is ", t->id);
2934 printThreadStatus(t);
2935 fprintf(stderr,"\n");
2940 Print a whole blocking queue attached to node (debugging only).
2945 print_bq (StgClosure *node)
2947 StgBlockingQueueElement *bqe;
2951 fprintf(stderr,"## BQ of closure %p (%s): ",
2952 node, info_type(node));
2954 /* should cover all closures that may have a blocking queue */
2955 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2956 get_itbl(node)->type == FETCH_ME_BQ ||
2957 get_itbl(node)->type == RBH);
2959 ASSERT(node!=(StgClosure*)NULL); // sanity check
2961 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2963 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2964 !end; // iterate until bqe points to a CONSTR
2965 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2966 ASSERT(bqe != END_BQ_QUEUE); // sanity check
2967 ASSERT(bqe != (StgTSO*)NULL); // sanity check
2968 /* types of closures that may appear in a blocking queue */
2969 ASSERT(get_itbl(bqe)->type == TSO ||
2970 get_itbl(bqe)->type == BLOCKED_FETCH ||
2971 get_itbl(bqe)->type == CONSTR);
2972 /* only BQs of an RBH end with an RBH_Save closure */
2973 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
2975 switch (get_itbl(bqe)->type) {
2977 fprintf(stderr," TSO %d (%x),",
2978 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
2981 fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
2982 ((StgBlockedFetch *)bqe)->node,
2983 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
2984 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
2985 ((StgBlockedFetch *)bqe)->ga.weight);
2988 fprintf(stderr," %s (IP %p),",
2989 (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
2990 get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
2991 get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
2992 "RBH_Save_?"), get_itbl(bqe));
2995 barf("Unexpected closure type %s in blocking queue of %p (%s)",
2996 info_type(bqe), node, info_type(node));
3000 fputc('\n', stderr);
3002 # elif defined(GRAN)
3004 print_bq (StgClosure *node)
3006 StgBlockingQueueElement *bqe;
3007 PEs node_loc, tso_loc;
3010 /* should cover all closures that may have a blocking queue */
3011 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3012 get_itbl(node)->type == FETCH_ME_BQ ||
3013 get_itbl(node)->type == RBH);
3015 ASSERT(node!=(StgClosure*)NULL); // sanity check
3016 node_loc = where_is(node);
3018 fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3019 node, info_type(node), node_loc);
3022 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3024 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3025 !end; // iterate until bqe points to a CONSTR
3026 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3027 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3028 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3029 /* types of closures that may appear in a blocking queue */
3030 ASSERT(get_itbl(bqe)->type == TSO ||
3031 get_itbl(bqe)->type == CONSTR);
3032 /* only BQs of an RBH end with an RBH_Save closure */
3033 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3035 tso_loc = where_is((StgClosure *)bqe);
3036 switch (get_itbl(bqe)->type) {
3038 fprintf(stderr," TSO %d (%p) on [PE %d],",
3039 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3042 fprintf(stderr," %s (IP %p),",
3043 (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
3044 get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
3045 get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
3046 "RBH_Save_?"), get_itbl(bqe));
3049 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3050 info_type((StgClosure *)bqe), node, info_type(node));
3054 fputc('\n', stderr);
3058 Nice and easy: only TSOs on the blocking queue
3061 print_bq (StgClosure *node)
3065 ASSERT(node!=(StgClosure*)NULL); // sanity check
3066 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3067 tso != END_TSO_QUEUE;
3069 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3070 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3071 fprintf(stderr," TSO %d (%p),", tso->id, tso);
3073 fputc('\n', stderr);
3084 for (i=0, tso=run_queue_hd;
3085 tso != END_TSO_QUEUE;
/* sched_belch: printf-style diagnostic output for the scheduler.
 *
 * Writes "scheduler: " (or, in SMP builds, "scheduler (task <id>): " with
 * the calling thread's pthread id), then the message formatted from `s`
 * and the variadic arguments, then a newline - all to stderr.
 *
 * s   - printf-style format string
 * ... - arguments consumed by `s`
 */
void
sched_belch(char *s, ...)
{
  va_list ap;

  va_start(ap,s);
#ifdef SMP
  /* pthread_t is not guaranteed to be a long; convert explicitly so the
   * argument matches the %ld conversion.
   */
  fprintf(stderr, "scheduler (task %ld): ", (long)pthread_self());
#else
  fprintf(stderr, "scheduler: ");
#endif
  vfprintf(stderr, s, ap);
  fprintf(stderr, "\n");
  va_end(ap);   /* every va_start must be paired with a va_end */
}
3110 //@node Index, , Debugging Routines, Main scheduling code
3114 //* MainRegTable:: @cindex\s-+MainRegTable
3115 //* StgMainThread:: @cindex\s-+StgMainThread
3116 //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue
3117 //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd
3118 //* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
3119 //* context_switch:: @cindex\s-+context_switch
3120 //* createThread:: @cindex\s-+createThread
3121 //* free_capabilities:: @cindex\s-+free_capabilities
3122 //* gc_pending_cond:: @cindex\s-+gc_pending_cond
3123 //* initScheduler:: @cindex\s-+initScheduler
3124 //* interrupted:: @cindex\s-+interrupted
3125 //* n_free_capabilities:: @cindex\s-+n_free_capabilities
3126 //* next_thread_id:: @cindex\s-+next_thread_id
3127 //* print_bq:: @cindex\s-+print_bq
3128 //* run_queue_hd:: @cindex\s-+run_queue_hd
3129 //* run_queue_tl:: @cindex\s-+run_queue_tl
3130 //* sched_mutex:: @cindex\s-+sched_mutex
3131 //* schedule:: @cindex\s-+schedule
3132 //* take_off_run_queue:: @cindex\s-+take_off_run_queue
3133 //* task_ids:: @cindex\s-+task_ids
3134 //* term_mutex:: @cindex\s-+term_mutex
3135 //* thread_ready_cond:: @cindex\s-+thread_ready_cond