1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.59 2000/03/30 16:07:53 simonmar Exp $
4 * (c) The GHC Team, 1998-2000
8 * The main scheduling code in GranSim is quite different from that in std
9 * (concurrent) Haskell: while concurrent Haskell just iterates over the
10 * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11 * over the events in the global event queue. -- HWL
12 * --------------------------------------------------------------------------*/
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
17 /* Version with scheduler monitor support for SMPs.
19 This design provides a high-level API to create and schedule threads etc.
20 as documented in the SMP design document.
22 It uses a monitor design controlled by a single mutex to exercise control
23 over accesses to shared data structures, and builds on the Posix threads
26 The majority of state is shared. In order to keep essential per-task state,
27 there is a Capability structure, which contains all the information
28 needed to run a thread: its STG registers, a pointer to its TSO, a
29 nursery etc. During STG execution, a pointer to the capability is
30 kept in a register (BaseReg).
32 In a non-SMP build, there is one global capability, namely MainRegTable.
39 //* Variables and Data structures::
41 //* Main scheduling loop::
42 //* Suspend and Resume::
44 //* Garbage Collection Routines::
45 //* Blocking Queue Routines::
46 //* Exception Handling Routines::
47 //* Debugging Routines::
51 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
52 //@subsection Includes
60 #include "StgStartup.h"
64 #include "StgMiscClosures.h"
66 #include "Evaluator.h"
67 #include "Exception.h"
71 #include "Profiling.h"
77 #if defined(GRAN) || defined(PAR)
78 # include "GranSimRts.h"
80 # include "ParallelRts.h"
81 # include "Parallel.h"
82 # include "ParallelDebug.h"
89 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
90 //@subsection Variables and Data structures
94 * These are the threads which clients have requested that we run.
96 * In an SMP build, we might have several concurrent clients all
97 * waiting for results, and each one will wait on a condition variable
98 * until the result is available.
100 * In non-SMP, clients are strictly nested: the first client calls
101 * into the RTS, which might call out again to C with a _ccall_GC, and
102 * eventually re-enter the RTS.
104 * Main threads information is kept in a linked list:
106 //@cindex StgMainThread
/* One record per client ("main thread") waiting on a result from the
 * RTS; these records are kept on the main_threads linked list below.
 * NOTE(review): several fields (e.g. the TSO pointer and the return
 * slot written by the scheduler) are elided in this listing.
 */
107 typedef struct StgMainThread_ {
109 SchedulerStatus stat;          /* how the computation finished */
112 pthread_cond_t wakeup;         /* broadcast once stat is filled in (SMP) */
114 struct StgMainThread_ *link;   /* next entry on the main_threads list */
117 /* Main thread queue.
118 * Locks required: sched_mutex.
120 static StgMainThread *main_threads;
123 * Locks required: sched_mutex.
127 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
128 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
131 In GranSim we have a runnable and a blocked queue for each processor.
132 In order to minimise code changes new arrays run_queue_hds/tls
133 are created. run_queue_hd is then a short cut (macro) for
134 run_queue_hds[CurrentProc] (see GranSim.h).
137 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
138 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
139 StgTSO *ccalling_threadss[MAX_PROC];
140 StgTSO *all_threadss[MAX_PROC];
144 StgTSO *run_queue_hd, *run_queue_tl;
145 StgTSO *blocked_queue_hd, *blocked_queue_tl;
147 /* Linked list of all threads.
148 * Used for detecting garbage collected threads.
152 /* Threads suspended in _ccall_GC.
154 static StgTSO *suspended_ccalling_threads;
156 static void GetRoots(void);
157 static StgTSO *threadStackOverflow(StgTSO *tso);
160 /* KH: The following two flags are shared memory locations. There is no need
161 to lock them, since they are only unset at the end of a scheduler
165 /* flag set by signal handler to precipitate a context switch */
166 //@cindex context_switch
169 /* if this flag is set as well, give up execution */
170 //@cindex interrupted
173 /* Next thread ID to allocate.
174 * Locks required: sched_mutex
176 //@cindex next_thread_id
177 StgThreadID next_thread_id = 1;
180 * Pointers to the state of the current thread.
181 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
182 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
185 /* The smallest stack size that makes any sense is:
186 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
187 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
188 * + 1 (the realworld token for an IO thread)
189 * + 1 (the closure to enter)
191 * A thread with this stack will bomb immediately with a stack
192 * overflow, which will increase its stack size.
195 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
197 /* Free capability list.
198 * Locks required: sched_mutex.
201 //@cindex free_capabilities
202 //@cindex n_free_capabilities
203 Capability *free_capabilities; /* Available capabilities for running threads */
204 nat n_free_capabilities; /* total number of available capabilities */
206 //@cindex MainRegTable
207 Capability MainRegTable; /* for non-SMP, we have one global capability */
211 StgTSO *CurrentTSOs[MAX_PROC];
218 /* All our current task ids, saved in case we need to kill them later.
225 void addToBlockedQueue ( StgTSO *tso );
227 static void schedule ( void );
228 void interruptStgRts ( void );
229 static StgTSO * createThread_ ( nat size, rtsBool have_lock );
232 static void sched_belch(char *s, ...);
236 //@cindex sched_mutex
238 //@cindex thread_ready_cond
239 //@cindex gc_pending_cond
240 pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;
241 pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER;
242 pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER;
243 pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER;
250 rtsTime TimeOfLastYield;
254 * The thread state for the main thread.
255 // ToDo: check whether not needed any more
260 //@node Prototypes, Main scheduling loop, Variables and Data structures, Main scheduling code
261 //@subsection Prototypes
263 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
264 //@subsection Main scheduling loop
266 /* ---------------------------------------------------------------------------
267 Main scheduling loop.
269 We use round-robin scheduling, each thread returning to the
270 scheduler loop when one of these conditions is detected:
273 * timer expires (thread yields)
278 Locking notes: we acquire the scheduler lock once at the beginning
279 of the scheduler loop, and release it when
281 * running a thread, or
282 * waiting for work, or
283 * waiting for a GC to complete.
285 ------------------------------------------------------------------------ */
292 StgThreadReturnCode ret;
300 rtsBool was_interrupted = rtsFalse;
302 ACQUIRE_LOCK(&sched_mutex);
305 # error ToDo: implement GranSim scheduler
307 while (!GlobalStopPending) { /* GlobalStopPending set in par_exit */
309 if (PendingFetches != END_BF_QUEUE) {
316 IF_DEBUG(scheduler, printAllThreads());
318 /* If we're interrupted (the user pressed ^C, or some other
319 * termination condition occurred), kill all the currently running
323 IF_DEBUG(scheduler, sched_belch("interrupted"));
324 for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
327 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
330 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
331 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
332 interrupted = rtsFalse;
333 was_interrupted = rtsTrue;
336 /* Go through the list of main threads and wake up any
337 * clients whose computations have finished. ToDo: this
338 * should be done more efficiently without a linear scan
339 * of the main threads list, somehow...
343 StgMainThread *m, **prev;
344 prev = &main_threads;
345 for (m = main_threads; m != NULL; m = m->link) {
346 switch (m->tso->what_next) {
349 *(m->ret) = (StgClosure *)m->tso->sp[0];
353 pthread_cond_broadcast(&m->wakeup);
357 if (was_interrupted) {
358 m->stat = Interrupted;
362 pthread_cond_broadcast(&m->wakeup);
370 /* If our main thread has finished or been killed, return.
373 StgMainThread *m = main_threads;
374 if (m->tso->what_next == ThreadComplete
375 || m->tso->what_next == ThreadKilled) {
376 main_threads = main_threads->link;
377 if (m->tso->what_next == ThreadComplete) {
378 /* we finished successfully, fill in the return value */
379 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
383 if (was_interrupted) {
384 m->stat = Interrupted;
394 /* Top up the run queue from our spark pool. We try to make the
395 * number of threads in the run queue equal to the number of
400 nat n = n_free_capabilities;
401 StgTSO *tso = run_queue_hd;
403 /* Count the run queue */
404 while (n > 0 && tso != END_TSO_QUEUE) {
413 break; /* no more sparks in the pool */
415 /* I'd prefer this to be done in activateSpark -- HWL */
416 /* tricky - it needs to hold the scheduler lock and
417 * not try to re-acquire it -- SDM */
419 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
420 pushClosure(tso,spark);
421 PUSH_ON_RUN_QUEUE(tso);
423 advisory_thread_count++;
427 sched_belch("turning spark of closure %p into a thread",
428 (StgClosure *)spark));
431 /* We need to wake up the other tasks if we just created some
434 if (n_free_capabilities - n > 1) {
435 pthread_cond_signal(&thread_ready_cond);
440 /* Check whether any waiting threads need to be woken up. If the
441 * run queue is empty, and there are no other tasks running, we
442 * can wait indefinitely for something to happen.
443 * ToDo: what if another client comes along & requests another
446 if (blocked_queue_hd != END_TSO_QUEUE) {
448 (run_queue_hd == END_TSO_QUEUE)
450 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
455 /* check for signals each time around the scheduler */
457 if (signals_pending()) {
458 start_signal_handlers();
462 /* Detect deadlock: when we have no threads to run, there are
463 * no threads waiting on I/O or sleeping, and all the other
464 * tasks are waiting for work, we must have a deadlock. Inform
465 * all the main threads.
468 if (blocked_queue_hd == END_TSO_QUEUE
469 && run_queue_hd == END_TSO_QUEUE
470 && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
473 for (m = main_threads; m != NULL; m = m->link) {
476 pthread_cond_broadcast(&m->wakeup);
481 if (blocked_queue_hd == END_TSO_QUEUE
482 && run_queue_hd == END_TSO_QUEUE) {
483 StgMainThread *m = main_threads;
486 main_threads = m->link;
492 /* If there's a GC pending, don't do anything until it has
496 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
497 pthread_cond_wait(&gc_pending_cond, &sched_mutex);
500 /* block until we've got a thread on the run queue and a free
503 while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
504 IF_DEBUG(scheduler, sched_belch("waiting for work"));
505 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
506 IF_DEBUG(scheduler, sched_belch("work now available"));
511 # error ToDo: implement GranSim scheduler
513 /* ToDo: phps merge with spark activation above */
514 /* check whether we have local work and send requests if we have none */
515 if (run_queue_hd == END_TSO_QUEUE) { /* no runnable threads */
516 /* :-[ no local threads => look out for local sparks */
517 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
518 (pending_sparks_hd[REQUIRED_POOL] < pending_sparks_tl[REQUIRED_POOL] ||
519 pending_sparks_hd[ADVISORY_POOL] < pending_sparks_tl[ADVISORY_POOL])) {
521 * ToDo: add GC code check that we really have enough heap afterwards!!
523 * If we're here (no runnable threads) and we have pending
524 * sparks, we must have a space problem. Get enough space
525 * to turn one of those pending sparks into a
529 spark = findSpark(); /* get a spark */
530 if (spark != (rtsSpark) NULL) {
531 tso = activateSpark(spark); /* turn the spark into a thread */
532 IF_PAR_DEBUG(verbose,
533 belch("== [%x] schedule: Created TSO %p (%d); %d threads active",
534 mytid, tso, tso->id, advisory_thread_count));
536 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
537 belch("^^ failed to activate spark");
539 } /* otherwise fall through & pick-up new tso */
541 IF_PAR_DEBUG(verbose,
542 belch("^^ no local sparks (spark pool contains only NFs: %d)",
543 spark_queue_len(ADVISORY_POOL)));
547 /* =8-[ no local sparks => look for work on other PEs */
550 * We really have absolutely no work. Send out a fish
551 * (there may be some out there already), and wait for
552 * something to arrive. We clearly can't run any threads
553 * until a SCHEDULE or RESUME arrives, and so that's what
554 * we're hoping to see. (Of course, we still have to
555 * respond to other types of messages.)
558 outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
559 // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
560 /* fishing set in sendFish, processFish;
561 avoid flooding system with fishes via delay */
563 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
571 } else if (PacketsWaiting()) { /* Look for incoming messages */
575 /* Now we are sure that we have some work available */
576 ASSERT(run_queue_hd != END_TSO_QUEUE);
577 /* Take a thread from the run queue, if we have work */
578 t = take_off_run_queue(END_TSO_QUEUE);
580 /* ToDo: write something to the log-file
581 if (RTSflags.ParFlags.granSimStats && !sameThread)
582 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
587 IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; lim=%x)",
588 spark_queue_len(ADVISORY_POOL), CURRENT_PROC,
589 pending_sparks_hd[ADVISORY_POOL],
590 pending_sparks_tl[ADVISORY_POOL],
591 pending_sparks_lim[ADVISORY_POOL]));
593 IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)",
594 run_queue_len(), CURRENT_PROC,
595 run_queue_hd, run_queue_tl));
599 we are running a different TSO, so write a schedule event to log file
600 NB: If we use fair scheduling we also have to write a deschedule
601 event for LastTSO; with unfair scheduling we know that the
602 previous tso has blocked whenever we switch to another tso, so
603 we don't need it in GUM for now
605 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
606 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
610 #else /* !GRAN && !PAR */
612 /* grab a thread from the run queue
615 IF_DEBUG(sanity,checkTSO(t));
622 cap = free_capabilities;
623 free_capabilities = cap->link;
624 n_free_capabilities--;
629 cap->rCurrentTSO = t;
631 /* set the context_switch flag
633 if (run_queue_hd == END_TSO_QUEUE)
638 RELEASE_LOCK(&sched_mutex);
640 IF_DEBUG(scheduler,sched_belch("running thread %d", t->id));
642 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
643 /* Run the current thread
645 switch (cap->rCurrentTSO->what_next) {
648 /* Thread already finished, return to scheduler. */
649 ret = ThreadFinished;
652 ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
655 ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
657 case ThreadEnterHugs:
661 IF_DEBUG(scheduler,sched_belch("entering Hugs"));
662 c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
663 cap->rCurrentTSO->sp += 1;
668 barf("Panic: entered a BCO but no bytecode interpreter in this build");
671 barf("schedule: invalid what_next field");
673 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
675 /* Costs for the scheduler are assigned to CCS_SYSTEM */
680 ACQUIRE_LOCK(&sched_mutex);
683 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
685 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
687 t = cap->rCurrentTSO;
691 /* make all the running tasks block on a condition variable,
692 * maybe set context_switch and wait till they all pile in,
693 * then have them wait on a GC condition variable.
695 IF_DEBUG(scheduler,belch("thread %ld stopped: HeapOverflow", t->id));
698 ready_to_gc = rtsTrue;
699 context_switch = 1; /* stop other threads ASAP */
700 PUSH_ON_RUN_QUEUE(t);
704 /* just adjust the stack for this thread, then pop it back
707 IF_DEBUG(scheduler,belch("thread %ld stopped, StackOverflow", t->id));
711 /* enlarge the stack */
712 StgTSO *new_t = threadStackOverflow(t);
714 /* This TSO has moved, so update any pointers to it from the
715 * main thread stack. It better not be on any other queues...
718 for (m = main_threads; m != NULL; m = m->link) {
724 PUSH_ON_RUN_QUEUE(new_t);
731 DumpGranEvent(GR_DESCHEDULE, t));
732 globalGranStats.tot_yields++;
735 DumpGranEvent(GR_DESCHEDULE, t));
737 /* put the thread back on the run queue. Then, if we're ready to
738 * GC, check whether this is the last task to stop. If so, wake
739 * up the GC thread. getThread will block during a GC until the
743 if (t->what_next == ThreadEnterHugs) {
744 /* ToDo: or maybe a timer expired when we were in Hugs?
745 * or maybe someone hit ctrl-C
747 belch("thread %ld stopped to switch to Hugs", t->id);
749 belch("thread %ld stopped, yielding", t->id);
753 APPEND_TO_RUN_QUEUE(t);
758 # error ToDo: implement GranSim scheduler
761 DumpGranEvent(GR_DESCHEDULE, t));
764 /* don't need to do anything. Either the thread is blocked on
765 * I/O, in which case we'll have called addToBlockedQueue
766 * previously, or it's blocked on an MVar or Blackhole, in which
767 * case it'll be on the relevant queue already.
770 fprintf(stderr, "thread %d stopped, ", t->id);
771 printThreadBlockage(t);
772 fprintf(stderr, "\n"));
777 /* Need to check whether this was a main thread, and if so, signal
778 * the task that started it with the return value. If we have no
779 * more main threads, we probably need to stop all the tasks until
782 IF_DEBUG(scheduler,belch("thread %ld finished", t->id));
783 t->what_next = ThreadComplete;
785 // ToDo: endThread(t, CurrentProc); // clean-up the thread
787 advisory_thread_count--;
788 if (RtsFlags.ParFlags.ParStats.Full)
789 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
794 barf("doneThread: invalid thread return code");
798 cap->link = free_capabilities;
799 free_capabilities = cap;
800 n_free_capabilities++;
804 if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes)
809 /* everybody back, start the GC.
810 * Could do it in this thread, or signal a condition var
811 * to do it in another thread. Either way, we need to
812 * broadcast on gc_pending_cond afterward.
815 IF_DEBUG(scheduler,sched_belch("doing GC"));
817 GarbageCollect(GetRoots);
818 ready_to_gc = rtsFalse;
820 pthread_cond_broadcast(&gc_pending_cond);
825 IF_GRAN_DEBUG(unused,
826 print_eventq(EventHd));
828 event = get_next_event();
832 /* ToDo: wait for next message to arrive rather than busy wait */
837 t = take_off_run_queue(END_TSO_QUEUE);
840 } /* end of while(1) */
843 /* A hack for Hugs concurrency support. Needs sanitisation (?) */
/* Delete every thread on both scheduler queues, then reset the queues
 * to empty.  NOTE(review): the loop bodies and local declarations are
 * elided in this listing -- presumably each loop deletes `t'.
 */
844 void deleteAllThreads ( void )
847 IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
/* walk the runnable queue (deletion body elided in this listing) */
848 for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
/* walk the blocked queue (deletion body elided in this listing) */
851 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
/* both queues are now empty */
854 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
855 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
858 /* startThread and insertThread are now in GranSim.c -- HWL */
860 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
861 //@subsection Suspend and Resume
863 /* ---------------------------------------------------------------------------
864 * Suspending & resuming Haskell threads.
866 * When making a "safe" call to C (aka _ccall_GC), the task gives back
867 * its capability before calling the C function. This allows another
868 * task to pick up the capability and carry on running Haskell
869 * threads. It also means that if the C call blocks, it won't lock
872 * The Haskell thread making the C call is put to sleep for the
873 * duration of the call, on the suspended_ccalling_threads queue. We
874 * give out a token to the task, which it can use to resume the thread
875 * on return from the C function.
876 * ------------------------------------------------------------------------- */
879 suspendThread( Capability *cap )
/* Called by a task about to make a "safe" C call (_ccall_GC): park the
 * current TSO on suspended_ccalling_threads, give the capability back
 * to the free pool, and hand the caller a token (the thread id) with
 * which it can later resumeThread().  Runs under sched_mutex.
 */
883 ACQUIRE_LOCK(&sched_mutex);
886 sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
/* threadPaused: presumably tidies the stack before suspension -- confirm */
888 threadPaused(cap->rCurrentTSO);
/* push the current TSO onto the suspended-ccalling list */
889 cap->rCurrentTSO->link = suspended_ccalling_threads;
890 suspended_ccalling_threads = cap->rCurrentTSO;
892 /* Use the thread ID as the token; it should be unique */
893 tok = cap->rCurrentTSO->id;
/* return the capability so another task can run Haskell threads */
896 cap->link = free_capabilities;
897 free_capabilities = cap;
898 n_free_capabilities++;
901 RELEASE_LOCK(&sched_mutex);
906 resumeThread( StgInt tok )
/* Inverse of suspendThread: on return from the C call, find the TSO
 * whose id matches `tok' on suspended_ccalling_threads, wait for a
 * free capability, and install the thread on it.  Runs under
 * sched_mutex.
 */
911 ACQUIRE_LOCK(&sched_mutex);
/* search for (and, in elided lines, presumably unlink) the suspended
 * thread carrying this token */
913 prev = &suspended_ccalling_threads;
914 for (tso = suspended_ccalling_threads;
915 tso != END_TSO_QUEUE;
916 prev = &tso->link, tso = tso->link) {
917 if (tso->id == (StgThreadID)tok) {
922 if (tso == END_TSO_QUEUE) {
923 barf("resumeThread: thread not found");
/* block until a capability becomes available */
927 while (free_capabilities == NULL) {
928 IF_DEBUG(scheduler, sched_belch("waiting to resume"));
929 pthread_cond_wait(&thread_ready_cond, &sched_mutex);
930 IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
932 cap = free_capabilities;
933 free_capabilities = cap->link;
934 n_free_capabilities--;
939 cap->rCurrentTSO = tso;
941 RELEASE_LOCK(&sched_mutex);
946 /* ---------------------------------------------------------------------------
948 * ------------------------------------------------------------------------ */
949 static void unblockThread(StgTSO *tso);
951 /* ---------------------------------------------------------------------------
952 * Comparing Thread ids.
954 * This is used from STG land in the implementation of the
955 * instances of Eq/Ord for ThreadIds.
956 * ------------------------------------------------------------------------ */
958 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
960 StgThreadID id1 = tso1->id;
961 StgThreadID id2 = tso2->id;
963 if (id1 < id2) return (-1);
964 if (id1 > id2) return 1;
968 /* ---------------------------------------------------------------------------
971 The new thread starts with the given stack size. Before the
972 scheduler can run, however, this thread needs to have a closure
973 (and possibly some arguments) pushed on its stack. See
974 pushClosure() in Schedule.h.
976 createGenThread() and createIOThread() (in SchedAPI.h) are
977 convenient packaged versions of this function.
978 ------------------------------------------------------------------------ */
979 //@cindex createThread
981 /* currently pri (priority) is only used in a GRAN setup -- HWL */
/* Public entry points: two variants of createThread (with/without a
 * priority argument) delegating to createThread_.  NOTE(review): the
 * #if GRAN / #else / #endif lines separating the variants are elided
 * in this listing.
 */
983 createThread(nat stack_size, StgInt pri)
985 return createThread_(stack_size, rtsFalse, pri);
989 createThread_(nat size, rtsBool have_lock, StgInt pri)
993 createThread(nat stack_size)
995 return createThread_(stack_size, rtsFalse);
999 createThread_(nat size, rtsBool have_lock)
/* Allocate and initialise a new TSO of `size' words (including the
 * TSO header); have_lock says whether the caller already holds
 * sched_mutex.  Returns END_TSO_QUEUE if the PAR thread limit is hit.
 */
1005 /* First check whether we should create a thread at all */
1007 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1008 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1010 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1011 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1012 return END_TSO_QUEUE;
1018 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1021 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1023 /* catch ridiculously small stack sizes */
1024 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1025 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1028 stack_size = size - TSO_STRUCT_SIZEW;
/* allocate and initialise the TSO header */
1030 tso = (StgTSO *)allocate(size);
1031 TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1033 SET_HDR(tso, &TSO_info, CCS_MAIN);
1035 SET_GRAN_HDR(tso, ThisPE);
1037 tso->what_next = ThreadEnterGHC;
1039 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1040 * protect the increment operation on next_thread_id.
1041 * In future, we could use an atomic increment instead.
1043 if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1044 tso->id = next_thread_id++;
1045 if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1047 tso->why_blocked = NotBlocked;
1048 tso->blocked_exceptions = NULL;
/* set up the stack: splim at the bottom guard, sp at the top */
1050 tso->splim = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
1051 tso->stack_size = stack_size;
1052 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
1054 tso->sp = (P_)&(tso->stack) + stack_size;
1057 tso->prof.CCCS = CCS_MAIN;
1060 /* put a stop frame on the stack */
1061 tso->sp -= sizeofW(StgStopFrame);
1062 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN);
1063 tso->su = (StgUpdateFrame*)tso->sp;
1065 IF_DEBUG(scheduler,belch("---- Initialised TSO %ld (%p), stack size = %lx words",
1066 tso->id, tso, tso->stack_size));
1070 tso->link = END_TSO_QUEUE;
1071 /* uses more flexible routine in GranSim */
1072 insertThread(tso, CurrentProc);
1074 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1079 /* Link the new thread on the global thread list.
1084 tso->global_link = all_threads;
/* GranSim per-thread statistics -- presumably under #if GRAN (elided) */
1089 tso->gran.pri = pri;
1090 tso->gran.magic = TSO_MAGIC; // debugging only
1091 tso->gran.sparkname = 0;
1092 tso->gran.startedat = CURRENT_TIME;
1093 tso->gran.exported = 0;
1094 tso->gran.basicblocks = 0;
1095 tso->gran.allocs = 0;
1096 tso->gran.exectime = 0;
1097 tso->gran.fetchtime = 0;
1098 tso->gran.fetchcount = 0;
1099 tso->gran.blocktime = 0;
1100 tso->gran.blockcount = 0;
1101 tso->gran.blockedat = 0;
1102 tso->gran.globalsparks = 0;
1103 tso->gran.localsparks = 0;
1104 if (RtsFlags.GranFlags.Light)
1105 tso->gran.clock = Now; /* local clock */
1107 tso->gran.clock = 0;
1109 IF_DEBUG(gran,printTSO(tso));
/* GUM per-thread statistics -- presumably under #if PAR (elided) */
1111 tso->par.sparkname = 0;
1112 tso->par.startedat = CURRENT_TIME;
1113 tso->par.exported = 0;
1114 tso->par.basicblocks = 0;
1115 tso->par.allocs = 0;
1116 tso->par.exectime = 0;
1117 tso->par.fetchtime = 0;
1118 tso->par.fetchcount = 0;
1119 tso->par.blocktime = 0;
1120 tso->par.blockcount = 0;
1121 tso->par.blockedat = 0;
1122 tso->par.globalsparks = 0;
1123 tso->par.localsparks = 0;
/* global GranSim thread-creation statistics */
1127 globalGranStats.tot_threads_created++;
1128 globalGranStats.threads_created_on_PE[CurrentProc]++;
1129 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1130 globalGranStats.tot_sq_probes++;
1133 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1134 tso->id, tso->stack_size));
1138 /* ---------------------------------------------------------------------------
1141 * scheduleThread puts a thread on the head of the runnable queue.
1142 * This will usually be done immediately after a thread is created.
1143 * The caller of scheduleThread must create the thread using e.g.
1144 * createThread and push an appropriate closure
1145 * on this thread's stack before the scheduler is invoked.
1146 * ------------------------------------------------------------------------ */
1149 scheduleThread(StgTSO *tso)
/* Make a freshly-created thread runnable by pushing it onto the head
 * of the run queue, under sched_mutex.
 */
1151 ACQUIRE_LOCK(&sched_mutex);
1153 /* Put the new thread on the head of the runnable queue. The caller
1154 * better push an appropriate closure on this thread's stack
1155 * beforehand. In the SMP case, the thread may start running as
1156 * soon as we release the scheduler lock below.
1158 PUSH_ON_RUN_QUEUE(tso);
1161 IF_DEBUG(scheduler,printTSO(tso));
1162 RELEASE_LOCK(&sched_mutex);
1165 /* ---------------------------------------------------------------------------
1168 * Start up Posix threads to run each of the scheduler tasks.
1169 * I believe the task ids are not needed in the system as defined.
1171 * ------------------------------------------------------------------------ */
/* Entry point for each scheduler task (Posix thread), started from the
 * pthread_create call in the initialisation code.  NOTE(review): the
 * return type and body are elided in this listing -- presumably it
 * just enters the scheduler loop.
 */
1175 taskStart( void *arg STG_UNUSED )
1182 /* ---------------------------------------------------------------------------
1185 * Initialise the scheduler. This resets all the queues - if the
1186 * queues contained any threads, they'll be garbage collected at the
1189 * This now calls startTasks(), so should only be called once! KH @ 25/10/99
1190 * ------------------------------------------------------------------------ */
1194 term_handler(int sig STG_UNUSED)
/* SIGTERM handler installed at initialisation: runs briefly under
 * term_mutex.  NOTE(review): the statement between the lock and unlock
 * is elided in this listing -- presumably it decrements await_death,
 * which exitScheduler's shutdown loop polls.
 */
1197 ACQUIRE_LOCK(&term_mutex);
1199 RELEASE_LOCK(&term_mutex);
1204 //@cindex initScheduler
1211 for (i=0; i<=MAX_PROC; i++) {
1212 run_queue_hds[i] = END_TSO_QUEUE;
1213 run_queue_tls[i] = END_TSO_QUEUE;
1214 blocked_queue_hds[i] = END_TSO_QUEUE;
1215 blocked_queue_tls[i] = END_TSO_QUEUE;
1216 ccalling_threadss[i] = END_TSO_QUEUE;
1219 run_queue_hd = END_TSO_QUEUE;
1220 run_queue_tl = END_TSO_QUEUE;
1221 blocked_queue_hd = END_TSO_QUEUE;
1222 blocked_queue_tl = END_TSO_QUEUE;
1225 suspended_ccalling_threads = END_TSO_QUEUE;
1227 main_threads = NULL;
1228 all_threads = END_TSO_QUEUE;
1233 enteredCAFs = END_CAF_LIST;
1235 /* Install the SIGHUP handler */
1238 struct sigaction action,oact;
1240 action.sa_handler = term_handler;
1241 sigemptyset(&action.sa_mask);
1242 action.sa_flags = 0;
1243 if (sigaction(SIGTERM, &action, &oact) != 0) {
1244 barf("can't install TERM handler");
1250 /* Allocate N Capabilities */
1253 Capability *cap, *prev;
1256 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1257 cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1261 free_capabilities = cap;
1262 n_free_capabilities = RtsFlags.ParFlags.nNodes;
1264 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1265 n_free_capabilities););
1268 #if defined(SMP) || defined(PAR)
1281 /* make some space for saving all the thread ids */
1282 task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1283 "initScheduler:task_ids");
1285 /* and create all the threads */
1286 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1287 r = pthread_create(&tid,NULL,taskStart,NULL);
1289 barf("startTasks: Can't create new Posix thread");
1291 task_ids[i].id = tid;
1292 task_ids[i].mut_time = 0.0;
1293 task_ids[i].mut_etime = 0.0;
1294 task_ids[i].gc_time = 0.0;
1295 task_ids[i].gc_etime = 0.0;
1296 task_ids[i].elapsedtimestart = elapsedtime();
1297 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1303 exitScheduler( void )
/* Shut the scheduler down by terminating all the scheduler tasks.
 * NOTE(review): the #if/#else lines are elided in this listing; two
 * alternative strategies are visible -- pthread_cancel + pthread_join,
 * and SIGTERM + polling the await_death counter.
 */
1308 /* Don't want to use pthread_cancel, since we'd have to install
1309 * these silly exception handlers (pthread_cleanup_{push,pop}) around
1313 /* Cancel all our tasks */
1314 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1315 pthread_cancel(task_ids[i].id);
1318 /* Wait for all the tasks to terminate */
1319 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1320 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n",
1322 pthread_join(task_ids[i].id, NULL);
1326 /* Send 'em all a SIGHUP. That should shut 'em up.
1328 await_death = RtsFlags.ParFlags.nNodes;
1329 for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1330 pthread_kill(task_ids[i].id,SIGTERM);
/* wait until every task has acknowledged the signal (see term_handler) */
1332 while (await_death > 0) {
1338 /* -----------------------------------------------------------------------------
1339 Managing the per-task allocation areas.
1341 Each capability comes with an allocation area. These are
1342 fixed-length block lists into which allocation can be done.
1344 ToDo: no support for two-space collection at the moment???
1345 -------------------------------------------------------------------------- */
1347 /* -----------------------------------------------------------------------------
1348 * waitThread is the external interface for running a new computation
1349 * and waiting for the result.
1351 * In the non-SMP case, we create a new main thread, push it on the
1352 * main-thread stack, and invoke the scheduler to run it. The
1353 * scheduler will return when the top main thread on the stack has
1354 * completed or died, and fill in the necessary fields of the
1355 * main_thread structure.
1357 * In the SMP case, we create a main thread as before, but we then
1358 * create a new condition variable and sleep on it. When our new
1359 * main thread has completed, we'll be woken up and the status/result
1360 * will be in the main_thread struct.
1361 * -------------------------------------------------------------------------- */
1364 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1367 SchedulerStatus stat;
1369 ACQUIRE_LOCK(&sched_mutex);
1371 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1377 pthread_cond_init(&m->wakeup, NULL);
1380 m->link = main_threads;
1383 IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n",
1388 pthread_cond_wait(&m->wakeup, &sched_mutex);
1389 } while (m->stat == NoStatus);
1392 ASSERT(m->stat != NoStatus);
1398 pthread_cond_destroy(&m->wakeup);
1401 IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n",
1405 RELEASE_LOCK(&sched_mutex);
1410 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1411 //@subsection Run queue code
1415 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1416 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1417 implicit global variable that has to be correct when calling these
1421 /* Put the new thread on the head of the runnable queue.
1422 * The caller of createThread better push an appropriate closure
1423 * on this thread's stack before the scheduler is invoked.
1425 static /* inline */ void
/* K&R-style definition; the parameter declaration line (StgTSO *tso) is
 * missing from this extract. */
1426 add_to_run_queue(tso)
1429 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
/* Prepend: new thread points at the old head. run_queue_hd = tso presumably
 * follows on a missing line -- verify. */
1430 tso->link = run_queue_hd;
/* If the queue was empty, tso must also become the tail (body on a
 * missing line). */
1432 if (run_queue_tl == END_TSO_QUEUE) {
1437 /* Put the new thread at the end of the runnable queue. */
1438 static /* inline */ void
/* K&R-style definition; parameter declaration (StgTSO *tso) is on a
 * missing line. */
1439 push_on_run_queue(tso)
1442 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1443 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1444 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
/* Empty queue: tso becomes head (and presumably tail; else-branch lines
 * missing from this extract). */
1445 if (run_queue_hd == END_TSO_QUEUE) {
/* Non-empty queue: append after the current tail. */
1448 run_queue_tl->link = tso;
1454 Should be inlined because it's used very often in schedule. The tso
1455 argument is actually only needed in GranSim, where we want to have the
1456 possibility to schedule *any* TSO on the run queue, irrespective of the
1457 actual ordering. Therefore, if tso is not the nil TSO then we traverse
1458 the run queue and dequeue the tso, adjusting the links in the queue.
1460 //@cindex take_off_run_queue
1461 static /* inline */ StgTSO*
1462 take_off_run_queue(StgTSO *tso) {
1466 qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
(Klingon; roughly: "Remove the task {tso} contained in the queue!")
1468 if tso is specified, unlink that tso from the run_queue (doesn't have
1469 to be at the beginning of the queue); GranSim only
1471 if (tso!=END_TSO_QUEUE) {
1472 /* find tso in queue */
/* Linear scan keeping a trailing pointer so the predecessor's link can be
 * patched; loop increment is on a missing line. */
1473 for (t=run_queue_hd, prev=END_TSO_QUEUE;
1474 t!=END_TSO_QUEUE && t!=tso;
1478 /* now actually dequeue the tso */
1479 if (prev!=END_TSO_QUEUE) {
1480 ASSERT(run_queue_hd!=t);
1481 prev->link = t->link;
1483 /* t is at beginning of thread queue */
1484 ASSERT(run_queue_hd==t);
1485 run_queue_hd = t->link;
1487 /* t is at end of thread queue */
1488 if (t->link==END_TSO_QUEUE) {
1489 ASSERT(t==run_queue_tl);
/* Removed the last element: the predecessor becomes the new tail. */
1490 run_queue_tl = prev;
1492 ASSERT(run_queue_tl!=t);
/* Detach the dequeued TSO from the queue entirely. */
1494 t->link = END_TSO_QUEUE;
1496 /* take tso from the beginning of the queue; std concurrent code */
1498 if (t != END_TSO_QUEUE) {
1499 run_queue_hd = t->link;
1500 t->link = END_TSO_QUEUE;
/* Queue became empty: reset the tail as well. */
1501 if (run_queue_hd == END_TSO_QUEUE) {
1502 run_queue_tl = END_TSO_QUEUE;
1511 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1512 //@subsection Garbage Collextion Routines
1514 /* ---------------------------------------------------------------------------
1515 Where are the roots that we know about?
1517 - all the threads on the runnable queue
1518 - all the threads on the blocked queue
1519 - all the thread currently executing a _ccall_GC
1520 - all the "main threads"
1522 ------------------------------------------------------------------------ */
1524 /* This has to be protected either by the scheduler monitor, or by the
1525 garbage collection monitor (probably the latter).
/* GetRoots: register every scheduler-known GC root via MarkRoot, updating
 * the root pointers with the (possibly moved) results.
 * NOTE(review): lines are missing from this extract; the first loop is the
 * GranSim (per-PE) variant, the later unconditional marks are the
 * single-queue variant -- presumably separated by #if/#else (not visible). */
1529 static void GetRoots(void)
/* GranSim: one run/blocked/ccalling queue per processor. */
1536 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1537 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1538 run_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1539 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1540 run_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1542 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1543 blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1544 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1545 blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1546 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1547 ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
/* Non-GranSim: single global run and blocked queues. */
1554 run_queue_hd = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1555 run_queue_tl = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1557 blocked_queue_hd = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1558 blocked_queue_tl = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
/* Each main thread's TSO is a root, too. */
1561 for (m = main_threads; m != NULL; m = m->link) {
1562 m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1564 suspended_ccalling_threads =
1565 (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
1567 #if defined(SMP) || defined(PAR) || defined(GRAN)
1572 /* -----------------------------------------------------------------------------
1575 This is the interface to the garbage collector from Haskell land.
1576 We provide this so that external C code can allocate and garbage
1577 collect when called from Haskell via _ccall_GC.
1579 It might be useful to provide an interface whereby the programmer
1580 can specify more roots (ToDo).
1582 This needs to be protected by the GC condition variable above. KH.
1583 -------------------------------------------------------------------------- */
/* Stashed user root-marking callback, read by AllRoots during the
 * collection kicked off by performGCWithRoots. Not thread-safe by itself;
 * relies on the surrounding GC protection mentioned above. */
1585 void (*extra_roots)(void);
/* performGC (body mostly missing here): collect using only the
 * scheduler's roots. */
1590 GarbageCollect(GetRoots);
/* AllRoots: combined root-marking function -- scheduler roots plus the
 * user-supplied extra roots. */
1596 GetRoots(); /* the scheduler's roots */
1597 extra_roots(); /* the user's roots */
/* performGCWithRoots: collect, letting the caller contribute additional
 * roots through get_roots. */
1601 performGCWithRoots(void (*get_roots)(void))
1603 extra_roots = get_roots;
1605 GarbageCollect(AllRoots);
1608 /* -----------------------------------------------------------------------------
1611 If the thread has reached its maximum stack size, then raise the
1612 StackOverflow exception in the offending thread. Otherwise
1613 relocate the TSO into a larger chunk of memory and adjust its stack
1615 -------------------------------------------------------------------------- */
/* NOTE(review): lines are missing from this extract; return statements and
 * some declarations are not visible. Returns the (possibly new) TSO --
 * presumably dest on the grow path, tso on the overflow path; confirm. */
1618 threadStackOverflow(StgTSO *tso)
1620 nat new_stack_size, new_tso_size, diff, stack_words;
1624 IF_DEBUG(sanity,checkTSO(tso));
/* Hard limit reached: can't grow any further -- deliver StackOverflow
 * asynchronously instead. */
1625 if (tso->stack_size >= tso->max_stack_size) {
1627 /* If we're debugging, just print out the top of the stack */
1628 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
1632 fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
1635 /* Send this thread the StackOverflow exception */
1636 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
1641 /* Try to double the current stack size. If that takes us over the
1642 * maximum stack size for this thread, then use the maximum instead.
1643 * Finally round up so the TSO ends up as a whole number of blocks.
1645 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
1646 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
1647 TSO_STRUCT_SIZE)/sizeof(W_);
1648 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
1649 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
1651 IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
1653 dest = (StgTSO *)allocate(new_tso_size);
1654 TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
1656 /* copy the TSO block and the old stack into the new area */
1657 memcpy(dest,tso,TSO_STRUCT_SIZE);
/* Live stack occupies the top of the old area; copy it to the top of the
 * new, larger area so it stays flush with the end. */
1658 stack_words = tso->stack + tso->stack_size - tso->sp;
1659 new_sp = (P_)dest + new_tso_size - stack_words;
1660 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
1662 /* relocate the stack pointers... */
1663 diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
1664 dest->su = (StgUpdateFrame *) ((P_)dest->su + diff);
1666 dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso);
1667 dest->stack_size = new_stack_size;
1669 /* and relocate the update frame list */
1670 relocate_TSO(tso, dest);
1672 /* Mark the old TSO as relocated. We have to check for relocated
1673 * TSOs in the garbage collector and any primops that deal with TSOs.
1675 * It's important to set the sp and su values to just beyond the end
1676 * of the stack, so we don't attempt to scavenge any part of the
1679 tso->what_next = ThreadRelocated;
1681 tso->sp = (P_)&(tso->stack[tso->stack_size]);
1682 tso->su = (StgUpdateFrame *)tso->sp;
1683 tso->why_blocked = NotBlocked;
1684 dest->mut_link = NULL;
1686 IF_DEBUG(sanity,checkTSO(tso));
1688 IF_DEBUG(scheduler,printTSO(dest));
1694 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
1695 //@subsection Blocking Queue Routines
1697 /* ---------------------------------------------------------------------------
1698 Wake up a queue that was blocked on some resource.
1699 ------------------------------------------------------------------------ */
1701 /* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */
/* unblockCount: accounting hook when a blocked TSO is resumed -- dumps a
 * GR_RESUMEQ event and charges blocked/fetch time depending on what the
 * TSO was blocked on. NOTE(review): two identical signatures appear below;
 * presumably GRAN and PAR variants separated by conditional compilation on
 * missing lines -- confirm. */
1705 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
1710 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
1712 /* write RESUME events to log file and
1713 update blocked and fetch time (depending on type of the orig closure) */
1714 if (RtsFlags.ParFlags.ParStats.Full) {
1715 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1716 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
1717 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
/* Which timer to charge depends on the closure the TSO was blocked on;
 * the case labels themselves are on missing lines. */
1719 switch (get_itbl(node)->type) {
1721 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
1726 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
1729 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
/* unblockOneLocked (GranSim variant): wake one element of a blocking queue
 * attached to node, scheduling a wakeup event on the TSO's own PE and
 * charging simulated unblock/communication costs. Returns the next queue
 * element (via a missing return line -- presumably `return next`; confirm). */
1736 static StgBlockingQueueElement *
1737 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
1739 StgBlockingQueueElement *next;
1740 PEs node_loc, tso_loc;
1742 node_loc = where_is(node); // should be lifted out of loop
1743 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
1744 tso_loc = where_is(tso);
1745 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
1746 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
1747 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
/* Local wakeup: only the (cheap) local unblock cost is charged. */
1748 bq_processing_time += RtsFlags.GranFlags.Costs.lunblocktime;
1749 // insertThread(tso, node_loc);
1750 new_event(tso_loc, tso_loc,
1751 CurrentTime[CurrentProc]+bq_processing_time,
1753 tso, node, (rtsSpark*)NULL);
1754 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
1757 } else { // TSO is remote (actually should be FMBQ)
/* Remote wakeup: charge packing + global unblock + message latency, and
 * tidy-up time afterwards. */
1758 bq_processing_time += RtsFlags.GranFlags.Costs.mpacktime;
1759 bq_processing_time += RtsFlags.GranFlags.Costs.gunblocktime;
1760 new_event(tso_loc, CurrentProc,
1761 CurrentTime[CurrentProc]+bq_processing_time+
1762 RtsFlags.GranFlags.Costs.latency,
1764 tso, node, (rtsSpark*)NULL);
1765 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
1766 bq_processing_time += RtsFlags.GranFlags.Costs.mtidytime;
1769 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
1771 fprintf(stderr," %s TSO %d (%p) [PE %d] (blocked_on=%p) (next=%p) ,",
1772 (node_loc==tso_loc ? "Local" : "Global"),
1773 tso->id, tso, CurrentProc, tso->blocked_on, tso->link))
1774 tso->blocked_on = NULL;
1775 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
1779 /* if this is the BQ of an RBH, we have to put back the info ripped out of
1780 the closure to make room for the anchor of the BQ */
1781 if (next!=END_BQ_QUEUE) {
1782 ASSERT(get_itbl(node)->type == RBH && get_itbl(next)->type == CONSTR);
1784 ASSERT((info_ptr==&RBH_Save_0_info) ||
1785 (info_ptr==&RBH_Save_1_info) ||
1786 (info_ptr==&RBH_Save_2_info));
1788 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
1789 ((StgRBH *)node)->blocking_queue = ((StgRBHSave *)next)->payload[0];
1790 ((StgRBH *)node)->mut_link = ((StgRBHSave *)next)->payload[1];
1793 belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
1794 node, info_type(node)));
/* unblockOneLocked (parallel/GUM variant): wake one blocking-queue element.
 * TSOs go back on the run queue; BLOCKED_FETCH nodes go on PendingFetches;
 * RBHSave constructors are ignored (they just terminate an RBH's queue).
 * Presumably returns the next element via a missing return line -- confirm. */
1798 static StgBlockingQueueElement *
1799 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
1801 StgBlockingQueueElement *next;
1803 switch (get_itbl(bqe)->type) {
1805 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
1806 /* if it's a TSO just push it onto the run_queue */
1808 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
1809 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
/* Event logging / blocked-time accounting for the resumed TSO. */
1811 unblockCount(bqe, node);
1812 /* reset blocking status after dumping event */
1813 ((StgTSO *)bqe)->why_blocked = NotBlocked;
1817 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
1819 bqe->link = PendingFetches;
1820 PendingFetches = bqe;
1824 /* can ignore this case in a non-debugging setup;
1825 see comments on RBHSave closures above */
1827 /* check that the closure is an RBHSave closure */
1828 ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
1829 get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
1830 get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
1834 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
1835 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
1839 // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
1843 #else /* !GRAN && !PAR */
/* unblockOneLocked (standard concurrent Haskell): mark the TSO runnable and
 * push it on the run queue. Caller must hold sched_mutex. Presumably returns
 * the next TSO in the queue (return on a missing line -- confirm). */
1845 unblockOneLocked(StgTSO *tso)
1849 ASSERT(get_itbl(tso)->type == TSO);
1850 ASSERT(tso->why_blocked != NotBlocked);
1851 tso->why_blocked = NotBlocked;
1853 PUSH_ON_RUN_QUEUE(tso);
1855 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
1860 #if defined(PAR) || defined(GRAN)
/* unblockOne: public locking wrapper around unblockOneLocked -- takes
 * sched_mutex for the duration of the wakeup. Two variants: the PAR/GRAN
 * one also needs the node the TSO was blocked on. */
1862 unblockOne(StgTSO *tso, StgClosure *node)
1864 ACQUIRE_LOCK(&sched_mutex);
1865 tso = unblockOneLocked(tso, node);
1866 RELEASE_LOCK(&sched_mutex);
/* Standard (non-parallel) variant. */
1871 unblockOne(StgTSO *tso)
1873 ACQUIRE_LOCK(&sched_mutex);
1874 tso = unblockOneLocked(tso);
1875 RELEASE_LOCK(&sched_mutex);
/* awakenBlockedQueue (GranSim variant): wake every TSO in the blocking
 * queue of node, faking a fetch first if the node is not on CurrentProc.
 * NOTE(review): several lines are missing from this extract. */
1882 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
1884 StgBlockingQueueElement *bqe, *next;
1886 PEs node_loc, tso_loc;
1887 rtsTime bq_processing_time = 0;
1888 nat len = 0, len_local = 0;
1891 belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
1892 node, CurrentProc, CurrentTime[CurrentProc],
1893 CurrentTSO->id, CurrentTSO));
1895 node_loc = where_is(node);
1897 ASSERT(get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
1898 get_itbl(q)->type == CONSTR); // closure (type constructor)
1899 ASSERT(is_unique(node));
1901 /* FAKE FETCH: magically copy the node to the tso's proc;
1902 no Fetch necessary because in reality the node should not have been
1903 moved to the other PE in the first place
1905 if (CurrentProc!=node_loc) {
1907 belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
1908 node, node_loc, CurrentProc, CurrentTSO->id,
1909 // CurrentTSO, where_is(CurrentTSO),
1910 node->header.gran.procs));
/* Record CurrentProc in the node's processor bitmask, as if it had been
 * fetched here. */
1911 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
1913 belch("## new bitmask of node %p is %#x",
1914 node, node->header.gran.procs));
1915 if (RtsFlags.GranFlags.GranSimStats.Global) {
1916 globalGranStats.tot_fake_fetches++;
1921 // ToDo: check: ASSERT(CurrentProc==node_loc);
/* Walk the queue until a non-TSO (the terminating RBHSave CONSTR) is hit;
 * unblockOneLocked returns the next element. */
1922 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
1925 bqe points to the current element in the queue
1926 next points to the next element in the queue
1928 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
1929 //tso_loc = where_is(tso);
1930 bqe = unblockOneLocked(bqe, node);
1933 /* statistics gathering */
1934 /* ToDo: fix counters
1935 if (RtsFlags.GranFlags.GranSimStats.Global) {
1936 globalGranStats.tot_bq_processing_time += bq_processing_time;
1937 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
1938 globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
1939 globalGranStats.tot_awbq++; // total no. of bqs awakened
1942 fprintf(stderr,"## BQ Stats of %p: [%d entries, %d local] %s\n",
1943 node, len, len_local, (next!=END_TSO_QUEUE) ? "RBH" : ""));
/* awakenBlockedQueue (parallel/GUM variant): wake every TSO and
 * BLOCKED_FETCH in node's blocking queue, under sched_mutex. */
1948 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
1950 StgBlockingQueueElement *bqe, *next;
1952 ACQUIRE_LOCK(&sched_mutex);
1954 IF_PAR_DEBUG(verbose,
1955 belch("## AwBQ for node %p on [%x]: ",
1958 ASSERT(get_itbl(q)->type == TSO ||
1959 get_itbl(q)->type == BLOCKED_FETCH ||
1960 get_itbl(q)->type == CONSTR);
/* Stop at the terminating CONSTR (RBHSave); unblockOneLocked returns the
 * next queue element. */
1963 while (get_itbl(bqe)->type==TSO ||
1964 get_itbl(bqe)->type==BLOCKED_FETCH) {
1965 bqe = unblockOneLocked(bqe, node);
1967 RELEASE_LOCK(&sched_mutex);
1970 #else /* !GRAN && !PAR */
/* awakenBlockedQueue (standard variant): wake an entire TSO chain, holding
 * sched_mutex across the whole traversal. */
1972 awakenBlockedQueue(StgTSO *tso)
1974 ACQUIRE_LOCK(&sched_mutex);
1975 while (tso != END_TSO_QUEUE) {
1976 tso = unblockOneLocked(tso);
1978 RELEASE_LOCK(&sched_mutex);
1982 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
1983 //@subsection Exception Handling Routines
1985 /* ---------------------------------------------------------------------------
1987 - usually called inside a signal handler so it mustn't do anything fancy.
1988 ------------------------------------------------------------------------ */
/* interruptStgRts: request an RTS interrupt (body on missing lines --
 * presumably just sets the interrupted/context_switch flags, which is what
 * keeps it async-signal-safe; confirm). */
1991 interruptStgRts(void)
1997 /* -----------------------------------------------------------------------------
2000 This is for use when we raise an exception in another thread, which
2002 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2003 -------------------------------------------------------------------------- */
/* unblockThread: forcibly remove tso from whatever queue it is blocked on
 * (MVar, black hole, exception delivery, or the I/O/delay queue) and put it
 * back on the run queue. barfs if the TSO claims to be blocked but is not
 * found on the corresponding queue.
 * NOTE(review): many unlink lines are missing from this extract; the
 * visible code shows the search loops and tail fixups only. */
2006 unblockThread(StgTSO *tso)
2010 ACQUIRE_LOCK(&sched_mutex);
2011 switch (tso->why_blocked) {
2014 return; /* not blocked */
/* Blocked on an MVar: search the MVar's TSO queue. */
2017 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2019 StgTSO *last_tso = END_TSO_QUEUE;
2020 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2023 for (t = mvar->head; t != END_TSO_QUEUE;
2024 last = &t->link, last_tso = t, t = t->link) {
/* If tso was the tail, the predecessor becomes the new tail. */
2027 if (mvar->tail == tso) {
2028 mvar->tail = last_tso;
2033 barf("unblockThread (MVAR): TSO not found");
2036 case BlockedOnBlackHole:
/* Blocked on a black hole: search the BLACKHOLE_BQ's blocking queue. */
2037 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2039 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2041 last = &bq->blocking_queue;
2042 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2043 last = &t->link, t = t->link) {
2049 barf("unblockThread (BLACKHOLE): TSO not found");
2052 case BlockedOnException:
/* Blocked delivering an exception: search the target thread's
 * blocked_exceptions queue. */
2054 StgTSO *target = tso->block_info.tso;
2056 ASSERT(get_itbl(target)->type == TSO);
2057 ASSERT(target->blocked_exceptions != NULL);
2059 last = &target->blocked_exceptions;
2060 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2061 last = &t->link, t = t->link) {
2062 ASSERT(get_itbl(t)->type == TSO);
2068 barf("unblockThread (Exception): TSO not found");
2071 case BlockedOnDelay:
2073 case BlockedOnWrite:
/* Blocked on I/O or a delay: search the global blocked_queue. */
2075 StgTSO *prev = NULL;
2076 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2077 prev = t, t = t->link) {
/* Found at the head of the queue. */
2080 blocked_queue_hd = t->link;
2081 if (blocked_queue_tl == t) {
2082 blocked_queue_tl = END_TSO_QUEUE;
/* Found mid-queue: splice it out, fixing the tail if needed. */
2085 prev->link = t->link;
2086 if (blocked_queue_tl == t) {
2087 blocked_queue_tl = prev;
2093 barf("unblockThread (I/O): TSO not found");
2097 barf("unblockThread");
/* Common exit: reset blocking state and make the thread runnable again. */
2101 tso->link = END_TSO_QUEUE;
2102 tso->why_blocked = NotBlocked;
2103 tso->block_info.closure = NULL;
2104 PUSH_ON_RUN_QUEUE(tso);
2105 RELEASE_LOCK(&sched_mutex);
2108 /* -----------------------------------------------------------------------------
2111 * The following function implements the magic for raising an
2112 * asynchronous exception in an existing thread.
2114 * We first remove the thread from any queue on which it might be
2115 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
2117 * We strip the stack down to the innermost CATCH_FRAME, building
2118 * thunks in the heap for all the active computations, so they can
2119 * be restarted if necessary. When we reach a CATCH_FRAME, we build
2120 * an application of the handler to the exception, and push it on
2121 * the top of the stack.
2123 * How exactly do we save all the active computations? We create an
2124 * AP_UPD for every UpdateFrame on the stack. Entering one of these
2125 * AP_UPDs pushes everything from the corresponding update frame
2126 * upwards onto the stack. (Actually, it pushes everything up to the
2127 * next update frame plus a pointer to the next AP_UPD object.
2128 * Entering the next AP_UPD object pushes more onto the stack until we
2129 * reach the last AP_UPD object - at which point the stack should look
2130 * exactly as it did when we killed the TSO and we can continue
2131 * execution by entering the closure on top of the stack.
2133 * We can also kill a thread entirely - this happens if either (a) the
2134 * exception passed to raiseAsync is NULL, or (b) there's no
2135 * CATCH_FRAME on the stack. In either case, we strip the entire
2136 * stack and replace the thread with a zombie.
2138 * -------------------------------------------------------------------------- */
/* deleteThread: kill a thread outright -- raiseAsync with a NULL exception
 * strips the whole stack (see comment above). */
2141 deleteThread(StgTSO *tso)
2143 raiseAsync(tso,NULL);
/* raiseAsync: deliver an asynchronous exception to tso (see the long
 * comment above deleteThread for the overall strategy).
 * NOTE(review): numerous lines are missing from this extract -- loop
 * structure, several closing braces and the final su/sp write-back are not
 * visible; comments below cover only the visible code. */
2147 raiseAsync(StgTSO *tso, StgClosure *exception)
2149 StgUpdateFrame* su = tso->su;
2150 StgPtr sp = tso->sp;
2152 /* Thread already dead? */
2153 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2157 IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2159 /* Remove it from any blocking queues */
2162 /* The stack freezing code assumes there's a closure pointer on
2163 * the top of the stack. This isn't always the case with compiled
2164 * code, so we have to push a dummy closure on the top which just
2165 * returns to the next return address on the stack.
2167 if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2168 *(--sp) = (W_)&dummy_ret_closure;
/* Number of stack words between sp and the enclosing frame su, excluding
 * the top-of-stack closure itself. */
2172 int words = ((P_)su - (P_)sp) - 1;
2176 /* If we find a CATCH_FRAME, and we've got an exception to raise,
2177 * then build PAP(handler,exception,realworld#), and leave it on
2178 * top of the stack ready to enter.
2180 if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2181 StgCatchFrame *cf = (StgCatchFrame *)su;
2182 /* we've got an exception to raise, so let's pass it to the
2183 * handler in this frame.
2185 ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2186 TICK_ALLOC_UPD_PAP(3,0);
2187 SET_HDR(ap,&PAP_info,cf->header.prof.ccs);
2190 ap->fun = cf->handler; /* :: Exception -> IO a */
2191 ap->payload[0] = (P_)exception;
2192 ap->payload[1] = ARG_TAG(0); /* realworld token */
2194 /* throw away the stack from Sp up to and including the
2197 sp = (P_)su + sizeofW(StgCatchFrame) - 1;
2200 /* Restore the blocked/unblocked state for asynchronous exceptions
2201 * at the CATCH_FRAME.
2203 * If exceptions were unblocked at the catch, arrange that they
2204 * are unblocked again after executing the handler by pushing an
2205 * unblockAsyncExceptions_ret stack frame.
2207 if (!cf->exceptions_blocked) {
2208 *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
2211 /* Ensure that async exceptions are blocked when running the handler.
2213 if (tso->blocked_exceptions == NULL) {
2214 tso->blocked_exceptions = END_TSO_QUEUE;
2217 /* Put the newly-built PAP on top of the stack, ready to execute
2218 * when the thread restarts.
2222 tso->what_next = ThreadEnterGHC;
2226 /* First build an AP_UPD consisting of the stack chunk above the
2227 * current update frame, with the top word on the stack as the
2230 ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2235 ap->fun = (StgClosure *)sp[0];
/* Copy the live stack chunk into the AP's payload. */
2237 for(i=0; i < (nat)words; ++i) {
2238 ap->payload[i] = (P_)*sp++;
/* What we do next depends on the kind of frame we just reached. */
2241 switch (get_itbl(su)->type) {
2245 SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */);
2246 TICK_ALLOC_UP_THK(words+1,0);
2249 fprintf(stderr, "scheduler: Updating ");
2250 printPtr((P_)su->updatee);
2251 fprintf(stderr, " with ");
2252 printObj((StgClosure *)ap);
2255 /* Replace the updatee with an indirection - happily
2256 * this will also wake up any threads currently
2257 * waiting on the result.
2259 UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */
2261 sp += sizeofW(StgUpdateFrame) -1;
2262 sp[0] = (W_)ap; /* push onto stack */
2268 StgCatchFrame *cf = (StgCatchFrame *)su;
2271 /* We want a PAP, not an AP_UPD. Fortunately, the
2272 * layout's the same.
2274 SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2275 TICK_ALLOC_UPD_PAP(words+1,0);
2277 /* now build o = FUN(catch,ap,handler) */
2278 o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
2279 TICK_ALLOC_FUN(2,0);
2280 SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
2281 o->payload[0] = (StgClosure *)ap;
2282 o->payload[1] = cf->handler;
2285 fprintf(stderr, "scheduler: Built ");
2286 printObj((StgClosure *)o);
2289 /* pop the old handler and put o on the stack */
2291 sp += sizeofW(StgCatchFrame) - 1;
2298 StgSeqFrame *sf = (StgSeqFrame *)su;
2301 SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2302 TICK_ALLOC_UPD_PAP(words+1,0);
2304 /* now build o = FUN(seq,ap) */
2305 o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
2306 TICK_ALLOC_SE_THK(1,0);
2307 SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
2308 o->payload[0] = (StgClosure *)ap;
2311 fprintf(stderr, "scheduler: Built ");
2312 printObj((StgClosure *)o);
2315 /* pop the old handler and put o on the stack */
2317 sp += sizeofW(StgSeqFrame) - 1;
2323 /* We've stripped the entire stack, the thread is now dead. */
2324 sp += sizeofW(StgStopFrame) - 1;
2325 sp[0] = (W_)exception; /* save the exception */
2326 tso->what_next = ThreadKilled;
2327 tso->su = (StgUpdateFrame *)(sp+1);
2338 /* -----------------------------------------------------------------------------
2339 resurrectThreads is called after garbage collection on the list of
2340 threads found to be garbage. Each of these threads will be woken
2341 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2342 on an MVar, or NonTermination if the thread was blocked on a Black
2344 -------------------------------------------------------------------------- */
/* NOTE(review): case labels for the switch are partially missing from this
 * extract; the pairing of exceptions to blockage kinds below follows the
 * comment above. */
2347 resurrectThreads( StgTSO *threads )
2351 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
/* Re-link each resurrected thread onto the global all_threads list. */
2352 next = tso->global_link;
2353 tso->global_link = all_threads;
2355 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
2357 switch (tso->why_blocked) {
2359 case BlockedOnException:
2360 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
2362 case BlockedOnBlackHole:
2363 raiseAsync(tso,(StgClosure *)NonTermination_closure);
2366 /* This might happen if the thread was blocked on a black hole
2367 * belonging to a thread that we've just woken up (raiseAsync
2368 * can wake up threads, remember...).
2372 barf("resurrectThreads: thread blocked in a strange way");
2377 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
2378 //@subsection Debugging Routines
2380 /* -----------------------------------------------------------------------------
2381 Debugging: why is a thread blocked
2382 -------------------------------------------------------------------------- */
/* printThreadBlockage: write a human-readable description of tso's blocking
 * reason to stderr (no trailing newline; caller supplies it). Some case
 * labels are on lines missing from this extract. */
2387 printThreadBlockage(StgTSO *tso)
2389 switch (tso->why_blocked) {
2391 fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
2393 case BlockedOnWrite:
2394 fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
2396 case BlockedOnDelay:
/* With setitimer, block_info holds a remaining delay; otherwise a target
 * time that must be diffed against the current time. */
2397 #if defined(HAVE_SETITIMER)
2398 fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
2400 fprintf(stderr,"blocked on delay of %d ms",
2401 tso->block_info.target - getourtimeofday());
2405 fprintf(stderr,"blocked on an MVar");
2407 case BlockedOnException:
2408 fprintf(stderr,"blocked on delivering an exception to thread %d",
2409 tso->block_info.tso->id);
2411 case BlockedOnBlackHole:
2412 fprintf(stderr,"blocked on a black hole");
2415 fprintf(stderr,"not blocked");
/* Parallel-only case (global address); guarded by conditional compilation
 * on missing lines -- confirm. */
2419 fprintf(stderr,"blocked on global address");
/* printThreadStatus: print tso's lifecycle state to stderr, falling back to
 * its blocking reason for live threads. */
2426 printThreadStatus(StgTSO *tso)
2428 switch (tso->what_next) {
2430 fprintf(stderr,"has been killed");
2432 case ThreadComplete:
2433 fprintf(stderr,"has completed");
/* Default: still alive -- describe what (if anything) it is blocked on. */
2436 printThreadBlockage(tso);
/* printAllThreads: debugging dump of every thread on the global
 * all_threads list with its current status. */
2441 printAllThreads(void)
2445 sched_belch("all threads:");
2446 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
2447 fprintf(stderr, "\tthread %d is ", t->id);
2448 printThreadStatus(t);
2449 fprintf(stderr,"\n");
2454 Print a whole blocking queue attached to node (debugging only).
/* print_bq (parallel variant): dump node's blocking queue to stderr.
 * Queue elements may be TSOs, BLOCKED_FETCHes, or a terminating RBHSave
 * constructor (only for RBH nodes). */
2459 print_bq (StgClosure *node)
2461 StgBlockingQueueElement *bqe;
2465 fprintf(stderr,"## BQ of closure %p (%s): ",
2466 node, info_type(node));
2468 /* should cover all closures that may have a blocking queue */
2469 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2470 get_itbl(node)->type == FETCH_ME_BQ ||
2471 get_itbl(node)->type == RBH);
2473 ASSERT(node!=(StgClosure*)NULL); // sanity check
2475 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
/* Iterate until either a CONSTR (RBHSave terminator) or the end of the
 * chain; `end` is computed before following the link so the terminator
 * itself is still printed. */
2477 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2478 !end; // iterate until bqe points to a CONSTR
2479 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2480 ASSERT(bqe != END_BQ_QUEUE); // sanity check
2481 ASSERT(bqe != (StgTSO*)NULL); // sanity check
2482 /* types of closures that may appear in a blocking queue */
2483 ASSERT(get_itbl(bqe)->type == TSO ||
2484 get_itbl(bqe)->type == BLOCKED_FETCH ||
2485 get_itbl(bqe)->type == CONSTR);
2486 /* only BQs of an RBH end with an RBH_Save closure */
2487 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
2489 switch (get_itbl(bqe)->type) {
2491 fprintf(stderr," TSO %d (%x),",
2492 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
2495 fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
2496 ((StgBlockedFetch *)bqe)->node,
2497 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
2498 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
2499 ((StgBlockedFetch *)bqe)->ga.weight);
2502 fprintf(stderr," %s (IP %p),",
2503 (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
2504 get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
2505 get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
2506 "RBH_Save_?"), get_itbl(bqe));
2509 barf("Unexpected closure type %s in blocking queue of %p (%s)",
2510 info_type(bqe), node, info_type(node));
2514 fputc('\n', stderr);
2516 # elif defined(GRAN)
/* print_bq (GranSim variant): as the parallel version, but also reports
 * the processor (PE) each queue element lives on. */
2518 print_bq (StgClosure *node)
2520 StgBlockingQueueElement *bqe;
2522 PEs node_loc, tso_loc;
2525 /* should cover all closures that may have a blocking queue */
2526 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2527 get_itbl(node)->type == FETCH_ME_BQ ||
2528 get_itbl(node)->type == RBH);
2530 ASSERT(node!=(StgClosure*)NULL); // sanity check
2531 node_loc = where_is(node);
2533 fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
2534 node, info_type(node), node_loc);
2537 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
/* Same termination logic as the PAR variant: stop after a CONSTR
 * (RBHSave) or end of chain, printing the terminator too. */
2539 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2540 !end; // iterate until bqe points to a CONSTR
2541 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2542 ASSERT(bqe != END_BQ_QUEUE); // sanity check
2543 ASSERT(bqe != (StgTSO*)NULL); // sanity check
2544 /* types of closures that may appear in a blocking queue */
2545 ASSERT(get_itbl(bqe)->type == TSO ||
2546 get_itbl(bqe)->type == CONSTR);
2547 /* only BQs of an RBH end with an RBH_Save closure */
2548 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
2550 tso_loc = where_is((StgClosure *)bqe);
2551 switch (get_itbl(bqe)->type) {
2553 fprintf(stderr," TSO %d (%x) on [PE %d],",
2554 ((StgTSO *)bqe)->id, ((StgTSO *)bqe), tso_loc);
2557 fprintf(stderr," %s (IP %p),",
2558 (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
2559 get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
2560 get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
2561 "RBH_Save_?"), get_itbl(bqe));
2564 barf("Unexpected closure type %s in blocking queue of %p (%s)",
2565 info_type(bqe), node, info_type(node));
2569 fputc('\n', stderr);
2573 Nice and easy: only TSOs on the blocking queue
/*
 * print_bq, standard (non-GRAN/PAR) variant: dump the blocking queue of
 * closure `node` to stderr.  Here the queue can only ever contain TSOs,
 * so the walk is a plain linked-list traversal.
 *
 * NOTE(review): elided extract — the return type, opening brace, the
 * declaration of `tso`, and the loop's `tso = tso->link` step are not
 * visible here; code left byte-identical.
 */
print_bq (StgClosure *node)
  ASSERT(node!=(StgClosure*)NULL); // sanity check
  for (tso = ((StgBlockingQueue*)node)->blocking_queue;
       tso != END_TSO_QUEUE;
    ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
    ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
    /* %p is correct here for the TSO pointer (unlike the GRAN variant) */
    fprintf(stderr," TSO %d (%p),", tso->id, tso);
  /* terminate the one-line dump */
  fputc('\n', stderr);
  /* NOTE(review): interior fragment of what appears to be a run-queue
     printer (presumably print_run_queue) — the enclosing function header,
     loop step and body are elided in this extract.  Walks the global run
     queue from run_queue_hd, counting entries in `i`; confirm against the
     full source. */
  for (i=0, tso=run_queue_hd;
       tso != END_TSO_QUEUE;
/*
 * sched_belch: printf-style debug message from the scheduler, written to
 * stderr with a "scheduler" prefix and a trailing newline.
 *
 * NOTE(review): elided extract — the return type, braces, the
 * `va_list ap; va_start(ap, s); ... va_end(ap);` setup, and the #ifdef SMP
 * conditional selecting between the two prefix fprintf's are not visible
 * here; code left byte-identical.
 */
sched_belch(char *s, ...)
  /* SMP branch: tag the message with the calling task.
     NOTE(review): printing pthread_self() with %ld is non-portable —
     POSIX makes pthread_t an opaque type, not necessarily a long. */
  fprintf(stderr, "scheduler (task %ld): ", pthread_self());
  /* non-SMP branch: plain prefix */
  fprintf(stderr, "scheduler: ");
  /* format the caller's message with the (elided) va_list */
  vfprintf(stderr, s, ap);
  fprintf(stderr, "\n");
2625 //@node Index, , Debugging Routines, Main scheduling code
2629 //* MainRegTable:: @cindex\s-+MainRegTable
2630 //* StgMainThread:: @cindex\s-+StgMainThread
2631 //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue
2632 //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd
2633 //* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
2634 //* context_switch:: @cindex\s-+context_switch
2635 //* createThread:: @cindex\s-+createThread
2636 //* free_capabilities:: @cindex\s-+free_capabilities
2637 //* gc_pending_cond:: @cindex\s-+gc_pending_cond
2638 //* initScheduler:: @cindex\s-+initScheduler
2639 //* interrupted:: @cindex\s-+interrupted
2640 //* n_free_capabilities:: @cindex\s-+n_free_capabilities
2641 //* next_thread_id:: @cindex\s-+next_thread_id
2642 //* print_bq:: @cindex\s-+print_bq
2643 //* run_queue_hd:: @cindex\s-+run_queue_hd
2644 //* run_queue_tl:: @cindex\s-+run_queue_tl
2645 //* sched_mutex:: @cindex\s-+sched_mutex
2646 //* schedule:: @cindex\s-+schedule
2647 //* take_off_run_queue:: @cindex\s-+take_off_run_queue
2648 //* task_ids:: @cindex\s-+task_ids
2649 //* term_mutex:: @cindex\s-+term_mutex
2650 //* thread_ready_cond:: @cindex\s-+thread_ready_cond