1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.183 2003/12/16 13:27:32 simonmar Exp $
4 * (c) The GHC Team, 1998-2003
8 * Different GHC ways use this scheduler quite differently (see comments below)
9 * Here is the global picture:
11 * WAY Name CPP flag What's it for
12 * --------------------------------------
13 * mp GUM PAR Parallel execution on a distrib. memory machine
14 * s SMP SMP Parallel execution on a shared memory machine
15 * mg GranSim GRAN Simulation of parallel execution
16 * md GUM/GdH DIST Distributed execution (based on GUM)
18 * --------------------------------------------------------------------------*/
21 * Version with support for distributed memory parallelism aka GUM (WAY=mp):
23 The main scheduling loop in GUM iterates until a finish message is received.
24 In that case a global flag @receivedFinish@ is set and this instance of
25 the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
26 for the handling of incoming messages, such as PP_FINISH.
27 Note that in the parallel case we have a system manager that coordinates
28 different PEs, each of which is running one instance of the RTS.
29 See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
30 From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
32 * Version with support for simulating parallel execution aka GranSim (WAY=mg):
34 The main scheduling code in GranSim is quite different from that in std
35 (concurrent) Haskell: while concurrent Haskell just iterates over the
36 threads in the runnable queue, GranSim is event driven, i.e. it iterates
37 over the events in the global event queue. -- HWL
40 #include "PosixSource.h"
47 #include "StgStartup.h"
49 #define COMPILING_SCHEDULER
51 #include "StgMiscClosures.h"
53 #include "Interpreter.h"
54 #include "Exception.h"
61 #include "ThreadLabels.h"
63 #include "Proftimer.h"
66 #if defined(GRAN) || defined(PAR)
67 # include "GranSimRts.h"
69 # include "ParallelRts.h"
70 # include "Parallel.h"
71 # include "ParallelDebug.h"
76 #include "Capability.h"
77 #include "OSThreads.h"
80 #ifdef HAVE_SYS_TYPES_H
81 #include <sys/types.h>
96 #define USED_IN_THREADED_RTS
98 #define USED_IN_THREADED_RTS STG_UNUSED
101 #ifdef RTS_SUPPORTS_THREADS
102 #define USED_WHEN_RTS_SUPPORTS_THREADS
104 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
107 /* Main thread queue.
108 * Locks required: sched_mutex.
110 StgMainThread *main_threads = NULL;
113 * Locks required: sched_mutex.
117 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
118 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
121 In GranSim we have a runnable and a blocked queue for each processor.
122 In order to minimise code changes new arrays run_queue_hds/tls
123 are created. run_queue_hd is then a short cut (macro) for
124 run_queue_hds[CurrentProc] (see GranSim.h).
127 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
128 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
129 StgTSO *ccalling_threadss[MAX_PROC];
130 /* We use the same global list of threads (all_threads) in GranSim as in
131 the std RTS (i.e. we are cheating). However, we don't use this list in
132 the GranSim specific code at the moment (so we are only potentially
137 StgTSO *run_queue_hd = NULL;
138 StgTSO *run_queue_tl = NULL;
139 StgTSO *blocked_queue_hd = NULL;
140 StgTSO *blocked_queue_tl = NULL;
141 StgTSO *sleeping_queue = NULL; /* perhaps replace with a hash table? */
145 /* Linked list of all threads.
146 * Used for detecting garbage collected threads.
148 StgTSO *all_threads = NULL;
150 /* When a thread performs a safe C call (_ccall_GC, using old
151 * terminology), it gets put on the suspended_ccalling_threads
152 * list. Used by the garbage collector.
154 static StgTSO *suspended_ccalling_threads;
156 static StgTSO *threadStackOverflow(StgTSO *tso);
158 /* KH: The following two flags are shared memory locations. There is no need
159 to lock them, since they are only unset at the end of a scheduler
163 /* flag set by signal handler to precipitate a context switch */
164 nat context_switch = 0;
166 /* if this flag is set as well, give up execution */
167 rtsBool interrupted = rtsFalse;
169 /* Next thread ID to allocate.
170 * Locks required: thread_id_mutex
172 static StgThreadID next_thread_id = 1;
175 * Pointers to the state of the current thread.
176 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
177 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
180 /* The smallest stack size that makes any sense is:
181 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
182 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
183 * + 1 (the closure to enter)
185 * + 1 (spare slot req'd by stg_ap_v_ret)
187 * A thread with this stack will bomb immediately with a stack
188 * overflow, which will increase its stack size.
191 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
198 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
199 * exists - earlier gccs apparently didn't.
204 static rtsBool ready_to_gc;
207 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
208 * in an MT setting, needed to signal that a worker thread shouldn't hang around
209 * in the scheduler when it is out of work.
211 static rtsBool shutting_down_scheduler = rtsFalse;
/* Forward declarations for scheduler entry points and internal helpers.
 * schedule() is the main loop; interruptStgRts() is the external kill switch
 * (sets the `interrupted` flag, per its use in the loop below). */
213 void addToBlockedQueue ( StgTSO *tso );
215 static void schedule ( StgMainThread *mainThread, Capability *initialCapability );
216 void interruptStgRts ( void );
218 static void detectBlackHoles ( void );
220 #if defined(RTS_SUPPORTS_THREADS)
221 /* ToDo: carefully document the invariants that go together
222 * with these synchronisation objects.
224 Mutex sched_mutex = INIT_MUTEX_VAR;
225 Mutex term_mutex = INIT_MUTEX_VAR;
228 * A heavyweight solution to the problem of protecting
229 * the thread_id from concurrent update.
231 Mutex thread_id_mutex = INIT_MUTEX_VAR;
233 #endif /* RTS_SUPPORTS_THREADS */
/* NOTE(review): TimeOfLastYield / emitSchedule / whatNext_strs support the
 * GUM (PAR) event logging in schedule() — see the GR_SCHEDULE dumps there;
 * whatNext_strs' initializer elements are on missing lines of this extract. */
237 rtsTime TimeOfLastYield;
238 rtsBool emitSchedule = rtsTrue;
242 static char *whatNext_strs[] = {
252 StgTSO * createSparkThread(rtsSpark spark);
253 StgTSO * activateSpark (rtsSpark spark);
256 /* ----------------------------------------------------------------------------
258 * ------------------------------------------------------------------------- */
/* Worker-task startup (threaded RTS only).
 * NOTE(review): this section is missing several lines in this extract
 * (function return types, braces, and the reset of startingWorkerThread);
 * the comments below cover only the visible statements. */
260 #if defined(RTS_SUPPORTS_THREADS)
261 static rtsBool startingWorkerThread = rtsFalse;
263 static void taskStart(void);
/* Fragment of taskStart()'s body: the worker runs the scheduler while
 * holding sched_mutex (the loop releases it around thread execution).
 * NOTE(review): the call between these two statements — presumably
 * schedule(NULL, ...) — is on a missing line; confirm against full source. */
267 ACQUIRE_LOCK(&sched_mutex);
269 RELEASE_LOCK(&sched_mutex);
/* Spawn a worker OS task iff there is any work (runnable, blocked-on-I/O,
 * or sleeping threads) and no worker is already being started. */
273 startSchedulerTaskIfNecessary(void)
275 if(run_queue_hd != END_TSO_QUEUE
276 || blocked_queue_hd != END_TSO_QUEUE
277 || sleeping_queue != END_TSO_QUEUE)
279 if(!startingWorkerThread)
280 { // we don't want to start another worker thread
281 // just because the last one hasn't yet reached the
282 // "waiting for capability" state
/* Set the guard BEFORE startTask() so a second call cannot spawn a
 * duplicate worker while this one is still coming up. */
283 startingWorkerThread = rtsTrue;
284 startTask(taskStart);
290 /* ---------------------------------------------------------------------------
291 Main scheduling loop.
293 We use round-robin scheduling, each thread returning to the
294 scheduler loop when one of these conditions is detected:
297 * timer expires (thread yields)
302 Locking notes: we acquire the scheduler lock once at the beginning
303 of the scheduler loop, and release it when
305 * running a thread, or
306 * waiting for work, or
307 * waiting for a GC to complete.
310 In a GranSim setup this loop iterates over the global event queue.
311 This revolves around the global event queue, which determines what
312 to do next. Therefore, it's more complicated than either the
313 concurrent or the parallel (GUM) setup.
316 GUM iterates over incoming messages.
317 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
318 and sends out a fish whenever it has nothing to do; in-between
319 doing the actual reductions (shared code below) it processes the
320 incoming messages and deals with delayed operations
321 (see PendingFetches).
322 This is not the ugliest code you could imagine, but it's bloody close.
324 ------------------------------------------------------------------------ */
326 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
327 Capability *initialCapability )
330 Capability *cap = initialCapability;
331 StgThreadReturnCode ret;
339 rtsBool receivedFinish = rtsFalse;
341 nat tp_size, sp_size; // stats only
344 rtsBool was_interrupted = rtsFalse;
345 StgTSOWhatNext prev_what_next;
347 // Pre-condition: sched_mutex is held.
349 #if defined(RTS_SUPPORTS_THREADS)
351 // in the threaded case, the capability is either passed in via the
352 // initialCapability parameter, or initialized inside the scheduler
356 sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
357 mainThread, initialCapability);
360 // simply initialise it in the non-threaded case
361 grabCapability(&cap);
365 /* set up first event to get things going */
366 /* ToDo: assign costs for system setup and init MainTSO ! */
367 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
369 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
372 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
373 G_TSO(CurrentTSO, 5));
375 if (RtsFlags.GranFlags.Light) {
376 /* Save current time; GranSim Light only */
377 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
380 event = get_next_event();
382 while (event!=(rtsEvent*)NULL) {
383 /* Choose the processor with the next event */
384 CurrentProc = event->proc;
385 CurrentTSO = event->tso;
389 while (!receivedFinish) { /* set by processMessages */
390 /* when receiving PP_FINISH message */
392 #else // everything except GRAN and PAR
398 IF_DEBUG(scheduler, printAllThreads());
400 #if defined(RTS_SUPPORTS_THREADS)
401 // Yield the capability to higher-priority tasks if necessary.
404 yieldCapability(&cap);
407 // If we do not currently hold a capability, we wait for one
410 waitForCapability(&sched_mutex, &cap,
411 mainThread ? &mainThread->bound_thread_cond : NULL);
414 // We now have a capability...
418 // If we're interrupted (the user pressed ^C, or some other
419 // termination condition occurred), kill all the currently running
423 IF_DEBUG(scheduler, sched_belch("interrupted"));
424 interrupted = rtsFalse;
425 was_interrupted = rtsTrue;
426 #if defined(RTS_SUPPORTS_THREADS)
427 // In the threaded RTS, deadlock detection doesn't work,
428 // so just exit right away.
429 prog_belch("interrupted");
430 releaseCapability(cap);
431 RELEASE_LOCK(&sched_mutex);
432 shutdownHaskellAndExit(EXIT_SUCCESS);
439 // Go through the list of main threads and wake up any
440 // clients whose computations have finished. ToDo: this
441 // should be done more efficiently without a linear scan
442 // of the main threads list, somehow...
444 #if defined(RTS_SUPPORTS_THREADS)
446 StgMainThread *m, **prev;
447 prev = &main_threads;
448 for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
449 if (m->tso->what_next == ThreadComplete
450 || m->tso->what_next == ThreadKilled)
454 if (m->tso->what_next == ThreadComplete)
458 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
459 *(m->ret) = (StgClosure *)m->tso->sp[1];
471 m->stat = Interrupted;
481 removeThreadLabel((StgWord)m->tso->id);
483 releaseCapability(cap);
488 // The current OS thread can not handle the fact that
489 // the Haskell thread "m" has ended. "m" is bound;
490 // the scheduler loop in its bound OS thread has to
491 // return, so let's pass our capability directly to
493 passCapability(&m->bound_thread_cond);
499 #else /* not threaded */
502 /* in GUM do this only on the Main PE */
505 /* If our main thread has finished or been killed, return.
508 StgMainThread *m = main_threads;
509 if (m->tso->what_next == ThreadComplete
510 || m->tso->what_next == ThreadKilled) {
512 removeThreadLabel((StgWord)m->tso->id);
514 main_threads = main_threads->link;
515 if (m->tso->what_next == ThreadComplete) {
516 // We finished successfully, fill in the return value
517 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
518 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
522 if (m->ret) { *(m->ret) = NULL; };
523 if (was_interrupted) {
524 m->stat = Interrupted;
535 #if defined(RTS_USER_SIGNALS)
536 // check for signals each time around the scheduler
537 if (signals_pending()) {
538 RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
539 startSignalHandlers();
540 ACQUIRE_LOCK(&sched_mutex);
544 /* Check whether any waiting threads need to be woken up. If the
545 * run queue is empty, and there are no other tasks running, we
546 * can wait indefinitely for something to happen.
548 if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
549 #if defined(RTS_SUPPORTS_THREADS)
554 awaitEvent( EMPTY_RUN_QUEUE() );
556 /* we can be interrupted while waiting for I/O... */
557 if (interrupted) continue;
560 * Detect deadlock: when we have no threads to run, there are no
561 * threads waiting on I/O or sleeping, and all the other tasks are
562 * waiting for work, we must have a deadlock of some description.
564 * We first try to find threads blocked on themselves (ie. black
565 * holes), and generate NonTermination exceptions where necessary.
567 * If no threads are black holed, we have a deadlock situation, so
568 * inform all the main threads.
570 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
571 if ( EMPTY_THREAD_QUEUES() )
573 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
574 // Garbage collection can release some new threads due to
575 // either (a) finalizers or (b) threads resurrected because
576 // they are about to be sent BlockedOnDeadMVar. Any threads
577 // thus released will be immediately runnable.
578 GarbageCollect(GetRoots,rtsTrue);
580 if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
583 sched_belch("still deadlocked, checking for black holes..."));
586 if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
588 #if defined(RTS_USER_SIGNALS)
589 /* If we have user-installed signal handlers, then wait
590 * for signals to arrive rather than bombing out with a
593 if ( anyUserHandlers() ) {
595 sched_belch("still deadlocked, waiting for signals..."));
599 // we might be interrupted...
600 if (interrupted) { continue; }
602 if (signals_pending()) {
603 RELEASE_LOCK(&sched_mutex);
604 startSignalHandlers();
605 ACQUIRE_LOCK(&sched_mutex);
607 ASSERT(!EMPTY_RUN_QUEUE());
612 /* Probably a real deadlock. Send the current main thread the
613 * Deadlock exception (or in the SMP build, send *all* main
614 * threads the deadlock exception, since none of them can make
620 switch (m->tso->why_blocked) {
621 case BlockedOnBlackHole:
622 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
624 case BlockedOnException:
626 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
629 barf("deadlock: main thread blocked in a strange way");
635 #elif defined(RTS_SUPPORTS_THREADS)
636 // ToDo: add deadlock detection in threaded RTS
638 // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
641 #if defined(RTS_SUPPORTS_THREADS)
642 if ( EMPTY_RUN_QUEUE() ) {
643 continue; // nothing to do
648 if (RtsFlags.GranFlags.Light)
649 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
651 /* adjust time based on time-stamp */
652 if (event->time > CurrentTime[CurrentProc] &&
653 event->evttype != ContinueThread)
654 CurrentTime[CurrentProc] = event->time;
656 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
657 if (!RtsFlags.GranFlags.Light)
660 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
662 /* main event dispatcher in GranSim */
663 switch (event->evttype) {
664 /* Should just be continuing execution */
666 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
667 /* ToDo: check assertion
668 ASSERT(run_queue_hd != (StgTSO*)NULL &&
669 run_queue_hd != END_TSO_QUEUE);
671 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
672 if (!RtsFlags.GranFlags.DoAsyncFetch &&
673 procStatus[CurrentProc]==Fetching) {
674 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
675 CurrentTSO->id, CurrentTSO, CurrentProc);
678 /* Ignore ContinueThreads for completed threads */
679 if (CurrentTSO->what_next == ThreadComplete) {
680 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
681 CurrentTSO->id, CurrentTSO, CurrentProc);
684 /* Ignore ContinueThreads for threads that are being migrated */
685 if (PROCS(CurrentTSO)==Nowhere) {
686 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
687 CurrentTSO->id, CurrentTSO, CurrentProc);
690 /* The thread should be at the beginning of the run queue */
691 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
692 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
693 CurrentTSO->id, CurrentTSO, CurrentProc);
694 break; // run the thread anyway
697 new_event(proc, proc, CurrentTime[proc],
699 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
701 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
702 break; // now actually run the thread; DaH Qu'vam yImuHbej
705 do_the_fetchnode(event);
706 goto next_thread; /* handle next event in event queue */
709 do_the_globalblock(event);
710 goto next_thread; /* handle next event in event queue */
713 do_the_fetchreply(event);
714 goto next_thread; /* handle next event in event queue */
716 case UnblockThread: /* Move from the blocked queue to the tail of */
717 do_the_unblock(event);
718 goto next_thread; /* handle next event in event queue */
720 case ResumeThread: /* Move from the blocked queue to the tail of */
721 /* the runnable queue ( i.e. Qu' SImqa'lu') */
722 event->tso->gran.blocktime +=
723 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
724 do_the_startthread(event);
725 goto next_thread; /* handle next event in event queue */
728 do_the_startthread(event);
729 goto next_thread; /* handle next event in event queue */
732 do_the_movethread(event);
733 goto next_thread; /* handle next event in event queue */
736 do_the_movespark(event);
737 goto next_thread; /* handle next event in event queue */
740 do_the_findwork(event);
741 goto next_thread; /* handle next event in event queue */
744 barf("Illegal event type %u\n", event->evttype);
747 /* This point was scheduler_loop in the old RTS */
749 IF_DEBUG(gran, belch("GRAN: after main switch"));
751 TimeOfLastEvent = CurrentTime[CurrentProc];
752 TimeOfNextEvent = get_time_of_next_event();
753 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
754 // CurrentTSO = ThreadQueueHd;
756 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
759 if (RtsFlags.GranFlags.Light)
760 GranSimLight_leave_system(event, &ActiveTSO);
762 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
765 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
767 /* in a GranSim setup the TSO stays on the run queue */
769 /* Take a thread from the run queue. */
770 POP_RUN_QUEUE(t); // take_off_run_queue(t);
773 fprintf(stderr, "GRAN: About to run current thread, which is\n");
776 context_switch = 0; // turned on via GranYield, checking events and time slice
779 DumpGranEvent(GR_SCHEDULE, t));
781 procStatus[CurrentProc] = Busy;
784 if (PendingFetches != END_BF_QUEUE) {
788 /* ToDo: phps merge with spark activation above */
789 /* check whether we have local work and send requests if we have none */
790 if (EMPTY_RUN_QUEUE()) { /* no runnable threads */
791 /* :-[ no local threads => look out for local sparks */
792 /* the spark pool for the current PE */
793 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
794 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
795 pool->hd < pool->tl) {
797 * ToDo: add GC code check that we really have enough heap afterwards!!
799 * If we're here (no runnable threads) and we have pending
800 * sparks, we must have a space problem. Get enough space
801 * to turn one of those pending sparks into a
805 spark = findSpark(rtsFalse); /* get a spark */
806 if (spark != (rtsSpark) NULL) {
807 tso = activateSpark(spark); /* turn the spark into a thread */
808 IF_PAR_DEBUG(schedule,
809 belch("==== schedule: Created TSO %d (%p); %d threads active",
810 tso->id, tso, advisory_thread_count));
812 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
813 belch("==^^ failed to activate spark");
815 } /* otherwise fall through & pick-up new tso */
817 IF_PAR_DEBUG(verbose,
818 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
819 spark_queue_len(pool)));
824 /* If we still have no work we need to send a FISH to get a spark
827 if (EMPTY_RUN_QUEUE()) {
828 /* =8-[ no local sparks => look for work on other PEs */
830 * We really have absolutely no work. Send out a fish
831 * (there may be some out there already), and wait for
832 * something to arrive. We clearly can't run any threads
833 * until a SCHEDULE or RESUME arrives, and so that's what
834 * we're hoping to see. (Of course, we still have to
835 * respond to other types of messages.)
837 TIME now = msTime() /*CURRENT_TIME*/;
838 IF_PAR_DEBUG(verbose,
839 belch("-- now=%ld", now));
840 IF_PAR_DEBUG(verbose,
841 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
842 (last_fish_arrived_at!=0 &&
843 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
844 belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
845 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
846 last_fish_arrived_at,
847 RtsFlags.ParFlags.fishDelay, now);
850 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
851 (last_fish_arrived_at==0 ||
852 (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
853 /* outstandingFishes is set in sendFish, processFish;
854 avoid flooding system with fishes via delay */
856 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
859 // Global statistics: count no. of fishes
860 if (RtsFlags.ParFlags.ParStats.Global &&
861 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
862 globalParStats.tot_fish_mess++;
866 receivedFinish = processMessages();
869 } else if (PacketsWaiting()) { /* Look for incoming messages */
870 receivedFinish = processMessages();
873 /* Now we are sure that we have some work available */
874 ASSERT(run_queue_hd != END_TSO_QUEUE);
876 /* Take a thread from the run queue, if we have work */
877 POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
878 IF_DEBUG(sanity,checkTSO(t));
880 /* ToDo: write something to the log-file
881 if (RTSflags.ParFlags.granSimStats && !sameThread)
882 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
886 /* the spark pool for the current PE */
887 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
890 belch("--=^ %d threads, %d sparks on [%#x]",
891 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
894 if (0 && RtsFlags.ParFlags.ParStats.Full &&
895 t && LastTSO && t->id != LastTSO->id &&
896 LastTSO->why_blocked == NotBlocked &&
897 LastTSO->what_next != ThreadComplete) {
898 // if previously scheduled TSO not blocked we have to record the context switch
899 DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
900 GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
903 if (RtsFlags.ParFlags.ParStats.Full &&
904 (emitSchedule /* forced emit */ ||
905 (t && LastTSO && t->id != LastTSO->id))) {
907 we are running a different TSO, so write a schedule event to log file
908 NB: If we use fair scheduling we also have to write a deschedule
909 event for LastTSO; with unfair scheduling we know that the
910 previous tso has blocked whenever we switch to another tso, so
911 we don't need it in GUM for now
913 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
914 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
915 emitSchedule = rtsFalse;
919 #else /* !GRAN && !PAR */
921 // grab a thread from the run queue
922 ASSERT(run_queue_hd != END_TSO_QUEUE);
925 // Sanity check the thread we're about to run. This can be
926 // expensive if there is lots of thread switching going on...
927 IF_DEBUG(sanity,checkTSO(t));
933 for(m = main_threads; m; m = m->link)
944 sched_belch("### Running thread %d in bound thread", t->id));
945 // yes, the Haskell thread is bound to the current native thread
950 sched_belch("### thread %d bound to another OS thread", t->id));
951 // no, bound to a different Haskell thread: pass to that thread
952 PUSH_ON_RUN_QUEUE(t);
953 passCapability(&m->bound_thread_cond);
959 if(mainThread != NULL)
960 // The thread we want to run is bound.
963 sched_belch("### this OS thread cannot run thread %d", t->id));
964 // no, the current native thread is bound to a different
965 // Haskell thread, so pass it to any worker thread
966 PUSH_ON_RUN_QUEUE(t);
967 passCapabilityToWorker();
974 cap->r.rCurrentTSO = t;
976 /* context switches are now initiated by the timer signal, unless
977 * the user specified "context switch as often as possible", with
980 if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
981 && (run_queue_hd != END_TSO_QUEUE
982 || blocked_queue_hd != END_TSO_QUEUE
983 || sleeping_queue != END_TSO_QUEUE)))
990 RELEASE_LOCK(&sched_mutex);
992 IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
993 t->id, whatNext_strs[t->what_next]));
996 startHeapProfTimer();
999 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1000 /* Run the current thread
1002 prev_what_next = t->what_next;
1003 switch (prev_what_next) {
1005 case ThreadComplete:
1006 /* Thread already finished, return to scheduler. */
1007 ret = ThreadFinished;
1010 errno = t->saved_errno;
1011 ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1012 t->saved_errno = errno;
1014 case ThreadInterpret:
1015 ret = interpretBCO(cap);
1018 barf("schedule: invalid what_next field");
1020 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1022 /* Costs for the scheduler are assigned to CCS_SYSTEM */
1024 stopHeapProfTimer();
1028 ACQUIRE_LOCK(&sched_mutex);
1030 #ifdef RTS_SUPPORTS_THREADS
1031 IF_DEBUG(scheduler,fprintf(stderr,"sched (task %p): ", osThreadId()););
1032 #elif !defined(GRAN) && !defined(PAR)
1033 IF_DEBUG(scheduler,fprintf(stderr,"sched: "););
1035 t = cap->r.rCurrentTSO;
1038 /* HACK 675: if the last thread didn't yield, make sure to print a
1039 SCHEDULE event to the log file when StgRunning the next thread, even
1040 if it is the same one as before */
1042 TimeOfLastYield = CURRENT_TIME;
1048 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1049 globalGranStats.tot_heapover++;
1051 globalParStats.tot_heapover++;
1054 // did the task ask for a large block?
1055 if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1056 // if so, get one and push it on the front of the nursery.
1060 blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1062 IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)",
1063 t->id, whatNext_strs[t->what_next], blocks));
1065 // don't do this if it would push us over the
1066 // alloc_blocks_lim limit; we'll GC first.
1067 if (alloc_blocks + blocks < alloc_blocks_lim) {
1069 alloc_blocks += blocks;
1070 bd = allocGroup( blocks );
1072 // link the new group into the list
1073 bd->link = cap->r.rCurrentNursery;
1074 bd->u.back = cap->r.rCurrentNursery->u.back;
1075 if (cap->r.rCurrentNursery->u.back != NULL) {
1076 cap->r.rCurrentNursery->u.back->link = bd;
1078 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1079 g0s0->blocks == cap->r.rNursery);
1080 cap->r.rNursery = g0s0->blocks = bd;
1082 cap->r.rCurrentNursery->u.back = bd;
1084 // initialise it as a nursery block. We initialise the
1085 // step, gen_no, and flags field of *every* sub-block in
1086 // this large block, because this is easier than making
1087 // sure that we always find the block head of a large
1088 // block whenever we call Bdescr() (eg. evacuate() and
1089 // isAlive() in the GC would both have to do this, at
1093 for (x = bd; x < bd + blocks; x++) {
1100 // don't forget to update the block count in g0s0.
1101 g0s0->n_blocks += blocks;
1102 // This assert can be a killer if the app is doing lots
1103 // of large block allocations.
1104 ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1106 // now update the nursery to point to the new block
1107 cap->r.rCurrentNursery = bd;
1109 // we might be unlucky and have another thread get on the
1110 // run queue before us and steal the large block, but in that
1111 // case the thread will just end up requesting another large
1113 PUSH_ON_RUN_QUEUE(t);
1118 /* make all the running tasks block on a condition variable,
1119 * maybe set context_switch and wait till they all pile in,
1120 * then have them wait on a GC condition variable.
1122 IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow",
1123 t->id, whatNext_strs[t->what_next]));
1126 ASSERT(!is_on_queue(t,CurrentProc));
1128 /* Currently we emit a DESCHEDULE event before GC in GUM.
1129 ToDo: either add separate event to distinguish SYSTEM time from rest
1130 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1131 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1132 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1133 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1134 emitSchedule = rtsTrue;
1138 ready_to_gc = rtsTrue;
1139 context_switch = 1; /* stop other threads ASAP */
1140 PUSH_ON_RUN_QUEUE(t);
1141 /* actual GC is done at the end of the while loop */
1147 DumpGranEvent(GR_DESCHEDULE, t));
1148 globalGranStats.tot_stackover++;
1151 // DumpGranEvent(GR_DESCHEDULE, t);
1152 globalParStats.tot_stackover++;
1154 IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow",
1155 t->id, whatNext_strs[t->what_next]));
1156 /* just adjust the stack for this thread, then pop it back
1162 /* enlarge the stack */
1163 StgTSO *new_t = threadStackOverflow(t);
1165 /* This TSO has moved, so update any pointers to it from the
1166 * main thread stack. It better not be on any other queues...
1167 * (it shouldn't be).
1169 for (m = main_threads; m != NULL; m = m->link) {
1174 threadPaused(new_t);
1175 PUSH_ON_RUN_QUEUE(new_t);
1179 case ThreadYielding:
1182 DumpGranEvent(GR_DESCHEDULE, t));
1183 globalGranStats.tot_yields++;
1186 // DumpGranEvent(GR_DESCHEDULE, t);
1187 globalParStats.tot_yields++;
1189 /* put the thread back on the run queue. Then, if we're ready to
1190 * GC, check whether this is the last task to stop. If so, wake
1191 * up the GC thread. getThread will block during a GC until the
1195 if (t->what_next != prev_what_next) {
1196 belch("--<< thread %ld (%s) stopped to switch evaluators",
1197 t->id, whatNext_strs[t->what_next]);
1199 belch("--<< thread %ld (%s) stopped, yielding",
1200 t->id, whatNext_strs[t->what_next]);
1205 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1207 ASSERT(t->link == END_TSO_QUEUE);
1209 // Shortcut if we're just switching evaluators: don't bother
1210 // doing stack squeezing (which can be expensive), just run the
1212 if (t->what_next != prev_what_next) {
1219 ASSERT(!is_on_queue(t,CurrentProc));
1222 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1223 checkThreadQsSanity(rtsTrue));
1227 if (RtsFlags.ParFlags.doFairScheduling) {
1228 /* this does round-robin scheduling; good for concurrency */
1229 APPEND_TO_RUN_QUEUE(t);
1231 /* this does unfair scheduling; good for parallelism */
1232 PUSH_ON_RUN_QUEUE(t);
1235 // this does round-robin scheduling; good for concurrency
1236 APPEND_TO_RUN_QUEUE(t);
1240 /* add a ContinueThread event to actually process the thread */
1241 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1243 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1245 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1254 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1255 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1256 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1258 // ??? needed; should emit block before
1260 DumpGranEvent(GR_DESCHEDULE, t));
1261 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1264 ASSERT(procStatus[CurrentProc]==Busy ||
1265 ((procStatus[CurrentProc]==Fetching) &&
1266 (t->block_info.closure!=(StgClosure*)NULL)));
1267 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1268 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1269 procStatus[CurrentProc]==Fetching))
1270 procStatus[CurrentProc] = Idle;
1274 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1275 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1278 if (t->block_info.closure!=(StgClosure*)NULL)
1279 print_bq(t->block_info.closure));
1281 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1284 /* whatever we schedule next, we must log that schedule */
1285 emitSchedule = rtsTrue;
1288 /* don't need to do anything. Either the thread is blocked on
1289 * I/O, in which case we'll have called addToBlockedQueue
1290 * previously, or it's blocked on an MVar or Blackhole, in which
1291 * case it'll be on the relevant queue already.
1294 fprintf(stderr, "--<< thread %d (%s) stopped: ",
1295 t->id, whatNext_strs[t->what_next]);
1296 printThreadBlockage(t);
1297 fprintf(stderr, "\n"));
1300 /* Only for dumping event to log file
1301 ToDo: do I need this in GranSim, too?
1308 case ThreadFinished:
1309 /* Need to check whether this was a main thread, and if so, signal
1310 * the task that started it with the return value. If we have no
1311 * more main threads, we probably need to stop all the tasks until
1314 /* We also end up here if the thread kills itself with an
1315 * uncaught exception, see Exception.hc.
1317 IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished",
1318 t->id, whatNext_strs[t->what_next]));
1320 endThread(t, CurrentProc); // clean-up the thread
1322 /* For now all are advisory -- HWL */
1323 //if(t->priority==AdvisoryPriority) ??
1324 advisory_thread_count--;
1327 if(t->dist.priority==RevalPriority)
1331 if (RtsFlags.ParFlags.ParStats.Full &&
1332 !RtsFlags.ParFlags.ParStats.Suppressed)
1333 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1338 barf("schedule: invalid thread return code %d", (int)ret);
1342 // When we have +RTS -i0 and we're heap profiling, do a census at
1343 // every GC. This lets us get repeatable runs for debugging.
1344 if (performHeapProfile ||
1345 (RtsFlags.ProfFlags.profileInterval==0 &&
1346 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1347 GarbageCollect(GetRoots, rtsTrue);
1349 performHeapProfile = rtsFalse;
1350 ready_to_gc = rtsFalse; // we already GC'd
1355 /* everybody back, start the GC.
1356 * Could do it in this thread, or signal a condition var
1357 * to do it in another thread. Either way, we need to
1358 * broadcast on gc_pending_cond afterward.
1360 #if defined(RTS_SUPPORTS_THREADS)
1361 IF_DEBUG(scheduler,sched_belch("doing GC"));
1363 GarbageCollect(GetRoots,rtsFalse);
1364 ready_to_gc = rtsFalse;
1366 /* add a ContinueThread event to continue execution of current thread */
1367 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1369 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1371 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1379 IF_GRAN_DEBUG(unused,
1380 print_eventq(EventHd));
1382 event = get_next_event();
1385 /* ToDo: wait for next message to arrive rather than busy wait */
1388 } /* end of while(1) */
1390 IF_PAR_DEBUG(verbose,
1391 belch("== Leaving schedule() after having received Finish"));
1394 /* ---------------------------------------------------------------------------
1395 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1396 * used by Control.Concurrent for error checking.
1397 * ------------------------------------------------------------------------- */
/* Returns whether this RTS build supports bound (OS-)threads; used by
 * Control.Concurrent for error checking.  Body elided in this listing —
 * presumably returns rtsTrue under RTS_SUPPORTS_THREADS, rtsFalse
 * otherwise — TODO confirm against the full source. */
1400 rtsSupportsBoundThreads(void)
1409 /* ---------------------------------------------------------------------------
1410 * isThreadBound(tso): check whether tso is bound to an OS thread.
1411 * ------------------------------------------------------------------------- */
/* isThreadBound(tso): check whether tso is bound to an OS thread by
 * scanning the main-thread list (a bound thread is one that appears
 * there).  Interior lines elided in this listing. */
1414 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1418 for(m = main_threads; m; m = m->link)
1427 /* ---------------------------------------------------------------------------
1428 * Singleton fork(). Do not copy any running threads.
1429 * ------------------------------------------------------------------------- */
1432 deleteThreadImmediately(StgTSO *tso);
/*
 * forkProcess(entry): fork() the whole process.  In the parent, simply
 * return the child's pid.  In the child, delete every inherited Haskell
 * thread, reset the task manager, run the given stable-pointer IO
 * action, and exit.  Not implemented on mingw32 (no fork()).
 * NOTE(review): this listing elides lines; code kept byte-identical.
 */
1435 forkProcess(HsStablePtr *entry)
1437 #ifndef mingw32_TARGET_OS
1443 IF_DEBUG(scheduler,sched_belch("forking!"));
1444 rts_lock(); // This not only acquires sched_mutex, it also
1445 // makes sure that no other threads are running
1449 if (pid) { /* parent */
1451 /* just return the pid */
1455 } else { /* child */
/* Child side: discard all threads inherited from the parent image. */
1458 // delete all threads
1459 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1461 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1464 // don't allow threads to catch the ThreadKilled exception
1465 deleteThreadImmediately(t);
1468 // wipe the main thread list
1469 while((m = main_threads) != NULL) {
1470 main_threads = m->link;
1472 closeCondition(&m->bound_thread_cond);
/* Worker-thread bookkeeping must be reset: the child has no workers. */
1477 #ifdef RTS_SUPPORTS_THREADS
1478 resetTaskManagerAfterFork(); // tell startTask() and friends that
1479 startingWorkerThread = rtsFalse; // we have no worker threads any more
1480 resetWorkerWakeupPipeAfterFork();
/* Run the supplied action in the child, then shut the RTS down. */
1483 rc = rts_evalStableIO(entry, NULL); // run the action
1484 rts_checkSchedStatus("forkProcess",rc);
1488 hs_exit(); // clean up and exit
1492 barf("forkProcess#: primop not implemented for mingw32, sorry!\n");
1494 #endif /* mingw32 */
1497 /* ---------------------------------------------------------------------------
1498 * deleteAllThreads(): kill all the live threads.
1500 * This is used when we catch a user interrupt (^C), before performing
1501 * any necessary cleanups and running finalizers.
1503 * Locks: sched_mutex held.
1504 * ------------------------------------------------------------------------- */
/* deleteAllThreads(): kill every live thread and empty all scheduler
 * queues.  Used when a user interrupt (^C) is caught, before cleanup
 * and finalizers run.  Caller must hold sched_mutex. */
1507 deleteAllThreads ( void )
1510 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1511 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1512 next = t->global_link; /* grab the link before t is deleted */
/* Finally reset every queue head/tail to the empty sentinel. */
1515 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1516 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1517 sleeping_queue = END_TSO_QUEUE;
1520 /* startThread and insertThread are now in GranSim.c -- HWL */
1523 /* ---------------------------------------------------------------------------
1524 * Suspending & resuming Haskell threads.
1526 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1527 * its capability before calling the C function. This allows another
1528 * task to pick up the capability and carry on running Haskell
1529 * threads. It also means that if the C call blocks, it won't lock
1532 * The Haskell thread making the C call is put to sleep for the
1533 * duration of the call, on the susepended_ccalling_threads queue. We
1534 * give out a token to the task, which it can use to resume the thread
1535 * on return from the C function.
1536 * ------------------------------------------------------------------------- */
/*
 * suspendThread(reg, ...): called when a Haskell thread makes a "safe"
 * foreign call.  Parks the current TSO on suspended_ccalling_threads,
 * releases the Capability so another task can run Haskell code while
 * the C call is in flight, and returns a token (the thread id) with
 * which resumeThread() later finds the TSO again.  Preserves errno
 * across the RTS bookkeeping.
 */
1539 suspendThread( StgRegTable *reg,
1548 int saved_errno = errno;
1550 /* assume that *reg is a pointer to the StgRegTable part
/* Recover the enclosing Capability from the reg-table pointer. */
1553 cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1555 ACQUIRE_LOCK(&sched_mutex);
1558 sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1560 // XXX this might not be necessary --SDM
1561 cap->r.rCurrentTSO->what_next = ThreadRunGHC;
/* Push the TSO on the suspended-ccalling list (LIFO). */
1563 threadPaused(cap->r.rCurrentTSO);
1564 cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1565 suspended_ccalling_threads = cap->r.rCurrentTSO;
/* why_blocked records whether async exceptions may interrupt the call. */
1567 #if defined(RTS_SUPPORTS_THREADS)
1568 if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
1570 cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1571 cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1575 cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1579 /* Use the thread ID as the token; it should be unique */
1580 tok = cap->r.rCurrentTSO->id;
1582 /* Hand back capability */
1583 releaseCapability(cap);
1585 #if defined(RTS_SUPPORTS_THREADS)
1586 /* Preparing to leave the RTS, so ensure there's a native thread/task
1587 waiting to take over.
1589 IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1592 /* Other threads _might_ be available for execution; signal this */
1594 RELEASE_LOCK(&sched_mutex);
1596 errno = saved_errno; /* restore errno clobbered by RTS calls */
/*
 * resumeThread(tok, ...): inverse of suspendThread().  Re-acquires a
 * Capability, unlinks the TSO identified by tok from the
 * suspended_ccalling_threads list, re-enables its blocked exceptions
 * (threaded RTS), clears its blocked status and makes it the current
 * TSO again.  barfs if the token matches no suspended thread.
 */
1601 resumeThread( StgInt tok,
1602 rtsBool concCall STG_UNUSED )
1604 StgTSO *tso, **prev;
1606 int saved_errno = errno;
1608 #if defined(RTS_SUPPORTS_THREADS)
1609 /* Wait for permission to re-enter the RTS with the result. */
1610 ACQUIRE_LOCK(&sched_mutex);
1611 waitForReturnCapability(&sched_mutex, &cap);
1613 IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1615 grabCapability(&cap);
1618 /* Remove the thread off of the suspended list */
1619 prev = &suspended_ccalling_threads;
1620 for (tso = suspended_ccalling_threads;
1621 tso != END_TSO_QUEUE;
1622 prev = &tso->link, tso = tso->link) {
1623 if (tso->id == (StgThreadID)tok) {
1628 if (tso == END_TSO_QUEUE) {
1629 barf("resumeThread: thread not found");
1631 tso->link = END_TSO_QUEUE;
/* Wake any exceptions that were queued while we were in the C call. */
1633 #if defined(RTS_SUPPORTS_THREADS)
1634 if(tso->why_blocked == BlockedOnCCall)
1636 awakenBlockedQueueNoLock(tso->blocked_exceptions);
1637 tso->blocked_exceptions = NULL;
1641 /* Reset blocking status */
1642 tso->why_blocked = NotBlocked;
1644 cap->r.rCurrentTSO = tso;
1645 RELEASE_LOCK(&sched_mutex);
1646 errno = saved_errno; /* restore caller's errno */
1651 /* ---------------------------------------------------------------------------
1653 * ------------------------------------------------------------------------ */
1654 static void unblockThread(StgTSO *tso);
1656 /* ---------------------------------------------------------------------------
1657 * Comparing Thread ids.
1659 * This is used from STG land in the implementation of the
1660 * instances of Eq/Ord for ThreadIds.
1661 * ------------------------------------------------------------------------ */
/* cmp_thread(tso1, tso2): three-way comparison of thread ids; called
 * from STG land to implement Eq/Ord on ThreadId.  Returns -1, 0 or 1. */
1664 cmp_thread(StgPtr tso1, StgPtr tso2)
1666 StgThreadID id1 = ((StgTSO *)tso1)->id;
1667 StgThreadID id2 = ((StgTSO *)tso2)->id;
1669 if (id1 < id2) return (-1);
1670 if (id1 > id2) return 1;
1674 /* ---------------------------------------------------------------------------
1675 * Fetching the ThreadID from an StgTSO.
1677 * This is used in the implementation of Show for ThreadIds.
1678 * ------------------------------------------------------------------------ */
/* rts_getThreadId(tso): fetch the id field of a TSO; used to implement
 * Show for ThreadId. */
1680 rts_getThreadId(StgPtr tso)
1682 return ((StgTSO *)tso)->id;
/* labelThread(tso, label): attach a debugging label to a thread.  Makes
 * a private heap copy of the label (including the NUL) and hands it to
 * the thread-label table, which frees any previous label. */
1687 labelThread(StgPtr tso, char *label)
1692 /* Caveat: Once set, you can only set the thread name to "" */
1693 len = strlen(label)+1; /* +1 so the NUL terminator is copied too */
1694 buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1695 strncpy(buf,label,len);
1696 /* Update will free the old memory for us */
1697 updateThreadLabel(((StgTSO *)tso)->id,buf);
1701 /* ---------------------------------------------------------------------------
1702 Create a new thread.
1704 The new thread starts with the given stack size. Before the
1705 scheduler can run, however, this thread needs to have a closure
1706 (and possibly some arguments) pushed on its stack. See
1707 pushClosure() in Schedule.h.
1709 createGenThread() and createIOThread() (in SchedAPI.h) are
1710 convenient packaged versions of this function.
1712 currently pri (priority) is only used in a GRAN setup -- HWL
1713 ------------------------------------------------------------------------ */
1715 /* currently pri (priority) is only used in a GRAN setup -- HWL */
/*
 * createThread(size[, pri]): allocate and initialise a new TSO with a
 * stack of (size - TSO_STRUCT_SIZEW) words.  The caller must push a
 * closure on the new thread's stack before it can be scheduled.  The
 * pri argument exists only in the GranSim (GRAN) build.  Returns the
 * new TSO, or END_TSO_QUEUE if the PAR thread limit is hit.
 * NOTE(review): lines elided in this listing; code kept byte-identical.
 */
1717 createThread(nat size, StgInt pri)
1720 createThread(nat size)
1727 /* First check whether we should create a thread at all */
1729 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1730 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1732 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1733 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1734 return END_TSO_QUEUE;
1740 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1743 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1745 /* catch ridiculously small stack sizes */
1746 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1747 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1750 stack_size = size - TSO_STRUCT_SIZEW;
/* Allocate the TSO + stack as one heap object and stamp its header. */
1752 tso = (StgTSO *)allocate(size);
1753 TICK_ALLOC_TSO(stack_size, 0);
1755 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1757 SET_GRAN_HDR(tso, ThisPE);
1760 // Always start with the compiled code evaluator
1761 tso->what_next = ThreadRunGHC;
1763 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1764 * protect the increment operation on next_thread_id.
1765 * In future, we could use an atomic increment instead.
1767 ACQUIRE_LOCK(&thread_id_mutex);
1768 tso->id = next_thread_id++;
1769 RELEASE_LOCK(&thread_id_mutex);
1771 tso->why_blocked = NotBlocked;
1772 tso->blocked_exceptions = NULL;
1774 tso->saved_errno = 0;
1776 tso->stack_size = stack_size;
1777 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
1779 tso->sp = (P_)&(tso->stack) + stack_size;
1782 tso->prof.CCCS = CCS_MAIN;
1785 /* put a stop frame on the stack */
1786 tso->sp -= sizeofW(StgStopFrame);
1787 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1790 tso->link = END_TSO_QUEUE;
1791 /* uses more flexible routine in GranSim */
1792 insertThread(tso, CurrentProc);
1794 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1800 if (RtsFlags.GranFlags.GranSimStats.Full)
1801 DumpGranEvent(GR_START,tso);
1803 if (RtsFlags.ParFlags.ParStats.Full)
1804 DumpGranEvent(GR_STARTQ,tso);
1805 /* HACk to avoid SCHEDULE
1809 /* Link the new thread on the global thread list.
1811 tso->global_link = all_threads;
1815 tso->dist.priority = MandatoryPriority; //by default that is...
1819 tso->gran.pri = pri;
1821 tso->gran.magic = TSO_MAGIC; // debugging only
/* GranSim per-thread statistics start out zeroed. */
1823 tso->gran.sparkname = 0;
1824 tso->gran.startedat = CURRENT_TIME;
1825 tso->gran.exported = 0;
1826 tso->gran.basicblocks = 0;
1827 tso->gran.allocs = 0;
1828 tso->gran.exectime = 0;
1829 tso->gran.fetchtime = 0;
1830 tso->gran.fetchcount = 0;
1831 tso->gran.blocktime = 0;
1832 tso->gran.blockcount = 0;
1833 tso->gran.blockedat = 0;
1834 tso->gran.globalsparks = 0;
1835 tso->gran.localsparks = 0;
1836 if (RtsFlags.GranFlags.Light)
1837 tso->gran.clock = Now; /* local clock */
1839 tso->gran.clock = 0;
1841 IF_DEBUG(gran,printTSO(tso));
1844 tso->par.magic = TSO_MAGIC; // debugging only
/* GUM (PAR) per-thread statistics, likewise zeroed. */
1846 tso->par.sparkname = 0;
1847 tso->par.startedat = CURRENT_TIME;
1848 tso->par.exported = 0;
1849 tso->par.basicblocks = 0;
1850 tso->par.allocs = 0;
1851 tso->par.exectime = 0;
1852 tso->par.fetchtime = 0;
1853 tso->par.fetchcount = 0;
1854 tso->par.blocktime = 0;
1855 tso->par.blockcount = 0;
1856 tso->par.blockedat = 0;
1857 tso->par.globalsparks = 0;
1858 tso->par.localsparks = 0;
1862 globalGranStats.tot_threads_created++;
1863 globalGranStats.threads_created_on_PE[CurrentProc]++;
1864 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1865 globalGranStats.tot_sq_probes++;
1867 // collect parallel global statistics (currently done together with GC stats)
1868 if (RtsFlags.ParFlags.ParStats.Global &&
1869 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1870 //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime());
1871 globalParStats.tot_threads_created++;
/* FIXME(review): the belch below passes three arguments (CurrentProc,
 * tso, tso->id) to a format with only two conversions ("%d (%p)");
 * looks like a stale format string — confirm against the full source
 * (elided lines 1875-1876 may hold more of the format). */
1877 belch("==__ schedule: Created TSO %d (%p);",
1878 CurrentProc, tso, tso->id));
1880 IF_PAR_DEBUG(verbose,
1881 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1882 tso->id, tso, advisory_thread_count));
1884 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1885 tso->id, tso->stack_size));
1892 all parallel thread creation calls should fall through the following routine.
/* createSparkThread(spark): turn a spark into a runnable thread (PAR/
 * GRAN builds).  All parallel thread creation funnels through here.
 * Refuses (barf) when the PAR thread limit is reached, otherwise makes
 * an advisory-priority TSO, pushes the spark closure on its stack and
 * puts it on the run queue. */
1895 createSparkThread(rtsSpark spark)
1897 ASSERT(spark != (rtsSpark)NULL);
1898 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
1900 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1901 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1902 return END_TSO_QUEUE;
1906 tso = createThread(RtsFlags.GcFlags.initialStkSize);
1907 if (tso==END_TSO_QUEUE)
1908 barf("createSparkThread: Cannot create TSO");
1910 tso->priority = AdvisoryPriority;
1912 pushClosure(tso,spark);
1913 PUSH_ON_RUN_QUEUE(tso);
1914 advisory_thread_count++;
1921 Turn a spark into a thread.
1922 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
/* activateSpark(spark): wrapper around createSparkThread that also
 * emits parallel-stats debug output.  ToDo note in original: not yet
 * SMP-safe (needs sched_mutex). */
1926 activateSpark (rtsSpark spark)
1930 tso = createSparkThread(spark);
1931 if (RtsFlags.ParFlags.ParStats.Full) {
1932 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1933 IF_PAR_DEBUG(verbose,
1934 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1935 (StgClosure *)spark, info_type((StgClosure *)spark)));
1937 // ToDo: fwd info on local/global spark to thread -- HWL
1938 // tso->gran.exported = spark->exported;
1939 // tso->gran.locked = !spark->global;
1940 // tso->gran.sparkname = spark->name;
1946 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1947 Capability *initialCapability
1951 /* ---------------------------------------------------------------------------
1954 * scheduleThread puts a thread on the head of the runnable queue.
1955 * This will usually be done immediately after a thread is created.
1956 * The caller of scheduleThread must create the thread using e.g.
1957 * createThread and push an appropriate closure
1958 * on this thread's stack before the scheduler is invoked.
1959 * ------------------------------------------------------------------------ */
/* scheduleThread_(tso): put tso at the head of the run queue.  The
 * caller must hold sched_mutex and must already have pushed a closure
 * on the thread's stack. */
1961 static void scheduleThread_ (StgTSO* tso);
1964 scheduleThread_(StgTSO *tso)
1966 // Precondition: sched_mutex must be held.
1968 /* Put the new thread on the head of the runnable queue. The caller
1969 * better push an appropriate closure on this thread's stack
1970 * beforehand. In the SMP case, the thread may start running as
1971 * soon as we release the scheduler lock below.
1973 PUSH_ON_RUN_QUEUE(tso);
1977 IF_DEBUG(scheduler,printTSO(tso));
/* scheduleThread(tso): public locking wrapper around scheduleThread_. */
1981 void scheduleThread(StgTSO* tso)
1983 ACQUIRE_LOCK(&sched_mutex);
1984 scheduleThread_(tso);
1985 RELEASE_LOCK(&sched_mutex);
/* scheduleWaitThread(tso, ret, initialCapability): register tso as a
 * main thread (so its completion can be observed via *ret), schedule
 * it, and block in waitThread_ until it finishes.  Caller must hold
 * sched_mutex.  The main-thread record is linked BEFORE scheduling to
 * avoid the race described below. */
1989 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1990 Capability *initialCapability)
1992 // Precondition: sched_mutex must be held
1995 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1999 #if defined(RTS_SUPPORTS_THREADS)
2000 initCondition(&m->bound_thread_cond);
2003 /* Put the thread on the main-threads list prior to scheduling the TSO.
2004 Failure to do so introduces a race condition in the MT case (as
2005 identified by Wolfgang Thaller), whereby the new task/OS thread
2006 created by scheduleThread_() would complete prior to the thread
2007 that spawned it managed to put 'itself' on the main-threads list.
2008 The upshot of it all being that the worker thread wouldn't get to
2009 signal the completion of the its work item for the main thread to
2010 see (==> it got stuck waiting.) -- sof 6/02.
2012 IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2014 m->link = main_threads;
2017 scheduleThread_(tso);
2019 return waitThread_(m, initialCapability);
2022 /* ---------------------------------------------------------------------------
2025 * Initialise the scheduler. This resets all the queues - if the
2026 * queues contained any threads, they'll be garbage collected at the
2029 * ------------------------------------------------------------------------ */
/* NOTE(review): function header elided in this listing — presumably the
 * body of initScheduler(): reset every scheduler queue to empty,
 * initialise the locks/condition variables (threaded RTS), create the
 * initial Capability state and start the task manager.  TODO confirm
 * against the full source. */
2037 for (i=0; i<=MAX_PROC; i++) {
2038 run_queue_hds[i] = END_TSO_QUEUE;
2039 run_queue_tls[i] = END_TSO_QUEUE;
2040 blocked_queue_hds[i] = END_TSO_QUEUE;
2041 blocked_queue_tls[i] = END_TSO_QUEUE;
2042 ccalling_threadss[i] = END_TSO_QUEUE;
2043 sleeping_queue = END_TSO_QUEUE;
/* Non-GRAN build: single set of queues rather than per-PE arrays. */
2046 run_queue_hd = END_TSO_QUEUE;
2047 run_queue_tl = END_TSO_QUEUE;
2048 blocked_queue_hd = END_TSO_QUEUE;
2049 blocked_queue_tl = END_TSO_QUEUE;
2050 sleeping_queue = END_TSO_QUEUE;
2053 suspended_ccalling_threads = END_TSO_QUEUE;
2055 main_threads = NULL;
2056 all_threads = END_TSO_QUEUE;
2061 RtsFlags.ConcFlags.ctxtSwitchTicks =
2062 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2064 #if defined(RTS_SUPPORTS_THREADS)
2065 /* Initialise the mutex and condition variables used by
2067 initMutex(&sched_mutex);
2068 initMutex(&term_mutex);
2069 initMutex(&thread_id_mutex);
2071 initCondition(&thread_ready_cond);
2074 #if defined(RTS_SUPPORTS_THREADS)
2075 ACQUIRE_LOCK(&sched_mutex);
2078 /* A capability holds the state a native thread needs in
2079 * order to execute STG code. At least one capability is
2080 * floating around (only SMP builds have more than one).
2084 #if defined(RTS_SUPPORTS_THREADS)
2085 /* start our haskell execution tasks */
2086 startTaskManager(0,taskStart);
2089 #if /* defined(SMP) ||*/ defined(PAR)
2093 RELEASE_LOCK(&sched_mutex);
/* exitScheduler(): shut the scheduler down; in the threaded RTS this
 * sets the global shutting_down_scheduler flag (and, per the elided
 * lines, presumably stops the task manager — TODO confirm). */
2098 exitScheduler( void )
2100 #if defined(RTS_SUPPORTS_THREADS)
2103 shutting_down_scheduler = rtsTrue;
2106 /* ----------------------------------------------------------------------------
2107 Managing the per-task allocation areas.
2109 Each capability comes with an allocation area. These are
2110 fixed-length block lists into which allocation can be done.
2112 ToDo: no support for two-space collection at the moment???
2113 ------------------------------------------------------------------------- */
/* waitThread_(m, initialCapability): run the scheduler until main
 * thread m completes, then tear down its per-thread resources and
 * return its status.  sched_mutex is held on entry and still held on
 * return. */
2117 waitThread_(StgMainThread* m, Capability *initialCapability)
2119 SchedulerStatus stat;
2121 // Precondition: sched_mutex must be held.
2122 IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2125 /* GranSim specific init */
2126 CurrentTSO = m->tso; // the TSO to run
2127 procStatus[MainProc] = Busy; // status of main PE
2128 CurrentProc = MainProc; // PE to run it on
2129 schedule(m,initialCapability);
2131 schedule(m,initialCapability);
2132 ASSERT(m->stat != NoStatus); /* schedule() must have set a result */
2137 #if defined(RTS_SUPPORTS_THREADS)
2138 closeCondition(&m->bound_thread_cond);
2141 IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2144 // Postcondition: sched_mutex still held
2148 /* ---------------------------------------------------------------------------
2149 Where are the roots that we know about?
2151 - all the threads on the runnable queue
2152 - all the threads on the blocked queue
2153 - all the threads on the sleeping queue
2154 - all the thread currently executing a _ccall_GC
2155 - all the "main threads"
2157 ------------------------------------------------------------------------ */
2159 /* This has to be protected either by the scheduler monitor, or by the
2160 garbage collection monitor (probably the latter).
/*
 * GetRoots(evac): report every GC root the scheduler knows about —
 * the run/blocked/sleeping queues (per-PE arrays under GRAN), the
 * suspended-ccalling list, spark queues (PAR/GRAN), signal handlers,
 * and completed main-thread TSOs which the GC would otherwise drop
 * from all_threads.  Must be called under the scheduler/GC lock.
 */
2165 GetRoots( evac_fn evac )
2170 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2171 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2172 evac((StgClosure **)&run_queue_hds[i]);
2173 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2174 evac((StgClosure **)&run_queue_tls[i]);
2176 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2177 evac((StgClosure **)&blocked_queue_hds[i]);
2178 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2179 evac((StgClosure **)&blocked_queue_tls[i]);
2180 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
/* FIXME(review): the guard above tests ccalling_threadss[i] (the array
 * initialised in initScheduler) but the line below evacuates
 * ccalling_threads[i] — almost certainly a typo for ccalling_threadss;
 * as written the GRAN build would mark the wrong object (or fail to
 * compile if ccalling_threads does not exist).  Fix when the full
 * source is in view. */
2181 evac((StgClosure **)&ccalling_threads[i]);
2188 if (run_queue_hd != END_TSO_QUEUE) {
2189 ASSERT(run_queue_tl != END_TSO_QUEUE);
2190 evac((StgClosure **)&run_queue_hd);
2191 evac((StgClosure **)&run_queue_tl);
2194 if (blocked_queue_hd != END_TSO_QUEUE) {
2195 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2196 evac((StgClosure **)&blocked_queue_hd);
2197 evac((StgClosure **)&blocked_queue_tl);
2200 if (sleeping_queue != END_TSO_QUEUE) {
2201 evac((StgClosure **)&sleeping_queue);
2205 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2206 evac((StgClosure **)&suspended_ccalling_threads);
2209 #if defined(PAR) || defined(GRAN)
2210 markSparkQueue(evac);
2213 #if defined(RTS_USER_SIGNALS)
2214 // mark the signal handlers (signals should be already blocked)
2215 markSignalHandlers(evac);
2218 // main threads which have completed need to be retained until they
2219 // are dealt with in the main scheduler loop. They won't be
2220 // retained any other way: the GC will drop them from the
2221 // all_threads list, so we have to be careful to treat them as roots
2225 for (m = main_threads; m != NULL; m = m->link) {
2226 switch (m->tso->what_next) {
2227 case ThreadComplete:
2229 evac((StgClosure **)&m->tso);
2238 /* -----------------------------------------------------------------------------
2241 This is the interface to the garbage collector from Haskell land.
2242 We provide this so that external C code can allocate and garbage
2243 collect when called from Haskell via _ccall_GC.
2245 It might be useful to provide an interface whereby the programmer
2246 can specify more roots (ToDo).
2248 This needs to be protected by the GC condition variable above. KH.
2249 -------------------------------------------------------------------------- */
/* Hook used by performGCWithRoots to smuggle the caller's extra root
 * function into AllRoots. */
2251 static void (*extra_roots)(evac_fn);
/* performGC(): minor collection entry point for foreign code
 * (_ccall_GC); takes sched_mutex around the collection. */
2256 /* Obligated to hold this lock upon entry */
2257 ACQUIRE_LOCK(&sched_mutex);
2258 GarbageCollect(GetRoots,rtsFalse);
2259 RELEASE_LOCK(&sched_mutex);
/* performMajorGC(): as performGC but forces a major collection
 * (rtsTrue = collect all generations). */
2263 performMajorGC(void)
2265 ACQUIRE_LOCK(&sched_mutex);
2266 GarbageCollect(GetRoots,rtsTrue);
2267 RELEASE_LOCK(&sched_mutex);
/* AllRoots(evac): combined root-marking function — scheduler roots
 * plus whatever the user registered via performGCWithRoots. */
2271 AllRoots(evac_fn evac)
2273 GetRoots(evac); // the scheduler's roots
2274 extra_roots(evac); // the user's roots
/* performGCWithRoots(get_roots): GC with additional user-supplied
 * roots; stashes get_roots in the extra_roots hook and collects with
 * AllRoots.  Note: the hook is a file-scope global, so concurrent
 * callers are serialised only by sched_mutex. */
2278 performGCWithRoots(void (*get_roots)(evac_fn))
2280 ACQUIRE_LOCK(&sched_mutex);
2281 extra_roots = get_roots;
2282 GarbageCollect(AllRoots,rtsFalse);
2283 RELEASE_LOCK(&sched_mutex);
2286 /* -----------------------------------------------------------------------------
2289 If the thread has reached its maximum stack size, then raise the
2290 StackOverflow exception in the offending thread. Otherwise
2291 relocate the TSO into a larger chunk of memory and adjust its stack
2293 -------------------------------------------------------------------------- */
/*
 * threadStackOverflow(tso): handle a stack-check failure.  If the
 * thread is already at its max stack size, raise StackOverflow in it;
 * otherwise allocate a roughly-doubled TSO, copy the old TSO header
 * and live stack across, and mark the old TSO ThreadRelocated so the
 * GC and TSO primops follow the forwarding link.
 */
2296 threadStackOverflow(StgTSO *tso)
2298 nat new_stack_size, new_tso_size, stack_words;
2302 IF_DEBUG(sanity,checkTSO(tso));
2303 if (tso->stack_size >= tso->max_stack_size) {
2306 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2307 tso->id, tso, tso->stack_size, tso->max_stack_size);
2308 /* If we're debugging, just print out the top of the stack */
2309 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2312 /* Send this thread the StackOverflow exception */
2313 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2317 /* Try to double the current stack size. If that takes us over the
2318 * maximum stack size for this thread, then use the maximum instead.
2319 * Finally round up so the TSO ends up as a whole number of blocks.
2321 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2322 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2323 TSO_STRUCT_SIZE)/sizeof(W_);
2324 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2325 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2327 IF_DEBUG(scheduler, fprintf(stderr,"== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2329 dest = (StgTSO *)allocate(new_tso_size);
2330 TICK_ALLOC_TSO(new_stack_size,0);
2332 /* copy the TSO block and the old stack into the new area */
2333 memcpy(dest,tso,TSO_STRUCT_SIZE);
2334 stack_words = tso->stack + tso->stack_size - tso->sp;
/* live stack sits at the TOP of the new area, growing downwards */
2335 new_sp = (P_)dest + new_tso_size - stack_words;
2336 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2338 /* relocate the stack pointers... */
2340 dest->stack_size = new_stack_size;
2342 /* Mark the old TSO as relocated. We have to check for relocated
2343 * TSOs in the garbage collector and any primops that deal with TSOs.
2345 * It's important to set the sp value to just beyond the end
2346 * of the stack, so we don't attempt to scavenge any part of the
2349 tso->what_next = ThreadRelocated;
2351 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2352 tso->why_blocked = NotBlocked;
2353 dest->mut_link = NULL;
2355 IF_PAR_DEBUG(verbose,
2356 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2357 tso->id, tso, tso->stack_size);
2358 /* If we're debugging, just print out the top of the stack */
2359 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2362 IF_DEBUG(sanity,checkTSO(tso));
2364 IF_DEBUG(scheduler,printTSO(dest));
2370 /* ---------------------------------------------------------------------------
2371 Wake up a queue that was blocked on some resource.
2372 ------------------------------------------------------------------------ */
/* unblockCount(bqe, node): statistics/logging helper for waking a
 * blocked-queue element (GRAN and PAR variants; the two identical
 * signature lines below sit under different #if branches whose
 * directives are elided in this listing).  Writes a RESUME event and
 * charges blocked/fetch time to the TSO depending on what kind of
 * closure it was blocked on. */
2376 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2381 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2383 /* write RESUME events to log file and
2384 update blocked and fetch time (depending on type of the orig closure) */
2385 if (RtsFlags.ParFlags.ParStats.Full) {
2386 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2387 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2388 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2389 if (EMPTY_RUN_QUEUE())
2390 emitSchedule = rtsTrue;
2392 switch (get_itbl(node)->type) {
/* blocked on a fetch: account the wait as fetch time... */
2394 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
/* ...otherwise as ordinary block time. */
2399 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2406 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
/* unblockOneLocked (GRAN version): wake one element of a blocking
 * queue attached to node.  Schedules an UnblockThread event either
 * locally (cheap) or on the remote PE (charged pack + global-unblock +
 * latency costs).  Caller holds the scheduler lock. */
2413 static StgBlockingQueueElement *
2414 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2417 PEs node_loc, tso_loc;
2419 node_loc = where_is(node); // should be lifted out of loop
2420 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2421 tso_loc = where_is((StgClosure *)tso);
2422 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2423 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2424 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2425 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2426 // insertThread(tso, node_loc);
2427 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2429 tso, node, (rtsSpark*)NULL);
2430 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2433 } else { // TSO is remote (actually should be FMBQ)
2434 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2435 RtsFlags.GranFlags.Costs.gunblocktime +
2436 RtsFlags.GranFlags.Costs.latency;
2437 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2439 tso, node, (rtsSpark*)NULL);
2440 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2443 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2445 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2446 (node_loc==tso_loc ? "Local" : "Global"),
2447 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2448 tso->block_info.closure = NULL;
2449 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
/* unblockOneLocked (PAR version): wake one blocking-queue element.
 * TSOs go straight onto the run queue; BLOCKED_FETCH nodes go onto the
 * PendingFetches list; RBHSave constrs are ignored (queue-end markers).
 * Returns the next element (return path elided in this listing). */
2453 static StgBlockingQueueElement *
2454 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2456 StgBlockingQueueElement *next;
2458 switch (get_itbl(bqe)->type) {
2460 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2461 /* if it's a TSO just push it onto the run_queue */
2463 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2464 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
2466 unblockCount(bqe, node);
2467 /* reset blocking status after dumping event */
2468 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2472 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2474 bqe->link = (StgBlockingQueueElement *)PendingFetches;
2475 PendingFetches = (StgBlockedFetch *)bqe;
2479 /* can ignore this case in a non-debugging setup;
2480 see comments on RBHSave closures above */
2482 /* check that the closure is an RBHSave closure */
2483 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2484 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2485 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2489 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2490 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2494 IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2498 #else /* !GRAN && !PAR */
/* Sequential RTS: wake a single blocked TSO — mark it runnable and push
 * it onto the run queue. Caller holds sched_mutex (threaded RTS).
 * Returns the next TSO on the queue (return line elided from this view). */
2500 unblockOneLocked(StgTSO *tso)
2504 ASSERT(get_itbl(tso)->type == TSO);
2505 ASSERT(tso->why_blocked != NotBlocked);
2506 tso->why_blocked = NotBlocked;
2508 PUSH_ON_RUN_QUEUE(tso);
2510 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2515 #if defined(GRAN) || defined(PAR)
/* GRAN/PAR: public wrapper — acquire sched_mutex, wake one element,
 * release. See unblockOneLocked for the real work. */
2516 INLINE_ME StgBlockingQueueElement *
2517 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2519 ACQUIRE_LOCK(&sched_mutex);
2520 bqe = unblockOneLocked(bqe, node);
2521 RELEASE_LOCK(&sched_mutex);
/* Sequential/threaded RTS: public wrapper — wake one TSO under
 * sched_mutex. Returns the next TSO on the queue. */
2526 unblockOne(StgTSO *tso)
2528 ACQUIRE_LOCK(&sched_mutex);
2529 tso = unblockOneLocked(tso);
2530 RELEASE_LOCK(&sched_mutex);
/* GranSim: wake an entire blocking queue `q` attached to `node`.
 * Handles the "fake fetch" case (node believed to be on another PE),
 * wakes TSOs one at a time via unblockOneLocked, and — if the node is an
 * RBH — restores the two words that convertToRBH displaced into the
 * trailing RBHSave closure. Also updates global GranSim statistics.
 * NOTE(review): many lines (loop setup, `len` accounting, braces) are
 * elided in this view; code is left byte-identical, comments only. */
2537 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2539 StgBlockingQueueElement *bqe;
2544 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2545 node, CurrentProc, CurrentTime[CurrentProc],
2546 CurrentTSO->id, CurrentTSO));
2548 node_loc = where_is(node);
2550 ASSERT(q == END_BQ_QUEUE ||
2551 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2552 get_itbl(q)->type == CONSTR); // closure (type constructor)
2553 ASSERT(is_unique(node));
2555 /* FAKE FETCH: magically copy the node to the tso's proc;
2556 no Fetch necessary because in reality the node should not have been
2557 moved to the other PE in the first place
2559 if (CurrentProc!=node_loc) {
2561 belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2562 node, node_loc, CurrentProc, CurrentTSO->id,
2563 // CurrentTSO, where_is(CurrentTSO),
2564 node->header.gran.procs));
// Record CurrentProc in the node's PE bitmask so it now counts as local.
2565 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2567 belch("## new bitmask of node %p is %#x",
2568 node, node->header.gran.procs));
2569 if (RtsFlags.GranFlags.GranSimStats.Global) {
2570 globalGranStats.tot_fake_fetches++;
2575 // ToDo: check: ASSERT(CurrentProc==node_loc);
// Walk the queue while the current element is a TSO; a CONSTR (RBHSave)
// or END_BQ_QUEUE terminates the walk.
2576 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2579 bqe points to the current element in the queue
2580 next points to the next element in the queue
2582 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2583 //tso_loc = where_is(tso);
2585 bqe = unblockOneLocked(bqe, node);
2588 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2589 the closure to make room for the anchor of the BQ */
2590 if (bqe!=END_BQ_QUEUE) {
2591 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2593 ASSERT((info_ptr==&RBH_Save_0_info) ||
2594 (info_ptr==&RBH_Save_1_info) ||
2595 (info_ptr==&RBH_Save_2_info));
2597 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2598 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2599 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2602 belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2603 node, info_type(node)));
2606 /* statistics gathering */
2607 if (RtsFlags.GranFlags.GranSimStats.Global) {
2608 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2609 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2610 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2611 globalGranStats.tot_awbq++; // total no. of bqs awakened
2614 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2615 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
/* GUM/PAR: wake an entire blocking queue `q` attached to `node`,
 * under sched_mutex. TSOs and BLOCKED_FETCHes are processed one at a
 * time by unblockOneLocked; a CONSTR terminator means nothing to do.
 * NOTE(review): the condition on line 2630 evaluates get_itbl(q) before
 * testing q==END_BQ_QUEUE — this is safe only if END_BQ_QUEUE is itself
 * a real closure with an info table; confirm against Parallel.h. */
2619 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2621 StgBlockingQueueElement *bqe;
2623 ACQUIRE_LOCK(&sched_mutex);
2625 IF_PAR_DEBUG(verbose,
2626 belch("##-_ AwBQ for node %p on [%x]: ",
2630 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2631 IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2636 ASSERT(q == END_BQ_QUEUE ||
2637 get_itbl(q)->type == TSO ||
2638 get_itbl(q)->type == BLOCKED_FETCH ||
2639 get_itbl(q)->type == CONSTR);
2642 while (get_itbl(bqe)->type==TSO ||
2643 get_itbl(bqe)->type==BLOCKED_FETCH) {
2644 bqe = unblockOneLocked(bqe, node);
2646 RELEASE_LOCK(&sched_mutex);
2649 #else /* !GRAN && !PAR */
2651 #ifdef RTS_SUPPORTS_THREADS
/* Threaded RTS: wake every TSO on a blocked queue WITHOUT taking
 * sched_mutex — caller must already hold it. */
2653 awakenBlockedQueueNoLock(StgTSO *tso)
2655 while (tso != END_TSO_QUEUE) {
2656 tso = unblockOneLocked(tso);
/* Sequential/threaded RTS: wake every TSO on a blocked queue, taking
 * and releasing sched_mutex around the whole traversal. */
2662 awakenBlockedQueue(StgTSO *tso)
2664 ACQUIRE_LOCK(&sched_mutex);
2665 while (tso != END_TSO_QUEUE) {
2666 tso = unblockOneLocked(tso);
2668 RELEASE_LOCK(&sched_mutex);
2672 /* ---------------------------------------------------------------------------
2674 - usually called inside a signal handler so it mustn't do anything fancy.
2675 ------------------------------------------------------------------------ */
/* Interrupt the RTS. Usually called from a signal handler, so it must
 * remain async-signal-safe (see comment above). Under the threaded RTS
 * it also pokes a blocked worker so the interrupt is noticed promptly.
 * NOTE(review): the line setting the global `interrupted` flag is elided
 * from this view — this function does more than shown here. */
2678 interruptStgRts(void)
2682 #ifdef RTS_SUPPORTS_THREADS
2683 wakeBlockedWorkerThread();
2687 /* -----------------------------------------------------------------------------
2690 This is for use when we raise an exception in another thread, which
2692 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2693 -------------------------------------------------------------------------- */
2695 #if defined(GRAN) || defined(PAR)
2697 NB: only the type of the blocking queue is different in GranSim and GUM
2698 the operations on the queue-elements are the same
2699 long live polymorphism!
2701 Locks: sched_mutex is held upon entry and exit.
/* GRAN/PAR: remove `tso` from whatever blocking queue it sits on
 * (MVar, black hole, exception-delivery, I/O, or sleeping queue), then
 * mark it runnable and push it on the run queue. Each case unlinks the
 * TSO in place, patching the queue's tail pointer where one exists.
 * Locks: sched_mutex held upon entry and exit (per header comment above).
 * NOTE(review): case labels, `goto done`-style exits and braces are
 * elided in this view; code is left byte-identical, comments only. */
2705 unblockThread(StgTSO *tso)
2707 StgBlockingQueueElement *t, **last;
2709 switch (tso->why_blocked) {
2712 return; /* not blocked */
2715 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2717 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2718 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
// Singly-linked-list removal: `last` tracks the address of the link
// field that points at the current element.
2720 last = (StgBlockingQueueElement **)&mvar->head;
2721 for (t = (StgBlockingQueueElement *)mvar->head;
2723 last = &t->link, last_tso = t, t = t->link) {
2724 if (t == (StgBlockingQueueElement *)tso) {
2725 *last = (StgBlockingQueueElement *)tso->link;
2726 if (mvar->tail == tso) {
2727 mvar->tail = (StgTSO *)last_tso;
2732 barf("unblockThread (MVAR): TSO not found");
2735 case BlockedOnBlackHole:
2736 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2738 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2740 last = &bq->blocking_queue;
2741 for (t = bq->blocking_queue;
2743 last = &t->link, t = t->link) {
2744 if (t == (StgBlockingQueueElement *)tso) {
2745 *last = (StgBlockingQueueElement *)tso->link;
2749 barf("unblockThread (BLACKHOLE): TSO not found");
2752 case BlockedOnException:
2754 StgTSO *target = tso->block_info.tso;
2756 ASSERT(get_itbl(target)->type == TSO);
// Follow a forwarding pointer if the target TSO was moved by GC.
2758 if (target->what_next == ThreadRelocated) {
2759 target = target->link;
2760 ASSERT(get_itbl(target)->type == TSO);
2763 ASSERT(target->blocked_exceptions != NULL);
2765 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2766 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2768 last = &t->link, t = t->link) {
2769 ASSERT(get_itbl(t)->type == TSO);
2770 if (t == (StgBlockingQueueElement *)tso) {
2771 *last = (StgBlockingQueueElement *)tso->link;
2775 barf("unblockThread (Exception): TSO not found");
2779 case BlockedOnWrite:
2780 #if defined(mingw32_TARGET_OS)
2781 case BlockedOnDoProc:
2784 /* take TSO off blocked_queue */
2785 StgBlockingQueueElement *prev = NULL;
2786 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2787 prev = t, t = t->link) {
2788 if (t == (StgBlockingQueueElement *)tso) {
2790 blocked_queue_hd = (StgTSO *)t->link;
2791 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2792 blocked_queue_tl = END_TSO_QUEUE;
2795 prev->link = t->link;
2796 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2797 blocked_queue_tl = (StgTSO *)prev;
2803 barf("unblockThread (I/O): TSO not found");
2806 case BlockedOnDelay:
2808 /* take TSO off sleeping_queue */
2809 StgBlockingQueueElement *prev = NULL;
2810 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
2811 prev = t, t = t->link) {
2812 if (t == (StgBlockingQueueElement *)tso) {
2814 sleeping_queue = (StgTSO *)t->link;
2816 prev->link = t->link;
2821 barf("unblockThread (delay): TSO not found");
2825 barf("unblockThread");
// Common exit: reset blocking state and make the thread runnable.
2829 tso->link = END_TSO_QUEUE;
2830 tso->why_blocked = NotBlocked;
2831 tso->block_info.closure = NULL;
2832 PUSH_ON_RUN_QUEUE(tso);
/* Sequential/threaded RTS: remove `tso` from whatever queue it is
 * blocked on and make it runnable. Mirrors the GRAN/PAR version above
 * but works directly on StgTSO links (no StgBlockingQueueElement casts).
 * Locks: sched_mutex held upon entry and exit.
 * NOTE(review): case labels, braces and loop-exit lines are elided in
 * this view; code is left byte-identical, comments only. */
2836 unblockThread(StgTSO *tso)
2840 /* To avoid locking unnecessarily. */
2841 if (tso->why_blocked == NotBlocked) {
2845 switch (tso->why_blocked) {
2848 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2850 StgTSO *last_tso = END_TSO_QUEUE;
2851 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2854 for (t = mvar->head; t != END_TSO_QUEUE;
2855 last = &t->link, last_tso = t, t = t->link) {
2858 if (mvar->tail == tso) {
2859 mvar->tail = last_tso;
2864 barf("unblockThread (MVAR): TSO not found");
2867 case BlockedOnBlackHole:
2868 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2870 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2872 last = &bq->blocking_queue;
2873 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2874 last = &t->link, t = t->link) {
2880 barf("unblockThread (BLACKHOLE): TSO not found");
2883 case BlockedOnException:
2885 StgTSO *target = tso->block_info.tso;
2887 ASSERT(get_itbl(target)->type == TSO);
// `while` (not `if`) here: forwarding chains may be more than one deep.
2889 while (target->what_next == ThreadRelocated) {
2890 target = target->link;
2891 ASSERT(get_itbl(target)->type == TSO);
2894 ASSERT(target->blocked_exceptions != NULL);
2896 last = &target->blocked_exceptions;
2897 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2898 last = &t->link, t = t->link) {
2899 ASSERT(get_itbl(t)->type == TSO);
2905 barf("unblockThread (Exception): TSO not found");
2909 case BlockedOnWrite:
2910 #if defined(mingw32_TARGET_OS)
2911 case BlockedOnDoProc:
2914 StgTSO *prev = NULL;
2915 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2916 prev = t, t = t->link) {
2919 blocked_queue_hd = t->link;
2920 if (blocked_queue_tl == t) {
2921 blocked_queue_tl = END_TSO_QUEUE;
2924 prev->link = t->link;
2925 if (blocked_queue_tl == t) {
2926 blocked_queue_tl = prev;
2932 barf("unblockThread (I/O): TSO not found");
2935 case BlockedOnDelay:
2937 StgTSO *prev = NULL;
2938 for (t = sleeping_queue; t != END_TSO_QUEUE;
2939 prev = t, t = t->link) {
2942 sleeping_queue = t->link;
2944 prev->link = t->link;
2949 barf("unblockThread (delay): TSO not found");
2953 barf("unblockThread");
// Common exit: reset blocking state and make the thread runnable.
2957 tso->link = END_TSO_QUEUE;
2958 tso->why_blocked = NotBlocked;
2959 tso->block_info.closure = NULL;
2960 PUSH_ON_RUN_QUEUE(tso);
2964 /* -----------------------------------------------------------------------------
2967 * The following function implements the magic for raising an
2968 * asynchronous exception in an existing thread.
2970 * We first remove the thread from any queue on which it might be
2971 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
2973 * We strip the stack down to the innermost CATCH_FRAME, building
2974 * thunks in the heap for all the active computations, so they can
2975 * be restarted if necessary. When we reach a CATCH_FRAME, we build
2976 * an application of the handler to the exception, and push it on
2977 * the top of the stack.
2979 * How exactly do we save all the active computations? We create an
2980 * AP_STACK for every UpdateFrame on the stack. Entering one of these
2981 * AP_STACKs pushes everything from the corresponding update frame
2982 * upwards onto the stack. (Actually, it pushes everything up to the
2983 * next update frame plus a pointer to the next AP_STACK object.
2984 * Entering the next AP_STACK object pushes more onto the stack until we
2985 * reach the last AP_STACK object - at which point the stack should look
2986 * exactly as it did when we killed the TSO and we can continue
2987 * execution by entering the closure on top of the stack.
2989 * We can also kill a thread entirely - this happens if either (a) the
2990 * exception passed to raiseAsync is NULL, or (b) there's no
2991 * CATCH_FRAME on the stack. In either case, we strip the entire
2992 * stack and replace the thread with a zombie.
2994  * Locks: sched_mutex is held upon entry and exit.
2996 * -------------------------------------------------------------------------- */
/* Kill a thread: raiseAsync with a NULL exception strips the entire
 * stack and turns the TSO into a zombie (see raiseAsync comment above). */
2999 deleteThread(StgTSO *tso)
3001 raiseAsync(tso,NULL);
/* forkProcess support: kill a thread outright, bypassing the usual
 * exception machinery, so it cannot catch/handle the kill. Threads
 * already finished are left alone; threads in a safe C call are
 * excluded under the threaded RTS (they may still return). */
3005 deleteThreadImmediately(StgTSO *tso)
3006 { // for forkProcess only:
3007 // delete thread without giving it a chance to catch the KillThread exception
3009 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3012 #if defined(RTS_SUPPORTS_THREADS)
3013 if (tso->why_blocked != BlockedOnCCall
3014 && tso->why_blocked != BlockedOnCCall_NoUnblockExc)
3017 tso->what_next = ThreadKilled;
/* Raise an asynchronous exception in `tso` from a context that does NOT
 * already hold sched_mutex: acquire it, delegate to raiseAsync, release. */
3021 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3023 /* When raising async exs from contexts where sched_mutex isn't held;
3024 use raiseAsyncWithLock(). */
3025 ACQUIRE_LOCK(&sched_mutex);
3026 raiseAsync(tso,exception);
3027 RELEASE_LOCK(&sched_mutex);
/* Raise an asynchronous exception in `tso` (see the long comment above):
 * unblock the thread, then walk its stack freezing each chunk between
 * update frames into an AP_STACK thunk, until a CATCH_FRAME (run the
 * handler) or STOP_FRAME (kill the thread) is reached. A NULL
 * `exception` means unconditional deletion — CATCH_FRAMEs are skipped.
 * Caller holds sched_mutex.
 * NOTE(review): loop heads, local declarations and braces are elided in
 * this view; code is left byte-identical, comments only. */
3031 raiseAsync(StgTSO *tso, StgClosure *exception)
3033 StgRetInfoTable *info;
3036 // Thread already dead?
3037 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3042 sched_belch("raising exception in thread %ld.", tso->id));
3044 // Remove it from any blocking queues
3049 // The stack freezing code assumes there's a closure pointer on
3050 // the top of the stack, so we have to arrange that this is the case...
3052 if (sp[0] == (W_)&stg_enter_info) {
3056 sp[0] = (W_)&stg_dummy_ret_closure;
3062 // 1. Let the top of the stack be the "current closure"
3064 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3067 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3068 // current closure applied to the chunk of stack up to (but not
3069 // including) the update frame. This closure becomes the "current
3070 // closure". Go back to step 2.
3072 // 4. If it's a CATCH_FRAME, then leave the exception handler on
3073 // top of the stack applied to the exception.
3075 // 5. If it's a STOP_FRAME, then kill the thread.
3080 info = get_ret_itbl((StgClosure *)frame);
// Skip ordinary return frames; CATCH_FRAMEs only stop us when we
// actually have an exception to deliver (exception != NULL).
3082 while (info->i.type != UPDATE_FRAME
3083 && (info->i.type != CATCH_FRAME || exception == NULL)
3084 && info->i.type != STOP_FRAME) {
3085 frame += stack_frame_sizeW((StgClosure *)frame);
3086 info = get_ret_itbl((StgClosure *)frame);
3089 switch (info->i.type) {
3092 // If we find a CATCH_FRAME, and we've got an exception to raise,
3093 // then build the THUNK raise(exception), and leave it on
3094 // top of the CATCH_FRAME ready to enter.
3098 StgCatchFrame *cf = (StgCatchFrame *)frame;
3102 // we've got an exception to raise, so let's pass it to the
3103 // handler in this frame.
3105 raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3106 TICK_ALLOC_SE_THK(1,0);
3107 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3108 raise->payload[0] = exception;
3110 // throw away the stack from Sp up to the CATCH_FRAME.
3114 /* Ensure that async exceptions are blocked now, so we don't get
3115 * a surprise exception before we get around to executing the
3118 if (tso->blocked_exceptions == NULL) {
3119 tso->blocked_exceptions = END_TSO_QUEUE;
3122 /* Put the newly-built THUNK on top of the stack, ready to execute
3123 * when the thread restarts.
3126 sp[-1] = (W_)&stg_enter_info;
3128 tso->what_next = ThreadRunGHC;
3129 IF_DEBUG(sanity, checkTSO(tso));
3138 // First build an AP_STACK consisting of the stack chunk above the
3139 // current update frame, with the top word on the stack as the
3142 words = frame - sp - 1;
3143 ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3146 ap->fun = (StgClosure *)sp[0];
3148 for(i=0; i < (nat)words; ++i) {
3149 ap->payload[i] = (StgClosure *)*sp++;
3152 SET_HDR(ap,&stg_AP_STACK_info,
3153 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
3154 TICK_ALLOC_UP_THK(words+1,0);
3157 fprintf(stderr, "sched: Updating ");
3158 printPtr((P_)((StgUpdateFrame *)frame)->updatee);
3159 fprintf(stderr, " with ");
3160 printObj((StgClosure *)ap);
3163 // Replace the updatee with an indirection - happily
3164 // this will also wake up any threads currently
3165 // waiting on the result.
3167 // Warning: if we're in a loop, more than one update frame on
3168 // the stack may point to the same object. Be careful not to
3169 // overwrite an IND_OLDGEN in this case, because we'll screw
3170 // up the mutable lists. To be on the safe side, don't
3171 // overwrite any kind of indirection at all. See also
3172 // threadSqueezeStack in GC.c, where we have to make a similar
3175 if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3176 // revert the black hole
3177 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3179 sp += sizeofW(StgUpdateFrame) - 1;
3180 sp[0] = (W_)ap; // push onto stack
3185 // We've stripped the entire stack, the thread is now dead.
3186 sp += sizeofW(StgStopFrame);
3187 tso->what_next = ThreadKilled;
3198 /* -----------------------------------------------------------------------------
3199 resurrectThreads is called after garbage collection on the list of
3200 threads found to be garbage. Each of these threads will be woken
3201 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3202 on an MVar, or NonTermination if the thread was blocked on a Black
3205 Locks: sched_mutex isn't held upon entry nor exit.
3206 -------------------------------------------------------------------------- */
/* Post-GC: wake every garbage (unreachable-but-blocked) thread and send
 * it an appropriate exception — BlockedOnDeadMVar for MVar/exception
 * blockage, NonTermination for black holes. Each TSO is first relinked
 * onto all_threads so it stays alive. Called by GC with sched_mutex held.
 * NOTE(review): case labels/breaks are elided in this view. */
3209 resurrectThreads( StgTSO *threads )
3213 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3214 next = tso->global_link;
3215 tso->global_link = all_threads;
3217 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3219 switch (tso->why_blocked) {
3221 case BlockedOnException:
3222 /* Called by GC - sched_mutex lock is currently held. */
3223 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3225 case BlockedOnBlackHole:
3226 raiseAsync(tso,(StgClosure *)NonTermination_closure);
3229 /* This might happen if the thread was blocked on a black hole
3230 * belonging to a thread that we've just woken up (raiseAsync
3231 * can wake up threads, remember...).
3235 barf("resurrectThreads: thread blocked in a strange way");
3240 /* -----------------------------------------------------------------------------
3241 * Blackhole detection: if we reach a deadlock, test whether any
3242 * threads are blocked on themselves. Any threads which are found to
3243 * be self-blocked get sent a NonTermination exception.
3245 * This is only done in a deadlock situation in order to avoid
3246 * performance overhead in the normal case.
3248 * Locks: sched_mutex is held upon entry and exit.
3249 * -------------------------------------------------------------------------- */
/* Deadlock recovery: scan all threads; for each one blocked on a black
 * hole, walk its own stack looking for an UPDATE_FRAME whose updatee is
 * the very closure it is blocked on — i.e. the thread is waiting on its
 * own computation — and send it NonTermination. Only run on deadlock
 * (see comment above); sched_mutex held.
 * NOTE(review): line 3295's `(StgPtr)frame += …` treats a cast as an
 * lvalue — a GCC extension, not standard C. */
3252 detectBlackHoles( void )
3254 StgTSO *tso = all_threads;
3256 StgClosure *blocked_on;
3257 StgRetInfoTable *info;
3259 for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
// Skip over GC forwarding pointers to the live copy of the TSO.
3261 while (tso->what_next == ThreadRelocated) {
3263 ASSERT(get_itbl(tso)->type == TSO);
3266 if (tso->why_blocked != BlockedOnBlackHole) {
3269 blocked_on = tso->block_info.closure;
3271 frame = (StgClosure *)tso->sp;
3274 info = get_ret_itbl(frame);
3275 switch (info->i.type) {
3277 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3278 /* We are blocking on one of our own computations, so
3279 * send this thread the NonTermination exception.
3282 sched_belch("thread %d is blocked on itself", tso->id));
3283 raiseAsync(tso, (StgClosure *)NonTermination_closure);
3287 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3293 // normal stack frames; do nothing except advance the pointer
3295 (StgPtr)frame += stack_frame_sizeW(frame);
3302 /* ----------------------------------------------------------------------------
3303 * Debugging: why is a thread blocked
3304 * [Also provides useful information when debugging threaded programs
3305 * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3306 ------------------------------------------------------------------------- */
/* Debug/diagnostic: print (to stderr, no trailing newline) a human-
 * readable description of why `tso` is blocked, dispatching on
 * why_blocked. Enabled outside DEBUG too (see comment above).
 * NOTE(review): the barf format on line 3359 passes `tso` (a pointer)
 * for a %d conversion — works only where pointers fit in an int slot;
 * should be %p. Flagged, not changed, since surrounding lines are
 * elided from this view. */
3310 printThreadBlockage(StgTSO *tso)
3312 switch (tso->why_blocked) {
3314 fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3316 case BlockedOnWrite:
3317 fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3319 #if defined(mingw32_TARGET_OS)
3320 case BlockedOnDoProc:
3321 fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3324 case BlockedOnDelay:
3325 fprintf(stderr,"is blocked until %d", tso->block_info.target);
3328 fprintf(stderr,"is blocked on an MVar");
3330 case BlockedOnException:
3331 fprintf(stderr,"is blocked on delivering an exception to thread %d",
3332 tso->block_info.tso->id);
3334 case BlockedOnBlackHole:
3335 fprintf(stderr,"is blocked on a black hole");
3338 fprintf(stderr,"is not blocked");
3342 fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3343 tso->block_info.closure, info_type(tso->block_info.closure));
3345 case BlockedOnGA_NoSend:
3346 fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3347 tso->block_info.closure, info_type(tso->block_info.closure));
3350 #if defined(RTS_SUPPORTS_THREADS)
3351 case BlockedOnCCall:
3352 fprintf(stderr,"is blocked on an external call");
3354 case BlockedOnCCall_NoUnblockExc:
3355 fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3359 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3360 tso->why_blocked, tso->id, tso);
/* Debug/diagnostic: print a one-phrase status for `tso` — killed,
 * completed, or (default) delegate to printThreadBlockage. */
3366 printThreadStatus(StgTSO *tso)
3368 switch (tso->what_next) {
3370 fprintf(stderr,"has been killed");
3372 case ThreadComplete:
3373 fprintf(stderr,"has completed");
3376 printThreadBlockage(tso);
/* Debug/diagnostic: dump every thread on all_threads to stderr with its
 * id, address, optional user label, and status. The GRAN/PAR builds
 * prefix the dump with the current (simulated or real) time; the
 * surrounding #if/#elif/#else lines are elided from this view. */
3381 printAllThreads(void)
3387 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3388 ullong_format_string(TIME_ON_PROC(CurrentProc),
3389 time_string, rtsFalse/*no commas!*/);
3391 fprintf(stderr, "all threads at [%s]:\n", time_string);
3393 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3394 ullong_format_string(CURRENT_TIME,
3395 time_string, rtsFalse/*no commas!*/);
3397 fprintf(stderr,"all threads at [%s]:\n", time_string);
3399 fprintf(stderr,"all threads:\n");
3402 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3403 fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3404 label = lookupThreadLabel(t->id);
3405 if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3406 printThreadStatus(t);
3407 fprintf(stderr,"\n");
3414 Print a whole blocking queue attached to node (debugging only).
/* PAR debug: print the whole blocking queue hanging off `node`.
 * NOTE(review): the node!=NULL assertion on line 3433 runs only AFTER
 * node has already been dereferenced via get_itbl — it cannot catch a
 * NULL node; consider reordering. Flagged only, lines left as-is. */
3418 print_bq (StgClosure *node)
3420 StgBlockingQueueElement *bqe;
3424 fprintf(stderr,"## BQ of closure %p (%s): ",
3425 node, info_type(node));
3427 /* should cover all closures that may have a blocking queue */
3428 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3429 get_itbl(node)->type == FETCH_ME_BQ ||
3430 get_itbl(node)->type == RBH ||
3431 get_itbl(node)->type == MVAR);
3433 ASSERT(node!=(StgClosure*)NULL); // sanity check
3435 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3439 Print a whole blocking queue starting with the element bqe.
/* PAR debug: print every element of a blocking queue starting at `bqe`.
 * TSOs, BLOCKED_FETCHes and the terminating RBHSave CONSTR each get
 * their own format; anything else is a fatal error. The `end`/ternary
 * loop header stops at a CONSTR or at the end of the chain.
 * NOTE(review): line 3464 prints a TSO pointer with %x — should be %p. */
3442 print_bqe (StgBlockingQueueElement *bqe)
3447 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3449 for (end = (bqe==END_BQ_QUEUE);
3450 !end; // iterate until bqe points to a CONSTR
3451 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
3452 bqe = end ? END_BQ_QUEUE : bqe->link) {
3453 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3454 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3455 /* types of closures that may appear in a blocking queue */
3456 ASSERT(get_itbl(bqe)->type == TSO ||
3457 get_itbl(bqe)->type == BLOCKED_FETCH ||
3458 get_itbl(bqe)->type == CONSTR);
3459 /* only BQs of an RBH end with an RBH_Save closure */
3460 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3462 switch (get_itbl(bqe)->type) {
3464 fprintf(stderr," TSO %u (%x),",
3465 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3468 fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3469 ((StgBlockedFetch *)bqe)->node,
3470 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3471 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3472 ((StgBlockedFetch *)bqe)->ga.weight);
3475 fprintf(stderr," %s (IP %p),",
3476 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3477 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3478 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3479 "RBH_Save_?"), get_itbl(bqe));
3482 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3483 info_type((StgClosure *)bqe)); // , node, info_type(node));
3487 fputc('\n', stderr);
3489 # elif defined(GRAN)
/* GranSim debug: print the blocking queue of `node`, annotating each
 * TSO with the PE it lives on (where_is). Same CONSTR-terminated walk
 * as the PAR print_bqe above.
 * NOTE(review): node!=NULL asserted only after get_itbl(node) has
 * already dereferenced it — ordering flagged, lines left as-is. */
3491 print_bq (StgClosure *node)
3493 StgBlockingQueueElement *bqe;
3494 PEs node_loc, tso_loc;
3497 /* should cover all closures that may have a blocking queue */
3498 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3499 get_itbl(node)->type == FETCH_ME_BQ ||
3500 get_itbl(node)->type == RBH);
3502 ASSERT(node!=(StgClosure*)NULL); // sanity check
3503 node_loc = where_is(node);
3505 fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3506 node, info_type(node), node_loc);
3509 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3511 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3512 !end; // iterate until bqe points to a CONSTR
3513 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3514 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3515 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3516 /* types of closures that may appear in a blocking queue */
3517 ASSERT(get_itbl(bqe)->type == TSO ||
3518 get_itbl(bqe)->type == CONSTR);
3519 /* only BQs of an RBH end with an RBH_Save closure */
3520 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3522 tso_loc = where_is((StgClosure *)bqe);
3523 switch (get_itbl(bqe)->type) {
3525 fprintf(stderr," TSO %d (%p) on [PE %d],",
3526 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3529 fprintf(stderr," %s (IP %p),",
3530 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3531 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3532 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3533 "RBH_Save_?"), get_itbl(bqe));
3536 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3537 info_type((StgClosure *)bqe), node, info_type(node));
3541 fputc('\n', stderr);
3545 Nice and easy: only TSOs on the blocking queue
/* Sequential RTS debug: print the blocking queue of `node`.
 * Simple case — the queue can contain only TSOs (see comment above). */
3548 print_bq (StgClosure *node)
3552 ASSERT(node!=(StgClosure*)NULL); // sanity check
3553 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3554 tso != END_TSO_QUEUE;
3556 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3557 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3558 fprintf(stderr," TSO %d (%p),", tso->id, tso);
3560 fputc('\n', stderr);
3571 for (i=0, tso=run_queue_hd;
3572 tso != END_TSO_QUEUE;
3581 sched_belch(char *s, ...)
3585 #ifdef RTS_SUPPORTS_THREADS
3586 fprintf(stderr, "sched (task %p): ", osThreadId());
3588 fprintf(stderr, "== ");
3590 fprintf(stderr, "sched: ");
3592 vfprintf(stderr, s, ap);
3593 fprintf(stderr, "\n");