1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.186 2004/02/26 11:41:22 simonmar Exp $
4 * (c) The GHC Team, 1998-2003
8 * Different GHC ways use this scheduler quite differently (see comments below)
9 * Here is the global picture:
11 * WAY Name CPP flag What's it for
12 * --------------------------------------
13 * mp GUM PAR Parallel execution on a distrib. memory machine
14 * s SMP SMP Parallel execution on a shared memory machine
15 * mg GranSim GRAN Simulation of parallel execution
16 * md GUM/GdH DIST Distributed execution (based on GUM)
18 * --------------------------------------------------------------------------*/
21 * Version with support for distributed memory parallelism aka GUM (WAY=mp):
23 The main scheduling loop in GUM iterates until a finish message is received.
24 In that case a global flag @receivedFinish@ is set and this instance of
25 the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
26 for the handling of incoming messages, such as PP_FINISH.
27 Note that in the parallel case we have a system manager that coordinates
28 different PEs, each of which is running one instance of the RTS.
29 See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
30 From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
32 * Version with support for simulating parallel execution aka GranSim (WAY=mg):
34 The main scheduling code in GranSim is quite different from that in std
35 (concurrent) Haskell: while concurrent Haskell just iterates over the
36 threads in the runnable queue, GranSim is event driven, i.e. it iterates
37 over the events in the global event queue. -- HWL
40 #include "PosixSource.h"
47 #include "StgStartup.h"
49 #define COMPILING_SCHEDULER
51 #include "StgMiscClosures.h"
53 #include "Interpreter.h"
54 #include "Exception.h"
61 #include "ThreadLabels.h"
63 #include "Proftimer.h"
66 #if defined(GRAN) || defined(PAR)
67 # include "GranSimRts.h"
69 # include "ParallelRts.h"
70 # include "Parallel.h"
71 # include "ParallelDebug.h"
76 #include "Capability.h"
77 #include "OSThreads.h"
80 #ifdef HAVE_SYS_TYPES_H
81 #include <sys/types.h>
96 #define USED_IN_THREADED_RTS
98 #define USED_IN_THREADED_RTS STG_UNUSED
101 #ifdef RTS_SUPPORTS_THREADS
102 #define USED_WHEN_RTS_SUPPORTS_THREADS
104 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
107 /* Main thread queue.
108 * Locks required: sched_mutex.
110 StgMainThread *main_threads = NULL;
113 * Locks required: sched_mutex.
117 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
118 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
121 In GranSim we have a runnable and a blocked queue for each processor.
122 In order to minimise code changes new arrays run_queue_hds/tls
123 are created. run_queue_hd is then a short cut (macro) for
124 run_queue_hds[CurrentProc] (see GranSim.h).
127 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
128 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
129 StgTSO *ccalling_threadss[MAX_PROC];
130 /* We use the same global list of threads (all_threads) in GranSim as in
131 the std RTS (i.e. we are cheating). However, we don't use this list in
132 the GranSim specific code at the moment (so we are only potentially
137 StgTSO *run_queue_hd = NULL;
138 StgTSO *run_queue_tl = NULL;
139 StgTSO *blocked_queue_hd = NULL;
140 StgTSO *blocked_queue_tl = NULL;
141 StgTSO *sleeping_queue = NULL; /* perhaps replace with a hash table? */
145 /* Linked list of all threads.
146 * Used for detecting garbage collected threads.
148 StgTSO *all_threads = NULL;
150 /* When a thread performs a safe C call (_ccall_GC, using old
151 * terminology), it gets put on the suspended_ccalling_threads
152 * list. Used by the garbage collector.
154 static StgTSO *suspended_ccalling_threads;
156 static StgTSO *threadStackOverflow(StgTSO *tso);
158 /* KH: The following two flags are shared memory locations. There is no need
159 to lock them, since they are only unset at the end of a scheduler
163 /* flag set by signal handler to precipitate a context switch */
164 nat context_switch = 0;
166 /* if this flag is set as well, give up execution */
167 rtsBool interrupted = rtsFalse;
169 /* Next thread ID to allocate.
170 * Locks required: thread_id_mutex
172 static StgThreadID next_thread_id = 1;
175 * Pointers to the state of the current thread.
176 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
177 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
180 /* The smallest stack size that makes any sense is:
181 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
182 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
183 * + 1 (the closure to enter)
185 * + 1 (spare slot req'd by stg_ap_v_ret)
187 * A thread with this stack will bomb immediately with a stack
188 * overflow, which will increase its stack size.
191 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
198 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
199 * exists - earlier gccs apparently didn't.
204 static rtsBool ready_to_gc;
207 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
208 * in an MT setting, needed to signal that a worker thread shouldn't hang around
209 * in the scheduler when it is out of work.
211 static rtsBool shutting_down_scheduler = rtsFalse;
213 void addToBlockedQueue ( StgTSO *tso );
215 static void schedule ( StgMainThread *mainThread, Capability *initialCapability );
216 void interruptStgRts ( void );
218 static void detectBlackHoles ( void );
220 #if defined(RTS_SUPPORTS_THREADS)
221 /* ToDo: carefully document the invariants that go together
222 * with these synchronisation objects.
224 Mutex sched_mutex = INIT_MUTEX_VAR;
225 Mutex term_mutex = INIT_MUTEX_VAR;
228 * A heavyweight solution to the problem of protecting
229 * the thread_id from concurrent update.
231 Mutex thread_id_mutex = INIT_MUTEX_VAR;
233 #endif /* RTS_SUPPORTS_THREADS */
237 rtsTime TimeOfLastYield;
238 rtsBool emitSchedule = rtsTrue;
242 static char *whatNext_strs[] = {
252 StgTSO * createSparkThread(rtsSpark spark);
253 StgTSO * activateSpark (rtsSpark spark);
256 /* ----------------------------------------------------------------------------
258 * ------------------------------------------------------------------------- */
260 #if defined(RTS_SUPPORTS_THREADS)
261 static rtsBool startingWorkerThread = rtsFalse;
263 static void taskStart(void);
267 ACQUIRE_LOCK(&sched_mutex);
269 RELEASE_LOCK(&sched_mutex);
273 startSchedulerTaskIfNecessary(void)
275 if(run_queue_hd != END_TSO_QUEUE
276 || blocked_queue_hd != END_TSO_QUEUE
277 || sleeping_queue != END_TSO_QUEUE)
279 if(!startingWorkerThread)
280 { // we don't want to start another worker thread
281 // just because the last one hasn't yet reached the
282 // "waiting for capability" state
283 startingWorkerThread = rtsTrue;
284 startTask(taskStart);
290 /* ---------------------------------------------------------------------------
291 Main scheduling loop.
293 We use round-robin scheduling, each thread returning to the
294 scheduler loop when one of these conditions is detected:
297 * timer expires (thread yields)
302 Locking notes: we acquire the scheduler lock once at the beginning
303 of the scheduler loop, and release it when
305 * running a thread, or
306 * waiting for work, or
307 * waiting for a GC to complete.
310 In a GranSim setup this loop iterates over the global event queue.
311 This revolves around the global event queue, which determines what
312 to do next. Therefore, it's more complicated than either the
313 concurrent or the parallel (GUM) setup.
316 GUM iterates over incoming messages.
317 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
318 and sends out a fish whenever it has nothing to do; in-between
319 doing the actual reductions (shared code below) it processes the
320 incoming messages and deals with delayed operations
321 (see PendingFetches).
322 This is not the ugliest code you could imagine, but it's bloody close.
324 ------------------------------------------------------------------------ */
326 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
327 Capability *initialCapability )
330 Capability *cap = initialCapability;
331 StgThreadReturnCode ret;
339 rtsBool receivedFinish = rtsFalse;
341 nat tp_size, sp_size; // stats only
344 rtsBool was_interrupted = rtsFalse;
345 StgTSOWhatNext prev_what_next;
347 // Pre-condition: sched_mutex is held.
349 #if defined(RTS_SUPPORTS_THREADS)
351 // in the threaded case, the capability is either passed in via the
352 // initialCapability parameter, or initialized inside the scheduler
356 sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
357 mainThread, initialCapability);
360 // simply initialise it in the non-threaded case
361 grabCapability(&cap);
365 /* set up first event to get things going */
366 /* ToDo: assign costs for system setup and init MainTSO ! */
367 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
369 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
372 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
373 G_TSO(CurrentTSO, 5));
375 if (RtsFlags.GranFlags.Light) {
376 /* Save current time; GranSim Light only */
377 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
380 event = get_next_event();
382 while (event!=(rtsEvent*)NULL) {
383 /* Choose the processor with the next event */
384 CurrentProc = event->proc;
385 CurrentTSO = event->tso;
389 while (!receivedFinish) { /* set by processMessages */
390 /* when receiving PP_FINISH message */
392 #else // everything except GRAN and PAR
398 IF_DEBUG(scheduler, printAllThreads());
400 #if defined(RTS_SUPPORTS_THREADS)
401 // Yield the capability to higher-priority tasks if necessary.
404 yieldCapability(&cap);
407 // If we do not currently hold a capability, we wait for one
410 waitForCapability(&sched_mutex, &cap,
411 mainThread ? &mainThread->bound_thread_cond : NULL);
414 // We now have a capability...
418 // If we're interrupted (the user pressed ^C, or some other
419 // termination condition occurred), kill all the currently running
423 IF_DEBUG(scheduler, sched_belch("interrupted"));
424 interrupted = rtsFalse;
425 was_interrupted = rtsTrue;
426 #if defined(RTS_SUPPORTS_THREADS)
427 // In the threaded RTS, deadlock detection doesn't work,
428 // so just exit right away.
429 prog_belch("interrupted");
430 releaseCapability(cap);
431 RELEASE_LOCK(&sched_mutex);
432 shutdownHaskellAndExit(EXIT_SUCCESS);
439 // Go through the list of main threads and wake up any
440 // clients whose computations have finished. ToDo: this
441 // should be done more efficiently without a linear scan
442 // of the main threads list, somehow...
444 #if defined(RTS_SUPPORTS_THREADS)
446 StgMainThread *m, **prev;
447 prev = &main_threads;
448 for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
449 if (m->tso->what_next == ThreadComplete
450 || m->tso->what_next == ThreadKilled)
454 if (m->tso->what_next == ThreadComplete)
458 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
459 *(m->ret) = (StgClosure *)m->tso->sp[1];
471 m->stat = Interrupted;
481 removeThreadLabel((StgWord)m->tso->id);
483 releaseCapability(cap);
488 // The current OS thread can not handle the fact that
489 // the Haskell thread "m" has ended. "m" is bound;
490 // the scheduler loop in its bound OS thread has to
491 // return, so let's pass our capability directly to
493 passCapability(&m->bound_thread_cond);
500 #else /* not threaded */
503 /* in GUM do this only on the Main PE */
506 /* If our main thread has finished or been killed, return.
509 StgMainThread *m = main_threads;
510 if (m->tso->what_next == ThreadComplete
511 || m->tso->what_next == ThreadKilled) {
513 removeThreadLabel((StgWord)m->tso->id);
515 main_threads = main_threads->link;
516 if (m->tso->what_next == ThreadComplete) {
517 // We finished successfully, fill in the return value
518 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
519 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
523 if (m->ret) { *(m->ret) = NULL; };
524 if (was_interrupted) {
525 m->stat = Interrupted;
536 #if defined(RTS_USER_SIGNALS)
537 // check for signals each time around the scheduler
538 if (signals_pending()) {
539 RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
540 startSignalHandlers();
541 ACQUIRE_LOCK(&sched_mutex);
545 /* Check whether any waiting threads need to be woken up. If the
546 * run queue is empty, and there are no other tasks running, we
547 * can wait indefinitely for something to happen.
549 if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
550 #if defined(RTS_SUPPORTS_THREADS)
555 awaitEvent( EMPTY_RUN_QUEUE() );
557 /* we can be interrupted while waiting for I/O... */
558 if (interrupted) continue;
561 * Detect deadlock: when we have no threads to run, there are no
562 * threads waiting on I/O or sleeping, and all the other tasks are
563 * waiting for work, we must have a deadlock of some description.
565 * We first try to find threads blocked on themselves (ie. black
566 * holes), and generate NonTermination exceptions where necessary.
568 * If no threads are black holed, we have a deadlock situation, so
569 * inform all the main threads.
571 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
572 if ( EMPTY_THREAD_QUEUES() )
574 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
575 // Garbage collection can release some new threads due to
576 // either (a) finalizers or (b) threads resurrected because
577 // they are about to be sent BlockedOnDeadMVar. Any threads
578 // thus released will be immediately runnable.
579 GarbageCollect(GetRoots,rtsTrue);
581 if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
584 sched_belch("still deadlocked, checking for black holes..."));
587 if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
589 #if defined(RTS_USER_SIGNALS)
590 /* If we have user-installed signal handlers, then wait
591 * for signals to arrive rather than bombing out with a
594 if ( anyUserHandlers() ) {
596 sched_belch("still deadlocked, waiting for signals..."));
600 // we might be interrupted...
601 if (interrupted) { continue; }
603 if (signals_pending()) {
604 RELEASE_LOCK(&sched_mutex);
605 startSignalHandlers();
606 ACQUIRE_LOCK(&sched_mutex);
608 ASSERT(!EMPTY_RUN_QUEUE());
613 /* Probably a real deadlock. Send the current main thread the
614 * Deadlock exception (or in the SMP build, send *all* main
615 * threads the deadlock exception, since none of them can make
621 switch (m->tso->why_blocked) {
622 case BlockedOnBlackHole:
623 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
625 case BlockedOnException:
627 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
630 barf("deadlock: main thread blocked in a strange way");
636 #elif defined(RTS_SUPPORTS_THREADS)
637 // ToDo: add deadlock detection in threaded RTS
639 // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
642 #if defined(RTS_SUPPORTS_THREADS)
643 if ( EMPTY_RUN_QUEUE() ) {
644 continue; // nothing to do
649 if (RtsFlags.GranFlags.Light)
650 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
652 /* adjust time based on time-stamp */
653 if (event->time > CurrentTime[CurrentProc] &&
654 event->evttype != ContinueThread)
655 CurrentTime[CurrentProc] = event->time;
657 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
658 if (!RtsFlags.GranFlags.Light)
661 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
663 /* main event dispatcher in GranSim */
664 switch (event->evttype) {
665 /* Should just be continuing execution */
667 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
668 /* ToDo: check assertion
669 ASSERT(run_queue_hd != (StgTSO*)NULL &&
670 run_queue_hd != END_TSO_QUEUE);
672 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
673 if (!RtsFlags.GranFlags.DoAsyncFetch &&
674 procStatus[CurrentProc]==Fetching) {
675 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
676 CurrentTSO->id, CurrentTSO, CurrentProc);
679 /* Ignore ContinueThreads for completed threads */
680 if (CurrentTSO->what_next == ThreadComplete) {
681 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
682 CurrentTSO->id, CurrentTSO, CurrentProc);
685 /* Ignore ContinueThreads for threads that are being migrated */
686 if (PROCS(CurrentTSO)==Nowhere) {
687 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
688 CurrentTSO->id, CurrentTSO, CurrentProc);
691 /* The thread should be at the beginning of the run queue */
692 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
693 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
694 CurrentTSO->id, CurrentTSO, CurrentProc);
695 break; // run the thread anyway
698 new_event(proc, proc, CurrentTime[proc],
700 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
702 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
703 break; // now actually run the thread; DaH Qu'vam yImuHbej
706 do_the_fetchnode(event);
707 goto next_thread; /* handle next event in event queue */
710 do_the_globalblock(event);
711 goto next_thread; /* handle next event in event queue */
714 do_the_fetchreply(event);
715 goto next_thread; /* handle next event in event queue */
717 case UnblockThread: /* Move from the blocked queue to the tail of */
718 do_the_unblock(event);
719 goto next_thread; /* handle next event in event queue */
721 case ResumeThread: /* Move from the blocked queue to the tail of */
722 /* the runnable queue ( i.e. Qu' SImqa'lu') */
723 event->tso->gran.blocktime +=
724 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
725 do_the_startthread(event);
726 goto next_thread; /* handle next event in event queue */
729 do_the_startthread(event);
730 goto next_thread; /* handle next event in event queue */
733 do_the_movethread(event);
734 goto next_thread; /* handle next event in event queue */
737 do_the_movespark(event);
738 goto next_thread; /* handle next event in event queue */
741 do_the_findwork(event);
742 goto next_thread; /* handle next event in event queue */
745 barf("Illegal event type %u\n", event->evttype);
748 /* This point was scheduler_loop in the old RTS */
750 IF_DEBUG(gran, belch("GRAN: after main switch"));
752 TimeOfLastEvent = CurrentTime[CurrentProc];
753 TimeOfNextEvent = get_time_of_next_event();
754 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
755 // CurrentTSO = ThreadQueueHd;
757 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
760 if (RtsFlags.GranFlags.Light)
761 GranSimLight_leave_system(event, &ActiveTSO);
763 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
766 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
768 /* in a GranSim setup the TSO stays on the run queue */
770 /* Take a thread from the run queue. */
771 POP_RUN_QUEUE(t); // take_off_run_queue(t);
774 fprintf(stderr, "GRAN: About to run current thread, which is\n");
777 context_switch = 0; // turned on via GranYield, checking events and time slice
780 DumpGranEvent(GR_SCHEDULE, t));
782 procStatus[CurrentProc] = Busy;
785 if (PendingFetches != END_BF_QUEUE) {
789 /* ToDo: phps merge with spark activation above */
790 /* check whether we have local work and send requests if we have none */
791 if (EMPTY_RUN_QUEUE()) { /* no runnable threads */
792 /* :-[ no local threads => look out for local sparks */
793 /* the spark pool for the current PE */
794 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
795 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
796 pool->hd < pool->tl) {
798 * ToDo: add GC code check that we really have enough heap afterwards!!
800 * If we're here (no runnable threads) and we have pending
801 * sparks, we must have a space problem. Get enough space
802 * to turn one of those pending sparks into a
806 spark = findSpark(rtsFalse); /* get a spark */
807 if (spark != (rtsSpark) NULL) {
808 tso = activateSpark(spark); /* turn the spark into a thread */
809 IF_PAR_DEBUG(schedule,
810 belch("==== schedule: Created TSO %d (%p); %d threads active",
811 tso->id, tso, advisory_thread_count));
813 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
814 belch("==^^ failed to activate spark");
816 } /* otherwise fall through & pick-up new tso */
818 IF_PAR_DEBUG(verbose,
819 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
820 spark_queue_len(pool)));
825 /* If we still have no work we need to send a FISH to get a spark
828 if (EMPTY_RUN_QUEUE()) {
829 /* =8-[ no local sparks => look for work on other PEs */
831 * We really have absolutely no work. Send out a fish
832 * (there may be some out there already), and wait for
833 * something to arrive. We clearly can't run any threads
834 * until a SCHEDULE or RESUME arrives, and so that's what
835 * we're hoping to see. (Of course, we still have to
836 * respond to other types of messages.)
838 TIME now = msTime() /*CURRENT_TIME*/;
839 IF_PAR_DEBUG(verbose,
840 belch("-- now=%ld", now));
841 IF_PAR_DEBUG(verbose,
842 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
843 (last_fish_arrived_at!=0 &&
844 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
845 belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
846 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
847 last_fish_arrived_at,
848 RtsFlags.ParFlags.fishDelay, now);
851 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
852 (last_fish_arrived_at==0 ||
853 (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
854 /* outstandingFishes is set in sendFish, processFish;
855 avoid flooding system with fishes via delay */
857 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
860 // Global statistics: count no. of fishes
861 if (RtsFlags.ParFlags.ParStats.Global &&
862 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
863 globalParStats.tot_fish_mess++;
867 receivedFinish = processMessages();
870 } else if (PacketsWaiting()) { /* Look for incoming messages */
871 receivedFinish = processMessages();
874 /* Now we are sure that we have some work available */
875 ASSERT(run_queue_hd != END_TSO_QUEUE);
877 /* Take a thread from the run queue, if we have work */
878 POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
879 IF_DEBUG(sanity,checkTSO(t));
881 /* ToDo: write something to the log-file
882 if (RTSflags.ParFlags.granSimStats && !sameThread)
883 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
887 /* the spark pool for the current PE */
888 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
891 belch("--=^ %d threads, %d sparks on [%#x]",
892 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
895 if (0 && RtsFlags.ParFlags.ParStats.Full &&
896 t && LastTSO && t->id != LastTSO->id &&
897 LastTSO->why_blocked == NotBlocked &&
898 LastTSO->what_next != ThreadComplete) {
899 // if previously scheduled TSO not blocked we have to record the context switch
900 DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
901 GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
904 if (RtsFlags.ParFlags.ParStats.Full &&
905 (emitSchedule /* forced emit */ ||
906 (t && LastTSO && t->id != LastTSO->id))) {
908 we are running a different TSO, so write a schedule event to log file
909 NB: If we use fair scheduling we also have to write a deschedule
910 event for LastTSO; with unfair scheduling we know that the
911 previous tso has blocked whenever we switch to another tso, so
912 we don't need it in GUM for now
914 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
915 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
916 emitSchedule = rtsFalse;
920 #else /* !GRAN && !PAR */
922 // grab a thread from the run queue
923 ASSERT(run_queue_hd != END_TSO_QUEUE);
926 // Sanity check the thread we're about to run. This can be
927 // expensive if there is lots of thread switching going on...
928 IF_DEBUG(sanity,checkTSO(t));
934 for(m = main_threads; m; m = m->link)
945 sched_belch("### Running thread %d in bound thread", t->id));
946 // yes, the Haskell thread is bound to the current native thread
951 sched_belch("### thread %d bound to another OS thread", t->id));
952 // no, bound to a different Haskell thread: pass to that thread
953 PUSH_ON_RUN_QUEUE(t);
954 passCapability(&m->bound_thread_cond);
960 if(mainThread != NULL)
961 // The thread we want to run is bound.
964 sched_belch("### this OS thread cannot run thread %d", t->id));
965 // no, the current native thread is bound to a different
966 // Haskell thread, so pass it to any worker thread
967 PUSH_ON_RUN_QUEUE(t);
968 passCapabilityToWorker();
975 cap->r.rCurrentTSO = t;
977 /* context switches are now initiated by the timer signal, unless
978 * the user specified "context switch as often as possible", with
981 if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
982 && (run_queue_hd != END_TSO_QUEUE
983 || blocked_queue_hd != END_TSO_QUEUE
984 || sleeping_queue != END_TSO_QUEUE)))
991 RELEASE_LOCK(&sched_mutex);
993 IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
994 t->id, whatNext_strs[t->what_next]));
997 startHeapProfTimer();
1000 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1001 /* Run the current thread
1003 prev_what_next = t->what_next;
1004 switch (prev_what_next) {
1006 case ThreadComplete:
1007 /* Thread already finished, return to scheduler. */
1008 ret = ThreadFinished;
1011 errno = t->saved_errno;
1012 ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1013 t->saved_errno = errno;
1015 case ThreadInterpret:
1016 ret = interpretBCO(cap);
1019 barf("schedule: invalid what_next field");
1021 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1023 /* Costs for the scheduler are assigned to CCS_SYSTEM */
1025 stopHeapProfTimer();
1029 ACQUIRE_LOCK(&sched_mutex);
1031 #ifdef RTS_SUPPORTS_THREADS
1032 IF_DEBUG(scheduler,fprintf(stderr,"sched (task %p): ", osThreadId()););
1033 #elif !defined(GRAN) && !defined(PAR)
1034 IF_DEBUG(scheduler,fprintf(stderr,"sched: "););
1036 t = cap->r.rCurrentTSO;
1039 /* HACK 675: if the last thread didn't yield, make sure to print a
1040 SCHEDULE event to the log file when StgRunning the next thread, even
1041 if it is the same one as before */
1043 TimeOfLastYield = CURRENT_TIME;
1049 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1050 globalGranStats.tot_heapover++;
1052 globalParStats.tot_heapover++;
1055 // did the task ask for a large block?
1056 if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1057 // if so, get one and push it on the front of the nursery.
1061 blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1063 IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)",
1064 t->id, whatNext_strs[t->what_next], blocks));
1066 // don't do this if it would push us over the
1067 // alloc_blocks_lim limit; we'll GC first.
1068 if (alloc_blocks + blocks < alloc_blocks_lim) {
1070 alloc_blocks += blocks;
1071 bd = allocGroup( blocks );
1073 // link the new group into the list
1074 bd->link = cap->r.rCurrentNursery;
1075 bd->u.back = cap->r.rCurrentNursery->u.back;
1076 if (cap->r.rCurrentNursery->u.back != NULL) {
1077 cap->r.rCurrentNursery->u.back->link = bd;
1079 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1080 g0s0->blocks == cap->r.rNursery);
1081 cap->r.rNursery = g0s0->blocks = bd;
1083 cap->r.rCurrentNursery->u.back = bd;
1085 // initialise it as a nursery block. We initialise the
1086 // step, gen_no, and flags field of *every* sub-block in
1087 // this large block, because this is easier than making
1088 // sure that we always find the block head of a large
1089 // block whenever we call Bdescr() (eg. evacuate() and
1090 // isAlive() in the GC would both have to do this, at
1094 for (x = bd; x < bd + blocks; x++) {
1101 // don't forget to update the block count in g0s0.
1102 g0s0->n_blocks += blocks;
1103 // This assert can be a killer if the app is doing lots
1104 // of large block allocations.
1105 ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1107 // now update the nursery to point to the new block
1108 cap->r.rCurrentNursery = bd;
1110 // we might be unlucky and have another thread get on the
1111 // run queue before us and steal the large block, but in that
1112 // case the thread will just end up requesting another large
1114 PUSH_ON_RUN_QUEUE(t);
1119 /* make all the running tasks block on a condition variable,
1120 * maybe set context_switch and wait till they all pile in,
1121 * then have them wait on a GC condition variable.
1123 IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow",
1124 t->id, whatNext_strs[t->what_next]));
1127 ASSERT(!is_on_queue(t,CurrentProc));
1129 /* Currently we emit a DESCHEDULE event before GC in GUM.
1130 ToDo: either add separate event to distinguish SYSTEM time from rest
1131 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1132 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1133 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1134 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1135 emitSchedule = rtsTrue;
1139 ready_to_gc = rtsTrue;
1140 context_switch = 1; /* stop other threads ASAP */
1141 PUSH_ON_RUN_QUEUE(t);
1142 /* actual GC is done at the end of the while loop */
1148 DumpGranEvent(GR_DESCHEDULE, t));
1149 globalGranStats.tot_stackover++;
1152 // DumpGranEvent(GR_DESCHEDULE, t);
1153 globalParStats.tot_stackover++;
1155 IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow",
1156 t->id, whatNext_strs[t->what_next]));
1157 /* just adjust the stack for this thread, then pop it back
1163 /* enlarge the stack */
1164 StgTSO *new_t = threadStackOverflow(t);
1166 /* This TSO has moved, so update any pointers to it from the
1167 * main thread stack. It better not be on any other queues...
1168 * (it shouldn't be).
1170 for (m = main_threads; m != NULL; m = m->link) {
1175 threadPaused(new_t);
1176 PUSH_ON_RUN_QUEUE(new_t);
1180 case ThreadYielding:
1183 DumpGranEvent(GR_DESCHEDULE, t));
1184 globalGranStats.tot_yields++;
1187 // DumpGranEvent(GR_DESCHEDULE, t);
1188 globalParStats.tot_yields++;
1190 /* put the thread back on the run queue. Then, if we're ready to
1191 * GC, check whether this is the last task to stop. If so, wake
1192 * up the GC thread. getThread will block during a GC until the
1196 if (t->what_next != prev_what_next) {
1197 belch("--<< thread %ld (%s) stopped to switch evaluators",
1198 t->id, whatNext_strs[t->what_next]);
1200 belch("--<< thread %ld (%s) stopped, yielding",
1201 t->id, whatNext_strs[t->what_next]);
1206 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1208 ASSERT(t->link == END_TSO_QUEUE);
1210 // Shortcut if we're just switching evaluators: don't bother
1211 // doing stack squeezing (which can be expensive), just run the
1213 if (t->what_next != prev_what_next) {
1220 ASSERT(!is_on_queue(t,CurrentProc));
1223 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1224 checkThreadQsSanity(rtsTrue));
1228 if (RtsFlags.ParFlags.doFairScheduling) {
1229 /* this does round-robin scheduling; good for concurrency */
1230 APPEND_TO_RUN_QUEUE(t);
1232 /* this does unfair scheduling; good for parallelism */
1233 PUSH_ON_RUN_QUEUE(t);
1236 // this does round-robin scheduling; good for concurrency
1237 APPEND_TO_RUN_QUEUE(t);
1241 /* add a ContinueThread event to actually process the thread */
1242 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1244 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1246 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1255 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1256 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1257 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1259 // ??? needed; should emit block before
1261 DumpGranEvent(GR_DESCHEDULE, t));
1262 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1265 ASSERT(procStatus[CurrentProc]==Busy ||
1266 ((procStatus[CurrentProc]==Fetching) &&
1267 (t->block_info.closure!=(StgClosure*)NULL)));
1268 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1269 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1270 procStatus[CurrentProc]==Fetching))
1271 procStatus[CurrentProc] = Idle;
1275 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1276 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1279 if (t->block_info.closure!=(StgClosure*)NULL)
1280 print_bq(t->block_info.closure));
1282 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1285 /* whatever we schedule next, we must log that schedule */
1286 emitSchedule = rtsTrue;
1289 /* don't need to do anything. Either the thread is blocked on
1290 * I/O, in which case we'll have called addToBlockedQueue
1291 * previously, or it's blocked on an MVar or Blackhole, in which
1292 * case it'll be on the relevant queue already.
1295 fprintf(stderr, "--<< thread %d (%s) stopped: ",
1296 t->id, whatNext_strs[t->what_next]);
1297 printThreadBlockage(t);
1298 fprintf(stderr, "\n"));
1301 /* Only for dumping event to log file
1302 ToDo: do I need this in GranSim, too?
1309 case ThreadFinished:
1310 /* Need to check whether this was a main thread, and if so, signal
1311 * the task that started it with the return value. If we have no
1312 * more main threads, we probably need to stop all the tasks until
1315 /* We also end up here if the thread kills itself with an
1316 * uncaught exception, see Exception.hc.
1318 IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished",
1319 t->id, whatNext_strs[t->what_next]));
1321 endThread(t, CurrentProc); // clean-up the thread
1323 /* For now all are advisory -- HWL */
1324 //if(t->priority==AdvisoryPriority) ??
1325 advisory_thread_count--;
1328 if(t->dist.priority==RevalPriority)
1332 if (RtsFlags.ParFlags.ParStats.Full &&
1333 !RtsFlags.ParFlags.ParStats.Suppressed)
1334 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1339 barf("schedule: invalid thread return code %d", (int)ret);
1343 // When we have +RTS -i0 and we're heap profiling, do a census at
1344 // every GC. This lets us get repeatable runs for debugging.
1345 if (performHeapProfile ||
1346 (RtsFlags.ProfFlags.profileInterval==0 &&
1347 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1348 GarbageCollect(GetRoots, rtsTrue);
1350 performHeapProfile = rtsFalse;
1351 ready_to_gc = rtsFalse; // we already GC'd
1356 /* everybody back, start the GC.
1357 * Could do it in this thread, or signal a condition var
1358 * to do it in another thread. Either way, we need to
1359 * broadcast on gc_pending_cond afterward.
1361 #if defined(RTS_SUPPORTS_THREADS)
1362 IF_DEBUG(scheduler,sched_belch("doing GC"));
1364 GarbageCollect(GetRoots,rtsFalse);
1365 ready_to_gc = rtsFalse;
1367 /* add a ContinueThread event to continue execution of current thread */
1368 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1370 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1372 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1380 IF_GRAN_DEBUG(unused,
1381 print_eventq(EventHd));
1383 event = get_next_event();
1386 /* ToDo: wait for next message to arrive rather than busy wait */
1389 } /* end of while(1) */
1391 IF_PAR_DEBUG(verbose,
1392 belch("== Leaving schedule() after having received Finish"));
1395 /* ---------------------------------------------------------------------------
1396 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1397 * used by Control.Concurrent for error checking.
1398 * ------------------------------------------------------------------------- */
1401 rtsSupportsBoundThreads(void)
1410 /* ---------------------------------------------------------------------------
1411 * isThreadBound(tso): check whether tso is bound to an OS thread.
1412 * ------------------------------------------------------------------------- */
1415 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1419 for(m = main_threads; m; m = m->link)
1428 /* ---------------------------------------------------------------------------
1429 * Singleton fork(). Do not copy any running threads.
1430 * ------------------------------------------------------------------------- */
1433 deleteThreadImmediately(StgTSO *tso);
// Fork the process; in the child, discard all existing Haskell threads and
// run `entry`.  Visible behaviour: the parent branch only hands back the
// pid; the child branch empties the run queue, calls
// deleteThreadImmediately() on every thread reachable from all_threads,
// unlinks every entry of main_threads, evaluates `entry` with
// rts_evalStableIO(), checks the result with rts_checkSchedStatus() and
// finally calls hs_exit().  When mingw32_TARGET_OS is defined the function
// only barf()s.
// NOTE(review): the statement that produces `pid` is not visible in this
// listing (presumably a fork() call) -- confirm.
1436 forkProcess(HsStablePtr *entry)
1438 #ifndef mingw32_TARGET_OS
1444 IF_DEBUG(scheduler,sched_belch("forking!"));
1445 rts_lock(); // This not only acquires sched_mutex, it also
1446 // makes sure that no other threads are running
1450 if (pid) { /* parent */
1452 /* just return the pid */
1456 } else { /* child */
1459 // delete all threads
1460 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1462 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1465 // don't allow threads to catch the ThreadKilled exception
1466 deleteThreadImmediately(t);
1469 // wipe the main thread list
// Each iteration pops the head entry, so the loop ends once main_threads
// is NULL.
1470 while((m = main_threads) != NULL) {
1471 main_threads = m->link;
1473 closeCondition(&m->bound_thread_cond);
1478 #ifdef RTS_SUPPORTS_THREADS
1479 resetTaskManagerAfterFork(); // tell startTask() and friends that
1480 startingWorkerThread = rtsFalse; // we have no worker threads any more
1481 resetWorkerWakeupPipeAfterFork();
1484 rc = rts_evalStableIO(entry, NULL); // run the action
1485 rts_checkSchedStatus("forkProcess",rc);
1489 hs_exit(); // clean up and exit
1493 barf("forkProcess#: primop not implemented for mingw32, sorry!\n");
1495 #endif /* mingw32 */
1498 /* ---------------------------------------------------------------------------
1499 * deleteAllThreads(): kill all the live threads.
1501 * This is used when we catch a user interrupt (^C), before performing
1502 * any necessary cleanups and running finalizers.
1504 * Locks: sched_mutex held.
1505 * ------------------------------------------------------------------------- */
// Walks all_threads via global_link, then resets the run, blocked and
// sleeping queues to END_TSO_QUEUE.
// NOTE(review): the per-thread action inside the loop is not visible in
// this listing (presumably a deleteThread() call) -- confirm.
1508 deleteAllThreads ( void )
1511 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1512 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
// The successor is read before the current thread is processed.
1513 next = t->global_link;
1516 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1517 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1518 sleeping_queue = END_TSO_QUEUE;
1521 /* startThread and insertThread are now in GranSim.c -- HWL */
1524 /* ---------------------------------------------------------------------------
1525 * Suspending & resuming Haskell threads.
1527 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1528 * its capability before calling the C function. This allows another
1529 * task to pick up the capability and carry on running Haskell
1530 * threads. It also means that if the C call blocks, it won't lock
1533 * The Haskell thread making the C call is put to sleep for the
1534 * duration of the call, on the suspended_ccalling_threads queue. We
1535 * give out a token to the task, which it can use to resume the thread
1536 * on return from the C function.
1537 * ------------------------------------------------------------------------- */
// Park the current Haskell thread for the duration of a "safe" C call.
// Under sched_mutex the capability's current TSO is pushed onto the head of
// suspended_ccalling_threads, its thread id is taken as the token, and the
// capability is released.  errno is saved on entry and restored on exit so
// the bookkeeping here does not disturb the caller's errno.
// NOTE(review): the remaining parameter(s) and the return statement are not
// visible in this listing (presumably `concCall` and `return tok`).
1540 suspendThread( StgRegTable *reg,
1549 int saved_errno = errno;
1551 /* assume that *reg is a pointer to the StgRegTable part
// The Capability is recovered by stepping back sizeof(StgFunTable) bytes
// from `reg`.
1554 cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1556 ACQUIRE_LOCK(&sched_mutex);
1559 sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1561 // XXX this might not be necessary --SDM
1562 cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1564 threadPaused(cap->r.rCurrentTSO);
1565 cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1566 suspended_ccalling_threads = cap->r.rCurrentTSO;
1568 #if defined(RTS_SUPPORTS_THREADS)
// A NULL blocked_exceptions selects BlockedOnCCall and installs an empty
// queue; the other assignment below (BlockedOnCCall_NoUnblockExc) is
// presumably the else branch -- its `else` line is not visible here.
1569 if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
1571 cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1572 cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1576 cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1580 /* Use the thread ID as the token; it should be unique */
1581 tok = cap->r.rCurrentTSO->id;
1583 /* Hand back capability */
1584 releaseCapability(cap);
1586 #if defined(RTS_SUPPORTS_THREADS)
1587 /* Preparing to leave the RTS, so ensure there's a native thread/task
1588 waiting to take over.
1590 IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1593 /* Other threads _might_ be available for execution; signal this */
1595 RELEASE_LOCK(&sched_mutex);
1597 errno = saved_errno;
// Counterpart of suspendThread(): obtain a capability, find the TSO whose
// id equals `tok` on suspended_ccalling_threads, reset its link and blocking
// status, and make it the capability's current TSO.  barf()s if no thread
// with that id is on the list.  errno is saved on entry and restored on
// exit.
// NOTE(review): the statement that unlinks the match through `prev`, and
// the return statement, are not visible in this listing.
1602 resumeThread( StgInt tok,
1603 rtsBool concCall STG_UNUSED )
1605 StgTSO *tso, **prev;
1607 int saved_errno = errno;
1609 #if defined(RTS_SUPPORTS_THREADS)
1610 /* Wait for permission to re-enter the RTS with the result. */
1611 ACQUIRE_LOCK(&sched_mutex);
1612 waitForReturnCapability(&sched_mutex, &cap);
1614 IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1616 grabCapability(&cap);
1619 /* Remove the thread off of the suspended list */
// `prev` always addresses the link field (or list head) that points at
// `tso`.
1620 prev = &suspended_ccalling_threads;
1621 for (tso = suspended_ccalling_threads;
1622 tso != END_TSO_QUEUE;
1623 prev = &tso->link, tso = tso->link) {
1624 if (tso->id == (StgThreadID)tok) {
// Reaching the end of the list means no suspended thread carries this token.
1629 if (tso == END_TSO_QUEUE) {
1630 barf("resumeThread: thread not found");
1632 tso->link = END_TSO_QUEUE;
1634 #if defined(RTS_SUPPORTS_THREADS)
// Only threads parked as BlockedOnCCall have their blocked_exceptions queue
// woken and cleared here.
1635 if(tso->why_blocked == BlockedOnCCall)
1637 awakenBlockedQueueNoLock(tso->blocked_exceptions);
1638 tso->blocked_exceptions = NULL;
1642 /* Reset blocking status */
1643 tso->why_blocked = NotBlocked;
1645 cap->r.rCurrentTSO = tso;
1646 RELEASE_LOCK(&sched_mutex);
1647 errno = saved_errno;
1652 /* ---------------------------------------------------------------------------
1654 * ------------------------------------------------------------------------ */
1655 static void unblockThread(StgTSO *tso);
1657 /* ---------------------------------------------------------------------------
1658 * Comparing Thread ids.
1660 * This is used from STG land in the implementation of the
1661 * instances of Eq/Ord for ThreadIds.
1662 * ------------------------------------------------------------------------ */
// Three-way comparison of two TSOs by thread id: -1 when the first id is
// smaller, 1 when it is larger.
// NOTE(review): the result for equal ids is not visible in this listing
// (presumably 0).
1665 cmp_thread(StgPtr tso1, StgPtr tso2)
// Both arguments are treated as StgTSO pointers.
1667 StgThreadID id1 = ((StgTSO *)tso1)->id;
1668 StgThreadID id2 = ((StgTSO *)tso2)->id;
1670 if (id1 < id2) return (-1);
1671 if (id1 > id2) return 1;
1675 /* ---------------------------------------------------------------------------
1676 * Fetching the ThreadID from an StgTSO.
1678 * This is used in the implementation of Show for ThreadIds.
1679 * ------------------------------------------------------------------------ */
// Returns the id field of the TSO that `tso` points to; the argument is
// cast to StgTSO * without any check.
1681 rts_getThreadId(StgPtr tso)
1683 return ((StgTSO *)tso)->id;
// Attach a copy of `label` to the thread identified by tso's id.  The copy
// is allocated with stgMallocBytes() and handed to updateThreadLabel().
1688 labelThread(StgPtr tso, char *label)
1693 /* Caveat: Once set, you can only set the thread name to "" */
// len counts the terminating NUL, so the strncpy below copies the whole
// string including its terminator.
1694 len = strlen(label)+1;
1695 buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1696 strncpy(buf,label,len);
1697 /* Update will free the old memory for us */
1698 updateThreadLabel(((StgTSO *)tso)->id,buf);
1702 /* ---------------------------------------------------------------------------
1703 Create a new thread.
1705 The new thread starts with the given stack size. Before the
1706 scheduler can run, however, this thread needs to have a closure
1707 (and possibly some arguments) pushed on its stack. See
1708 pushClosure() in Schedule.h.
1710 createGenThread() and createIOThread() (in SchedAPI.h) are
1711 convenient packaged versions of this function.
1713 currently pri (priority) is only used in a GRAN setup -- HWL
1714 ------------------------------------------------------------------------ */
1716 /* currently pri (priority) is only used in a GRAN setup -- HWL */
// Allocate and initialise a new TSO occupying `size` words in total (the
// stack gets size - TSO_STRUCT_SIZEW words).  Two signatures are visible:
// one taking an extra `pri` (stored in tso->gran.pri below) and one
// without; the preprocessor lines selecting between them are not shown in
// this listing.
// Returns END_TSO_QUEUE when advisory_thread_count has reached
// RtsFlags.ParFlags.maxThreads.
// NOTE(review): the normal return statement is not visible here (presumably
// the new tso).
1718 createThread(nat size, StgInt pri)
1721 createThread(nat size)
1728 /* First check whether we should create a thread at all */
1730 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1731 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1733 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1734 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1735 return END_TSO_QUEUE;
1741 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1744 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1746 /* catch ridiculously small stack sizes */
1747 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1748 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1751 stack_size = size - TSO_STRUCT_SIZEW;
1753 tso = (StgTSO *)allocate(size);
1754 TICK_ALLOC_TSO(stack_size, 0);
1756 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1758 SET_GRAN_HDR(tso, ThisPE);
1761 // Always start with the compiled code evaluator
1762 tso->what_next = ThreadRunGHC;
1764 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1765 * protect the increment operation on next_thread_id.
1766 * In future, we could use an atomic increment instead.
1768 ACQUIRE_LOCK(&thread_id_mutex);
1769 tso->id = next_thread_id++;
1770 RELEASE_LOCK(&thread_id_mutex);
1772 tso->why_blocked = NotBlocked;
1773 tso->blocked_exceptions = NULL;
1775 tso->saved_errno = 0;
1777 tso->stack_size = stack_size;
1778 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
// The stack pointer starts one past the last stack word.
1780 tso->sp = (P_)&(tso->stack) + stack_size;
1783 tso->prof.CCCS = CCS_MAIN;
1786 /* put a stop frame on the stack */
1787 tso->sp -= sizeofW(StgStopFrame);
1788 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1791 tso->link = END_TSO_QUEUE;
1792 /* uses more flexible routine in GranSim */
1793 insertThread(tso, CurrentProc);
1795 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1801 if (RtsFlags.GranFlags.GranSimStats.Full)
1802 DumpGranEvent(GR_START,tso);
1804 if (RtsFlags.ParFlags.ParStats.Full)
1805 DumpGranEvent(GR_STARTQ,tso);
1806 /* HACK to avoid SCHEDULE
1810 /* Link the new thread on the global thread list.
1812 tso->global_link = all_threads;
1816 tso->dist.priority = MandatoryPriority; //by default that is...
1820 tso->gran.pri = pri;
1822 tso->gran.magic = TSO_MAGIC; // debugging only
1824 tso->gran.sparkname = 0;
1825 tso->gran.startedat = CURRENT_TIME;
1826 tso->gran.exported = 0;
1827 tso->gran.basicblocks = 0;
1828 tso->gran.allocs = 0;
1829 tso->gran.exectime = 0;
1830 tso->gran.fetchtime = 0;
1831 tso->gran.fetchcount = 0;
1832 tso->gran.blocktime = 0;
1833 tso->gran.blockcount = 0;
1834 tso->gran.blockedat = 0;
1835 tso->gran.globalsparks = 0;
1836 tso->gran.localsparks = 0;
1837 if (RtsFlags.GranFlags.Light)
1838 tso->gran.clock = Now; /* local clock */
1840 tso->gran.clock = 0;
1842 IF_DEBUG(gran,printTSO(tso));
1845 tso->par.magic = TSO_MAGIC; // debugging only
1847 tso->par.sparkname = 0;
1848 tso->par.startedat = CURRENT_TIME;
1849 tso->par.exported = 0;
1850 tso->par.basicblocks = 0;
1851 tso->par.allocs = 0;
1852 tso->par.exectime = 0;
1853 tso->par.fetchtime = 0;
1854 tso->par.fetchcount = 0;
1855 tso->par.blocktime = 0;
1856 tso->par.blockcount = 0;
1857 tso->par.blockedat = 0;
1858 tso->par.globalsparks = 0;
1859 tso->par.localsparks = 0;
1863 globalGranStats.tot_threads_created++;
1864 globalGranStats.threads_created_on_PE[CurrentProc]++;
1865 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1866 globalGranStats.tot_sq_probes++;
1868 // collect parallel global statistics (currently done together with GC stats)
1869 if (RtsFlags.ParFlags.ParStats.Global &&
1870 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1871 //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime());
1872 globalParStats.tot_threads_created++;
// The format has exactly two conversions (%d for the TSO id, %p for its
// address), so the arguments are the id and the pointer, in that order --
// matching the PAR message just below.
1878 belch("==__ schedule: Created TSO %d (%p);",
1879 tso->id, tso));
1881 IF_PAR_DEBUG(verbose,
1882 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1883 tso->id, tso, advisory_thread_count));
1885 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1886 tso->id, tso->stack_size));
1893 all parallel thread creation calls should fall through the following routine.
// Build a thread for `spark`: create a TSO with the initial stack size from
// RtsFlags.GcFlags, push the spark closure onto it, put it on the run queue
// and bump advisory_thread_count.  barf()s if createThread() hands back
// END_TSO_QUEUE.
// NOTE(review): the normal return statement is not visible in this listing.
1896 createSparkThread(rtsSpark spark)
1898 ASSERT(spark != (rtsSpark)NULL);
1899 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
// NOTE(review): createThread() reports this same condition with belch()
// and then returns; here barf() is used.  If barf() does not return, the
// `return` two lines down is unreachable -- confirm the intended severity.
1901 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1902 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1903 return END_TSO_QUEUE;
1907 tso = createThread(RtsFlags.GcFlags.initialStkSize);
1908 if (tso==END_TSO_QUEUE)
1909 barf("createSparkThread: Cannot create TSO");
// NOTE(review): other code in this file writes tso->dist.priority; confirm
// that a plain `priority` field exists in the configuration that compiles
// this line.
1911 tso->priority = AdvisoryPriority;
1913 pushClosure(tso,spark);
1914 PUSH_ON_RUN_QUEUE(tso);
1915 advisory_thread_count++;
1922 Turn a spark into a thread.
1923 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
// Turn `spark` into a thread by delegating to createSparkThread().  When
// RtsFlags.ParFlags.ParStats.Full is set, a verbose PAR debug message about
// the conversion is emitted.
// NOTE(review): the rest of the ParStats.Full branch and the return
// statement are not visible in this listing.
1927 activateSpark (rtsSpark spark)
1931 tso = createSparkThread(spark);
1932 if (RtsFlags.ParFlags.ParStats.Full) {
1933 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1934 IF_PAR_DEBUG(verbose,
1935 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1936 (StgClosure *)spark, info_type((StgClosure *)spark)));
1938 // ToDo: fwd info on local/global spark to thread -- HWL
1939 // tso->gran.exported = spark->exported;
1940 // tso->gran.locked = !spark->global;
1941 // tso->gran.sparkname = spark->name;
1947 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1948 Capability *initialCapability
1952 /* ---------------------------------------------------------------------------
1955 * scheduleThread puts a thread on the head of the runnable queue.
1956 * This will usually be done immediately after a thread is created.
1957 * The caller of scheduleThread must create the thread using e.g.
1958 * createThread and push an appropriate closure
1959 * on this thread's stack before the scheduler is invoked.
1960 * ------------------------------------------------------------------------ */
1962 static void scheduleThread_ (StgTSO* tso);
// Lock-free core of scheduleThread(): pushes `tso` onto the run queue with
// PUSH_ON_RUN_QUEUE and, under scheduler debugging, prints the TSO.  It
// takes no lock itself; see the precondition below.
1965 scheduleThread_(StgTSO *tso)
1967 // Precondition: sched_mutex must be held.
1969 /* Put the new thread on the head of the runnable queue. The caller
1970 * better push an appropriate closure on this thread's stack
1971 * beforehand. In the SMP case, the thread may start running as
1972 * soon as we release the scheduler lock below.
1974 PUSH_ON_RUN_QUEUE(tso);
1978 IF_DEBUG(scheduler,printTSO(tso));
// Locking wrapper: runs scheduleThread_(tso) with sched_mutex held.
1982 void scheduleThread(StgTSO* tso)
1984 ACQUIRE_LOCK(&sched_mutex);
1985 scheduleThread_(tso);
1986 RELEASE_LOCK(&sched_mutex);
1989 #if defined(RTS_SUPPORTS_THREADS)
1990 static Condition *bound_cond_cache = NULL;
// Schedule `tso` as a main thread and wait for it.  A StgMainThread record
// is allocated and linked onto main_threads, then the TSO is scheduled, and
// the result of waitThread_() is returned.
// NOTE(review): the initialisation of the record's other fields (including
// the use of `ret`) is not visible in this listing.
1994 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1995 Capability *initialCapability)
1997 // Precondition: sched_mutex must be held
2000 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2004 #if defined(RTS_SUPPORTS_THREADS)
2005 // Allocating a new condition for each thread is expensive, so we
2006 // cache one. This is a pretty feeble hack, but it helps speed up
2007 // consecutive call-ins quite a bit.
// The cached condition is copied by value and the cache slot is emptied;
// with an empty cache a fresh condition is initialised instead.
2008 if (bound_cond_cache != NULL) {
2009 m->bound_thread_cond = *bound_cond_cache;
2010 bound_cond_cache = NULL;
2012 initCondition(&m->bound_thread_cond);
2016 /* Put the thread on the main-threads list prior to scheduling the TSO.
2017 Failure to do so introduces a race condition in the MT case (as
2018 identified by Wolfgang Thaller), whereby the new task/OS thread
2019 created by scheduleThread_() would complete prior to the thread
2020 that spawned it managed to put 'itself' on the main-threads list.
2021 The upshot of it all being that the worker thread wouldn't get to
2022 signal the completion of its work item for the main thread to
2023 see (==> it got stuck waiting.) -- sof 6/02.
2025 IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2027 m->link = main_threads;
2030 scheduleThread_(tso);
2032 return waitThread_(m, initialCapability);
2035 /* ---------------------------------------------------------------------------
2038 * Initialise the scheduler. This resets all the queues - if the
2039 * queues contained any threads, they'll be garbage collected at the
2042 * ------------------------------------------------------------------------ */
// Body of the scheduler initialisation routine (its header line is not
// visible in this listing).  It resets every thread queue to END_TSO_QUEUE,
// clears main_threads and all_threads, derives ctxtSwitchTicks from
// ctxtSwitchTime, and -- when RTS_SUPPORTS_THREADS is defined -- initialises
// the scheduler mutexes and condition variable and starts the task manager.
// NOTE(review): the loop below runs i from 0 to MAX_PROC inclusive, i.e.
// MAX_PROC+1 iterations.  If the per-PE arrays are declared with MAX_PROC
// elements this writes one slot past the end -- check the declarations.
2050 for (i=0; i<=MAX_PROC; i++) {
2051 run_queue_hds[i] = END_TSO_QUEUE;
2052 run_queue_tls[i] = END_TSO_QUEUE;
2053 blocked_queue_hds[i] = END_TSO_QUEUE;
2054 blocked_queue_tls[i] = END_TSO_QUEUE;
2055 ccalling_threadss[i] = END_TSO_QUEUE;
2056 sleeping_queue = END_TSO_QUEUE;
// Variant with single (non-per-PE) queues.
2059 run_queue_hd = END_TSO_QUEUE;
2060 run_queue_tl = END_TSO_QUEUE;
2061 blocked_queue_hd = END_TSO_QUEUE;
2062 blocked_queue_tl = END_TSO_QUEUE;
2063 sleeping_queue = END_TSO_QUEUE;
2066 suspended_ccalling_threads = END_TSO_QUEUE;
2068 main_threads = NULL;
2069 all_threads = END_TSO_QUEUE;
// Context-switch interval expressed in timer ticks.
2074 RtsFlags.ConcFlags.ctxtSwitchTicks =
2075 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2077 #if defined(RTS_SUPPORTS_THREADS)
2078 /* Initialise the mutex and condition variables used by
2080 initMutex(&sched_mutex);
2081 initMutex(&term_mutex);
2082 initMutex(&thread_id_mutex);
2084 initCondition(&thread_ready_cond);
2087 #if defined(RTS_SUPPORTS_THREADS)
2088 ACQUIRE_LOCK(&sched_mutex);
2091 /* A capability holds the state a native thread needs in
2092 * order to execute STG code. At least one capability is
2093 * floating around (only SMP builds have more than one).
2097 #if defined(RTS_SUPPORTS_THREADS)
2098 /* start our haskell execution tasks */
2099 startTaskManager(0,taskStart);
2102 #if /* defined(SMP) ||*/ defined(PAR)
2106 RELEASE_LOCK(&sched_mutex);
// Flags the scheduler as shutting down by setting shutting_down_scheduler.
// NOTE(review): whatever is guarded by the RTS_SUPPORTS_THREADS conditional
// is not visible in this listing.
2111 exitScheduler( void )
2113 #if defined(RTS_SUPPORTS_THREADS)
2116 shutting_down_scheduler = rtsTrue;
2119 /* ----------------------------------------------------------------------------
2120 Managing the per-task allocation areas.
2122 Each capability comes with an allocation area. These are
2123 fixed-length block lists into which allocation can be done.
2125 ToDo: no support for two-space collection at the moment???
2126 ------------------------------------------------------------------------- */
// Run the scheduler on behalf of main thread `m` and, once it is done,
// recycle or close m's bound-thread condition variable.  In the GranSim
// variant the current TSO/PE globals are set up before schedule() is
// entered.
// NOTE(review): the statements that read m->stat into `stat`, release `m`
// and return are not visible in this listing.
2130 waitThread_(StgMainThread* m, Capability *initialCapability)
2132 SchedulerStatus stat;
2134 // Precondition: sched_mutex must be held.
2135 IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2138 /* GranSim specific init */
2139 CurrentTSO = m->tso; // the TSO to run
2140 procStatus[MainProc] = Busy; // status of main PE
2141 CurrentProc = MainProc; // PE to run it on
2142 schedule(m,initialCapability);
2144 schedule(m,initialCapability);
2145 ASSERT(m->stat != NoStatus);
2150 #if defined(RTS_SUPPORTS_THREADS)
2151 // Free the condition variable, returning it to the cache if possible.
2152 if (bound_cond_cache == NULL) {
// The cache pointer is NULL on this path, so it must not be written
// through.  Keep the condition in static backing storage and point the
// cache at it; scheduleWaitThread() copies it back out by value and resets
// the pointer to NULL.  Access is serialised by sched_mutex (see the
// precondition above).
static Condition bound_cond_storage;
bound_cond_storage = m->bound_thread_cond;
2153 bound_cond_cache = &bound_cond_storage;
2155 closeCondition(&m->bound_thread_cond);
2159 IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2162 // Postcondition: sched_mutex still held
2166 /* ---------------------------------------------------------------------------
2167 Where are the roots that we know about?
2169 - all the threads on the runnable queue
2170 - all the threads on the blocked queue
2171 - all the threads on the sleeping queue
2172 - all the threads currently executing a _ccall_GC
2173 - all the "main threads"
2175 ------------------------------------------------------------------------ */
2177 /* This has to be protected either by the scheduler monitor, or by the
2178 garbage collection monitor (probably the latter).
// Hand every root the scheduler owns to `evac`: the run, blocked, sleeping
// and suspended-ccall queues, plus (depending on build flags) the spark
// queue and the signal handlers, and finally the TSOs of main threads that
// have completed.  The GranSim variant walks per-PE arrays of queues
// instead of single queue heads.
2183 GetRoots( evac_fn evac )
2188 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2189 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2190 evac((StgClosure **)&run_queue_hds[i]);
2191 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2192 evac((StgClosure **)&run_queue_tls[i]);
2194 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2195 evac((StgClosure **)&blocked_queue_hds[i]);
2196 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2197 evac((StgClosure **)&blocked_queue_tls[i]);
2198 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
// Evacuate the same per-PE slot that was just tested (ccalling_threadss,
// the array indexed by PE), as with the other queues in this loop.
2199 evac((StgClosure **)&ccalling_threadss[i]);
2206 if (run_queue_hd != END_TSO_QUEUE) {
2207 ASSERT(run_queue_tl != END_TSO_QUEUE);
2208 evac((StgClosure **)&run_queue_hd);
2209 evac((StgClosure **)&run_queue_tl);
2212 if (blocked_queue_hd != END_TSO_QUEUE) {
2213 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2214 evac((StgClosure **)&blocked_queue_hd);
2215 evac((StgClosure **)&blocked_queue_tl);
2218 if (sleeping_queue != END_TSO_QUEUE) {
2219 evac((StgClosure **)&sleeping_queue);
2223 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2224 evac((StgClosure **)&suspended_ccalling_threads);
2227 #if defined(PAR) || defined(GRAN)
2228 markSparkQueue(evac);
2231 #if defined(RTS_USER_SIGNALS)
2232 // mark the signal handlers (signals should be already blocked)
2233 markSignalHandlers(evac);
2236 // main threads which have completed need to be retained until they
2237 // are dealt with in the main scheduler loop. They won't be
2238 // retained any other way: the GC will drop them from the
2239 // all_threads list, so we have to be careful to treat them as roots
2243 for (m = main_threads; m != NULL; m = m->link) {
2244 switch (m->tso->what_next) {
2245 case ThreadComplete:
2247 evac((StgClosure **)&m->tso);
2256 /* -----------------------------------------------------------------------------
2259 This is the interface to the garbage collector from Haskell land.
2260 We provide this so that external C code can allocate and garbage
2261 collect when called from Haskell via _ccall_GC.
2263 It might be useful to provide an interface whereby the programmer
2264 can specify more roots (ToDo).
2266 This needs to be protected by the GC condition variable above. KH.
2267 -------------------------------------------------------------------------- */
2269 static void (*extra_roots)(evac_fn);
2274 /* Obligated to hold this lock upon entry */
2275 ACQUIRE_LOCK(&sched_mutex);
2276 GarbageCollect(GetRoots,rtsFalse);
2277 RELEASE_LOCK(&sched_mutex);
// Runs GarbageCollect() with the scheduler's roots (GetRoots) and rtsTrue
// as its second argument, holding sched_mutex for the duration of the call.
2281 performMajorGC(void)
2283 ACQUIRE_LOCK(&sched_mutex);
2284 GarbageCollect(GetRoots,rtsTrue);
2285 RELEASE_LOCK(&sched_mutex);
// Root-marking callback that combines the scheduler's roots with the
// caller-supplied ones.  extra_roots is invoked unconditionally, so it must
// have been assigned first (performGCWithRoots() does so before passing
// this function to GarbageCollect()).
2289 AllRoots(evac_fn evac)
2291 GetRoots(evac); // the scheduler's roots
2292 extra_roots(evac); // the user's roots
// Garbage-collect with additional roots supplied by `get_roots`.  The
// callback is stored in the file-level extra_roots pointer, and AllRoots is
// passed to GarbageCollect() so both root sets are visited.  The whole
// operation runs with sched_mutex held.
2296 performGCWithRoots(void (*get_roots)(evac_fn))
2298 ACQUIRE_LOCK(&sched_mutex);
2299 extra_roots = get_roots;
2300 GarbageCollect(AllRoots,rtsFalse);
2301 RELEASE_LOCK(&sched_mutex);
2304 /* -----------------------------------------------------------------------------
2307 If the thread has reached its maximum stack size, then raise the
2308 StackOverflow exception in the offending thread. Otherwise
2309 relocate the TSO into a larger chunk of memory and adjust its stack
2311 -------------------------------------------------------------------------- */
// Handle a stack overflow in `tso`.  If the stack is already at its maximum
// size, the StackOverflow exception is raised in the thread.  Otherwise a
// larger TSO (`dest`) is allocated, the TSO header and the live part of the
// stack are copied into it, and the old TSO is marked ThreadRelocated.
// NOTE(review): the return statements and the assignments that finish the
// relocation (e.g. of dest's stack pointer) are not visible in this listing.
2314 threadStackOverflow(StgTSO *tso)
2316 nat new_stack_size, new_tso_size, stack_words;
2320 IF_DEBUG(sanity,checkTSO(tso));
2321 if (tso->stack_size >= tso->max_stack_size) {
2324 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2325 tso->id, tso, tso->stack_size, tso->max_stack_size);
2326 /* If we're debugging, just print out the top of the stack */
2327 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2330 /* Send this thread the StackOverflow exception */
2331 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2335 /* Try to double the current stack size. If that takes us over the
2336 * maximum stack size for this thread, then use the maximum instead.
2337 * Finally round up so the TSO ends up as a whole number of blocks.
2339 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2340 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2341 TSO_STRUCT_SIZE)/sizeof(W_);
2342 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2343 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2345 IF_DEBUG(scheduler, fprintf(stderr,"== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2347 dest = (StgTSO *)allocate(new_tso_size);
2348 TICK_ALLOC_TSO(new_stack_size,0);
2350 /* copy the TSO block and the old stack into the new area */
2351 memcpy(dest,tso,TSO_STRUCT_SIZE);
// Only the words between sp and the end of the old stack are live; they are
// placed at the very end of the new, larger TSO.
2352 stack_words = tso->stack + tso->stack_size - tso->sp;
2353 new_sp = (P_)dest + new_tso_size - stack_words;
2354 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2356 /* relocate the stack pointers... */
2358 dest->stack_size = new_stack_size;
2360 /* Mark the old TSO as relocated. We have to check for relocated
2361 * TSOs in the garbage collector and any primops that deal with TSOs.
2363 * It's important to set the sp value to just beyond the end
2364 * of the stack, so we don't attempt to scavenge any part of the
2367 tso->what_next = ThreadRelocated;
2369 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2370 tso->why_blocked = NotBlocked;
2371 dest->mut_link = NULL;
// Report the relocated TSO: "now at" is the new address and "increased to"
// is the new size, so both come from dest (dest->id equals tso->id because
// the header was memcpy'd above).
2373 IF_PAR_DEBUG(verbose,
2374 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2375 dest->id, dest, dest->stack_size);
2376 /* If we're debugging, just print out the top of the stack */
// NOTE(review): tso->sp was moved to the end of the old stack above, so this
// chunk is taken from the old TSO, not the relocated one -- confirm intent.
2377 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2380 IF_DEBUG(sanity,checkTSO(tso));
2382 IF_DEBUG(scheduler,printTSO(dest));
2388 /* ---------------------------------------------------------------------------
2389 Wake up a queue that was blocked on some resource.
2390 ------------------------------------------------------------------------ */
// Bookkeeping for a thread being taken off a blocking queue.  Two
// definitions with the same signature are visible; the preprocessor lines
// selecting between them, and the body of the first, are not shown in this
// listing.  The visible body logs a GR_RESUMEQ event when full PAR
// statistics are on and adds the time since par.blockedat to either the
// thread's fetch time or its block time, chosen by the closure type of
// `node`.
// NOTE(review): the case labels of the switch are not visible here.
2394 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2399 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2401 /* write RESUME events to log file and
2402 update blocked and fetch time (depending on type of the orig closure) */
2403 if (RtsFlags.ParFlags.ParStats.Full) {
2404 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2405 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2406 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
// With an empty run queue, whatever is scheduled next must be logged.
2407 if (EMPTY_RUN_QUEUE())
2408 emitSchedule = rtsTrue;
2410 switch (get_itbl(node)->type) {
2412 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2417 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
// NOTE(review): this message names unblockOneLocked although it is raised
// from unblockCount.
2424 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
// GranSim variant: wake the TSO `bqe` that was blocked on `node` by posting
// an event for it.  A TSO local to node's PEs is charged lunblocktime; a
// remote one is charged mpacktime + gunblocktime + latency.  In both cases
// CurrentTime[CurrentProc] is advanced first and the TSO's link is reset.
// NOTE(review): the event type passed to new_event() and the return
// statement are not visible in this listing.
2431 static StgBlockingQueueElement *
2432 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2435 PEs node_loc, tso_loc;
2437 node_loc = where_is(node); // should be lifted out of loop
2438 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2439 tso_loc = where_is((StgClosure *)tso);
2440 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2441 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2442 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2443 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2444 // insertThread(tso, node_loc);
2445 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2447 tso, node, (rtsSpark*)NULL);
2448 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2451 } else { // TSO is remote (actually should be FMBQ)
2452 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2453 RtsFlags.GranFlags.Costs.gunblocktime +
2454 RtsFlags.GranFlags.Costs.latency;
2455 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2457 tso, node, (rtsSpark*)NULL);
2458 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2461 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2463 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2464 (node_loc==tso_loc ? "Local" : "Global"),
2465 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
// The thread no longer records what it was blocked on.
2466 tso->block_info.closure = NULL;
2467 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
/*
 * unblockOneLocked (GUM variant): process one element of a blocking queue,
 * dispatching on the closure type of the element.
 *
 * bqe  - queue element to process.
 * node - closure owning the blocking queue; passed on to unblockCount().
 *
 * Per the comments on each arm: a TSO is pushed onto the run queue, counted
 * via unblockCount() and then marked NotBlocked; a BLOCKED_FETCH is pushed
 * onto the front of the PendingFetches list; an RBHSave closure is only
 * sanity-checked; anything else ends in barf().
 */
2471 static StgBlockingQueueElement *
2472 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2474 StgBlockingQueueElement *next;
2476 switch (get_itbl(bqe)->type) {
2478 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2479 /* if it's a TSO just push it onto the run_queue */
2481 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2482 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
2484 unblockCount(bqe, node);
2485 /* reset blocking status after dumping event */
2486 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2490 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2492 bqe->link = (StgBlockingQueueElement *)PendingFetches;
2493 PendingFetches = (StgBlockedFetch *)bqe;
2497 /* can ignore this case in a non-debugging setup;
2498 see comments on RBHSave closures above */
2500 /* check that the closure is an RBHSave closure */
2501 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2502 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2503 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2507 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2508 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2512 IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2516 #else /* !GRAN && !PAR */
/*
 * unblockOneLocked (standard variant): make one blocked TSO runnable.
 *
 * Asserts that tso really is a TSO and is currently blocked, marks it
 * NotBlocked and pushes it onto the run queue.  The callers in this file
 * assign the result back to their cursor, so the return value is presumably
 * the next TSO in the queue (TODO confirm).
 */
2518 unblockOneLocked(StgTSO *tso)
2522 ASSERT(get_itbl(tso)->type == TSO);
2523 ASSERT(tso->why_blocked != NotBlocked);
2524 tso->why_blocked = NotBlocked;
2526 PUSH_ON_RUN_QUEUE(tso);
2528 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2533 #if defined(GRAN) || defined(PAR)
/*
 * unblockOne (GranSim/GUM variant): locking wrapper around
 * unblockOneLocked().  sched_mutex is acquired before the call and released
 * after it; bqe is overwritten with the result of the call.
 */
2534 INLINE_ME StgBlockingQueueElement *
2535 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2537 ACQUIRE_LOCK(&sched_mutex);
2538 bqe = unblockOneLocked(bqe, node);
2539 RELEASE_LOCK(&sched_mutex);
/*
 * unblockOne (standard variant): locking wrapper around unblockOneLocked().
 * sched_mutex is acquired before the call and released after it; tso is
 * overwritten with the result of the call.
 */
2544 unblockOne(StgTSO *tso)
2546 ACQUIRE_LOCK(&sched_mutex);
2547 tso = unblockOneLocked(tso);
2548 RELEASE_LOCK(&sched_mutex);
/*
 * awakenBlockedQueue (GranSim variant): wake every TSO on the blocking
 * queue q that is attached to node.
 *
 * q    - first element of the queue; asserted to be END_BQ_QUEUE, a TSO or a
 *        CONSTR (RBHSave) closure.
 * node - closure owning the queue; asserted to be unique.
 *
 * Steps visible here:
 *  - If node is not located on CurrentProc, the situation is treated as a
 *    fake fetch: the bit for CurrentProc is OR-ed into the procs bitmask of
 *    node and tot_fake_fetches is counted when global statistics are on.
 *  - Elements are passed to unblockOneLocked() for as long as the current
 *    element is a TSO.
 *  - If the walk stops on something other than END_BQ_QUEUE, node is
 *    asserted to be an RBH and the element a CONSTR; the blocking_queue and
 *    mut_link fields of node are restored from payload[0] and payload[1].
 *  - With global statistics on, tot_bq_len and tot_awbq are updated.
 */
2555 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2557 StgBlockingQueueElement *bqe;
2562 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2563 node, CurrentProc, CurrentTime[CurrentProc],
2564 CurrentTSO->id, CurrentTSO));
2566 node_loc = where_is(node);
2568 ASSERT(q == END_BQ_QUEUE ||
2569 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2570 get_itbl(q)->type == CONSTR); // closure (type constructor)
2571 ASSERT(is_unique(node));
2573 /* FAKE FETCH: magically copy the node to the tso's proc;
2574 no Fetch necessary because in reality the node should not have been
2575 moved to the other PE in the first place
2577 if (CurrentProc!=node_loc) {
2579 belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2580 node, node_loc, CurrentProc, CurrentTSO->id,
2581 // CurrentTSO, where_is(CurrentTSO),
2582 node->header.gran.procs));
2583 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2585 belch("## new bitmask of node %p is %#x",
2586 node, node->header.gran.procs));
2587 if (RtsFlags.GranFlags.GranSimStats.Global) {
2588 globalGranStats.tot_fake_fetches++;
2593 // ToDo: check: ASSERT(CurrentProc==node_loc);
2594 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2597 bqe points to the current element in the queue
2598 next points to the next element in the queue
2600 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2601 //tso_loc = where_is(tso);
2603 bqe = unblockOneLocked(bqe, node);
2606 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2607 the closure to make room for the anchor of the BQ */
2608 if (bqe!=END_BQ_QUEUE) {
2609 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2611 ASSERT((info_ptr==&RBH_Save_0_info) ||
2612 (info_ptr==&RBH_Save_1_info) ||
2613 (info_ptr==&RBH_Save_2_info));
2615 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2616 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2617 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2620 belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2621 node, info_type(node)));
2624 /* statistics gathering */
2625 if (RtsFlags.GranFlags.GranSimStats.Global) {
2626 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2627 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2628 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2629 globalGranStats.tot_awbq++; // total no. of bqs awakened
2632 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2633 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
/*
 * awakenBlockedQueue (GUM variant): wake the elements of the blocking queue
 * q attached to node while holding sched_mutex.
 *
 * q    - first element of the queue; asserted to be END_BQ_QUEUE, a TSO, a
 *        BLOCKED_FETCH or a CONSTR closure.
 * node - closure owning the queue; passed on to unblockOneLocked().
 *
 * Elements are handed to unblockOneLocked() for as long as the current
 * element is a TSO or a BLOCKED_FETCH.
 *
 * NOTE(review): the branch that reports nothing to unblock sits after
 * ACQUIRE_LOCK; if it returns early, confirm that sched_mutex is released
 * on that path.
 */
2637 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2639 StgBlockingQueueElement *bqe;
2641 ACQUIRE_LOCK(&sched_mutex);
2643 IF_PAR_DEBUG(verbose,
2644 belch("##-_ AwBQ for node %p on [%x]: ",
2648 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2649 IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2654 ASSERT(q == END_BQ_QUEUE ||
2655 get_itbl(q)->type == TSO ||
2656 get_itbl(q)->type == BLOCKED_FETCH ||
2657 get_itbl(q)->type == CONSTR);
2660 while (get_itbl(bqe)->type==TSO ||
2661 get_itbl(bqe)->type==BLOCKED_FETCH) {
2662 bqe = unblockOneLocked(bqe, node);
2664 RELEASE_LOCK(&sched_mutex);
2667 #else /* !GRAN && !PAR */
2669 #ifdef RTS_SUPPORTS_THREADS
/*
 * awakenBlockedQueueNoLock: wake every TSO on the queue starting at tso
 * without touching sched_mutex.  Each element is passed to
 * unblockOneLocked() until END_TSO_QUEUE is reached.
 */
2671 awakenBlockedQueueNoLock(StgTSO *tso)
2673 while (tso != END_TSO_QUEUE) {
2674 tso = unblockOneLocked(tso);
/*
 * awakenBlockedQueue (standard variant): wake every TSO on the queue
 * starting at tso.  sched_mutex is held for the whole walk; each element is
 * passed to unblockOneLocked() until END_TSO_QUEUE is reached.
 */
2680 awakenBlockedQueue(StgTSO *tso)
2682 ACQUIRE_LOCK(&sched_mutex);
2683 while (tso != END_TSO_QUEUE) {
2684 tso = unblockOneLocked(tso);
2686 RELEASE_LOCK(&sched_mutex);
2690 /* ---------------------------------------------------------------------------
2692 - usually called inside a signal handler so it mustn't do anything fancy.
2693 ------------------------------------------------------------------------ */
/*
 * interruptStgRts: request that the RTS be interrupted.  When the RTS is
 * built with thread support it additionally calls wakeBlockedWorkerThread().
 */
2696 interruptStgRts(void)
2700 #ifdef RTS_SUPPORTS_THREADS
2701 wakeBlockedWorkerThread();
2705 /* -----------------------------------------------------------------------------
2708 This is for use when we raise an exception in another thread, which
2710 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2711 -------------------------------------------------------------------------- */
2713 #if defined(GRAN) || defined(PAR)
2715 NB: only the type of the blocking queue is different in GranSim and GUM
2716 the operations on the queue-elements are the same
2717 long live polymorphism!
2719 Locks: sched_mutex is held upon entry and exit.
/*
 * unblockThread (GranSim/GUM variant): remove tso from whatever queue it is
 * blocked on and make it runnable again.
 *
 * The queue is selected by tso->why_blocked:
 *  - not blocked: return immediately;
 *  - MVar: the queue hanging off the MVar (head and tail are both fixed up);
 *  - black hole: the blocking_queue of the BLACKHOLE_BQ;
 *  - exception: the blocked_exceptions list of the target thread;
 *  - read/write (and DoProc on mingw32): the global blocked_queue;
 *  - delay: the global sleeping_queue.
 * Failing to find tso on the expected queue, or an unknown blocking reason,
 * ends in barf().  On success the link of tso is reset to END_TSO_QUEUE,
 * it is marked NotBlocked, block_info.closure is cleared and the TSO is
 * pushed onto the run queue.
 *
 * Locks: sched_mutex is held upon entry and exit.
 */
2723 unblockThread(StgTSO *tso)
2725 StgBlockingQueueElement *t, **last;
2727 switch (tso->why_blocked) {
2730 return; /* not blocked */
2733 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2735 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2736 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2738 last = (StgBlockingQueueElement **)&mvar->head;
2739 for (t = (StgBlockingQueueElement *)mvar->head;
2741 last = &t->link, last_tso = t, t = t->link) {
2742 if (t == (StgBlockingQueueElement *)tso) {
2743 *last = (StgBlockingQueueElement *)tso->link;
2744 if (mvar->tail == tso) {
2745 mvar->tail = (StgTSO *)last_tso;
2750 barf("unblockThread (MVAR): TSO not found");
2753 case BlockedOnBlackHole:
2754 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2756 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2758 last = &bq->blocking_queue;
2759 for (t = bq->blocking_queue;
2761 last = &t->link, t = t->link) {
2762 if (t == (StgBlockingQueueElement *)tso) {
2763 *last = (StgBlockingQueueElement *)tso->link;
2767 barf("unblockThread (BLACKHOLE): TSO not found");
2770 case BlockedOnException:
2772 StgTSO *target = tso->block_info.tso;
2774 ASSERT(get_itbl(target)->type == TSO);
// A relocated TSO may point at another relocated TSO, so follow the whole
// chain (as the standard variant of this function does) rather than a
// single hop.
2776 while (target->what_next == ThreadRelocated) {
2777 target = target->link;
2778 ASSERT(get_itbl(target)->type == TSO);
2781 ASSERT(target->blocked_exceptions != NULL);
2783 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2784 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2786 last = &t->link, t = t->link) {
2787 ASSERT(get_itbl(t)->type == TSO);
2788 if (t == (StgBlockingQueueElement *)tso) {
2789 *last = (StgBlockingQueueElement *)tso->link;
2793 barf("unblockThread (Exception): TSO not found");
2797 case BlockedOnWrite:
2798 #if defined(mingw32_TARGET_OS)
2799 case BlockedOnDoProc:
2802 /* take TSO off blocked_queue */
2803 StgBlockingQueueElement *prev = NULL;
2804 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2805 prev = t, t = t->link) {
2806 if (t == (StgBlockingQueueElement *)tso) {
2808 blocked_queue_hd = (StgTSO *)t->link;
2809 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2810 blocked_queue_tl = END_TSO_QUEUE;
2813 prev->link = t->link;
2814 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2815 blocked_queue_tl = (StgTSO *)prev;
2821 barf("unblockThread (I/O): TSO not found");
2824 case BlockedOnDelay:
2826 /* take TSO off sleeping_queue */
2827 StgBlockingQueueElement *prev = NULL;
2828 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
2829 prev = t, t = t->link) {
2830 if (t == (StgBlockingQueueElement *)tso) {
2832 sleeping_queue = (StgTSO *)t->link;
2834 prev->link = t->link;
2839 barf("unblockThread (delay): TSO not found");
2843 barf("unblockThread");
2847 tso->link = END_TSO_QUEUE;
2848 tso->why_blocked = NotBlocked;
2849 tso->block_info.closure = NULL;
2850 PUSH_ON_RUN_QUEUE(tso);
/*
 * unblockThread (standard variant): remove tso from whatever queue it is
 * blocked on and make it runnable again.
 *
 * A thread that is already NotBlocked is tested for first, before the
 * switch.  Otherwise the queue is selected by tso->why_blocked: the queue
 * of an MVar (head and tail are both fixed up), the blocking_queue of a
 * BLACKHOLE_BQ, the blocked_exceptions list of the target thread (following
 * every ThreadRelocated hop), the global blocked_queue for I/O, or the
 * global sleeping_queue for delays.  Failing to find tso on the expected
 * queue, or an unknown blocking reason, ends in barf().  On success the
 * link of tso is reset to END_TSO_QUEUE, it is marked NotBlocked,
 * block_info.closure is cleared and the TSO is pushed onto the run queue.
 */
2854 unblockThread(StgTSO *tso)
2858 /* To avoid locking unnecessarily. */
2859 if (tso->why_blocked == NotBlocked) {
2863 switch (tso->why_blocked) {
2866 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2868 StgTSO *last_tso = END_TSO_QUEUE;
2869 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2872 for (t = mvar->head; t != END_TSO_QUEUE;
2873 last = &t->link, last_tso = t, t = t->link) {
2876 if (mvar->tail == tso) {
2877 mvar->tail = last_tso;
2882 barf("unblockThread (MVAR): TSO not found");
2885 case BlockedOnBlackHole:
2886 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2888 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2890 last = &bq->blocking_queue;
2891 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2892 last = &t->link, t = t->link) {
2898 barf("unblockThread (BLACKHOLE): TSO not found");
2901 case BlockedOnException:
2903 StgTSO *target = tso->block_info.tso;
2905 ASSERT(get_itbl(target)->type == TSO);
2907 while (target->what_next == ThreadRelocated) {
2908 target = target->link;
2909 ASSERT(get_itbl(target)->type == TSO);
2912 ASSERT(target->blocked_exceptions != NULL);
2914 last = &target->blocked_exceptions;
2915 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2916 last = &t->link, t = t->link) {
2917 ASSERT(get_itbl(t)->type == TSO);
2923 barf("unblockThread (Exception): TSO not found");
2927 case BlockedOnWrite:
2928 #if defined(mingw32_TARGET_OS)
2929 case BlockedOnDoProc:
2932 StgTSO *prev = NULL;
2933 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2934 prev = t, t = t->link) {
2937 blocked_queue_hd = t->link;
2938 if (blocked_queue_tl == t) {
2939 blocked_queue_tl = END_TSO_QUEUE;
2942 prev->link = t->link;
2943 if (blocked_queue_tl == t) {
2944 blocked_queue_tl = prev;
2950 barf("unblockThread (I/O): TSO not found");
2953 case BlockedOnDelay:
2955 StgTSO *prev = NULL;
2956 for (t = sleeping_queue; t != END_TSO_QUEUE;
2957 prev = t, t = t->link) {
2960 sleeping_queue = t->link;
2962 prev->link = t->link;
2967 barf("unblockThread (delay): TSO not found");
2971 barf("unblockThread");
2975 tso->link = END_TSO_QUEUE;
2976 tso->why_blocked = NotBlocked;
2977 tso->block_info.closure = NULL;
2978 PUSH_ON_RUN_QUEUE(tso);
2982 /* -----------------------------------------------------------------------------
2985 * The following function implements the magic for raising an
2986 * asynchronous exception in an existing thread.
2988 * We first remove the thread from any queue on which it might be
2989 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
2991 * We strip the stack down to the innermost CATCH_FRAME, building
2992 * thunks in the heap for all the active computations, so they can
2993 * be restarted if necessary. When we reach a CATCH_FRAME, we build
2994 * an application of the handler to the exception, and push it on
2995 * the top of the stack.
2997 * How exactly do we save all the active computations? We create an
2998 * AP_STACK for every UpdateFrame on the stack. Entering one of these
2999 * AP_STACKs pushes everything from the corresponding update frame
3000 * upwards onto the stack. (Actually, it pushes everything up to the
3001 * next update frame plus a pointer to the next AP_STACK object.
3002 * Entering the next AP_STACK object pushes more onto the stack until we
3003 * reach the last AP_STACK object - at which point the stack should look
3004 * exactly as it did when we killed the TSO and we can continue
3005 * execution by entering the closure on top of the stack.
3007 * We can also kill a thread entirely - this happens if either (a) the
3008 * exception passed to raiseAsync is NULL, or (b) there's no
3009 * CATCH_FRAME on the stack. In either case, we strip the entire
3010 * stack and replace the thread with a zombie.
3012 * Locks: sched_mutex is held upon entry and exit.
3014 * -------------------------------------------------------------------------- */
/*
 * deleteThread: kill tso by calling raiseAsync() with a NULL exception.
 */
3017 deleteThread(StgTSO *tso)
3019 raiseAsync(tso,NULL);
/*
 * deleteThreadImmediately: mark tso as ThreadKilled directly, without
 * raising an exception in it.  Threads that are already ThreadComplete or
 * ThreadKilled are tested for first.  With thread support, the blocking
 * reason is compared against the two BlockedOnCCall states before the
 * thread is marked killed.
 */
3023 deleteThreadImmediately(StgTSO *tso)
3024 { // for forkProcess only:
3025 // delete thread without giving it a chance to catch the KillThread exception
3027 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3030 #if defined(RTS_SUPPORTS_THREADS)
3031 if (tso->why_blocked != BlockedOnCCall
3032 && tso->why_blocked != BlockedOnCCall_NoUnblockExc)
3035 tso->what_next = ThreadKilled;
/*
 * raiseAsyncWithLock: locking wrapper around raiseAsync().  sched_mutex is
 * acquired before the call and released after it.
 */
3039 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3041 /* When raising async exs from contexts where sched_mutex isn't held;
3042 use raiseAsyncWithLock(). */
3043 ACQUIRE_LOCK(&sched_mutex);
3044 raiseAsync(tso,exception);
3045 RELEASE_LOCK(&sched_mutex);
/*
 * raiseAsync: raise exception asynchronously in tso, or kill the thread.
 *
 * tso       - target thread; threads already ThreadComplete or ThreadKilled
 *             are tested for first.
 * exception - closure to raise.  A NULL exception makes the frame walk skip
 *             CATCH_FRAMEs.
 *
 * The stack is walked frame by frame, stopping at each UPDATE_FRAME,
 * STOP_FRAME or (with a non-NULL exception) CATCH_FRAME:
 *  - UPDATE_FRAME: the stack chunk above the frame is copied into a freshly
 *    allocated AP_STACK, the updatee is overwritten with an indirection to
 *    it unless it already is an indirection, and the AP_STACK is pushed in
 *    place of the frame.
 *  - CATCH_FRAME: a raise thunk holding the exception is allocated and left
 *    on top of the stack behind stg_enter_info, blocked_exceptions is made
 *    non-NULL, and the thread is set to ThreadRunGHC.
 *  - STOP_FRAME: the thread is marked ThreadKilled.
 */
3049 raiseAsync(StgTSO *tso, StgClosure *exception)
3051 StgRetInfoTable *info;
3054 // Thread already dead?
3055 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3060 sched_belch("raising exception in thread %ld.", tso->id));
3062 // Remove it from any blocking queues
3067 // The stack freezing code assumes there's a closure pointer on
3068 // the top of the stack, so we have to arrange that this is the case...
3070 if (sp[0] == (W_)&stg_enter_info) {
3074 sp[0] = (W_)&stg_dummy_ret_closure;
3080 // 1. Let the top of the stack be the "current closure"
3082 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3085 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3086 // current closure applied to the chunk of stack up to (but not
3087 // including) the update frame. This closure becomes the "current
3088 // closure". Go back to step 2.
3090 // 4. If it's a CATCH_FRAME, then leave the exception handler on
3091 // top of the stack applied to the exception.
3093 // 5. If it's a STOP_FRAME, then kill the thread.
3098 info = get_ret_itbl((StgClosure *)frame);
3100 while (info->i.type != UPDATE_FRAME
3101 && (info->i.type != CATCH_FRAME || exception == NULL)
3102 && info->i.type != STOP_FRAME) {
3103 frame += stack_frame_sizeW((StgClosure *)frame);
3104 info = get_ret_itbl((StgClosure *)frame);
3107 switch (info->i.type) {
3110 // If we find a CATCH_FRAME, and we've got an exception to raise,
3111 // then build the THUNK raise(exception), and leave it on
3112 // top of the CATCH_FRAME ready to enter.
3116 StgCatchFrame *cf = (StgCatchFrame *)frame;
3120 // we've got an exception to raise, so let's pass it to the
3121 // handler in this frame.
3123 raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3124 TICK_ALLOC_SE_THK(1,0);
3125 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3126 raise->payload[0] = exception;
3128 // throw away the stack from Sp up to the CATCH_FRAME.
3132 /* Ensure that async exceptions are blocked now, so we don't get
3133 * a surprise exception before we get around to executing the
3136 if (tso->blocked_exceptions == NULL) {
3137 tso->blocked_exceptions = END_TSO_QUEUE;
3140 /* Put the newly-built THUNK on top of the stack, ready to execute
3141 * when the thread restarts.
3144 sp[-1] = (W_)&stg_enter_info;
3146 tso->what_next = ThreadRunGHC;
3147 IF_DEBUG(sanity, checkTSO(tso));
3156 // First build an AP_STACK consisting of the stack chunk above the
3157 // current update frame, with the top word on the stack as the
3160 words = frame - sp - 1;
3161 ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3164 ap->fun = (StgClosure *)sp[0];
3166 for(i=0; i < (nat)words; ++i) {
3167 ap->payload[i] = (StgClosure *)*sp++;
3170 SET_HDR(ap,&stg_AP_STACK_info,
3171 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
3172 TICK_ALLOC_UP_THK(words+1,0);
3175 fprintf(stderr, "sched: Updating ");
3176 printPtr((P_)((StgUpdateFrame *)frame)->updatee);
3177 fprintf(stderr, " with ");
3178 printObj((StgClosure *)ap);
3181 // Replace the updatee with an indirection - happily
3182 // this will also wake up any threads currently
3183 // waiting on the result.
3185 // Warning: if we're in a loop, more than one update frame on
3186 // the stack may point to the same object. Be careful not to
3187 // overwrite an IND_OLDGEN in this case, because we'll screw
3188 // up the mutable lists. To be on the safe side, don't
3189 // overwrite any kind of indirection at all. See also
3190 // threadSqueezeStack in GC.c, where we have to make a similar
3193 if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3194 // revert the black hole
3195 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3197 sp += sizeofW(StgUpdateFrame) - 1;
3198 sp[0] = (W_)ap; // push onto stack
3203 // We've stripped the entire stack, the thread is now dead.
3204 sp += sizeofW(StgStopFrame);
3205 tso->what_next = ThreadKilled;
3216 /* -----------------------------------------------------------------------------
3217 resurrectThreads is called after garbage collection on the list of
3218 threads found to be garbage. Each of these threads will be woken
3219 up and sent an exception: BlockedOnDeadMVar if the thread was blocked
3220 on an MVar, or NonTermination if the thread was blocked on a Black
3223 Locks: sched_mutex isn't held upon entry nor exit.
3224 -------------------------------------------------------------------------- */
/*
 * resurrectThreads: walk the list of threads (linked through global_link)
 * and raise an exception in each one according to its blocking reason.
 *
 * For every thread the next pointer is saved first and global_link is then
 * pointed at all_threads.  The arm ending at BlockedOnException raises
 * BlockedOnDeadMVar; BlockedOnBlackHole raises NonTermination; an
 * unexpected blocking reason ends in barf().
 *
 * NOTE(review): the comment preceding this function says sched_mutex is
 * not held, while the comment inside the switch says it is held when called
 * by the GC; the two disagree, so confirm which one applies.
 */
3227 resurrectThreads( StgTSO *threads )
3231 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3232 next = tso->global_link;
3233 tso->global_link = all_threads;
3235 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3237 switch (tso->why_blocked) {
3239 case BlockedOnException:
3240 /* Called by GC - sched_mutex lock is currently held. */
3241 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3243 case BlockedOnBlackHole:
3244 raiseAsync(tso,(StgClosure *)NonTermination_closure);
3247 /* This might happen if the thread was blocked on a black hole
3248 * belonging to a thread that we've just woken up (raiseAsync
3249 * can wake up threads, remember...).
3253 barf("resurrectThreads: thread blocked in a strange way");
3258 /* -----------------------------------------------------------------------------
3259 * Blackhole detection: if we reach a deadlock, test whether any
3260 * threads are blocked on themselves. Any threads which are found to
3261 * be self-blocked get sent a NonTermination exception.
3263 * This is only done in a deadlock situation in order to avoid
3264 * performance overhead in the normal case.
3266 * Locks: sched_mutex is held upon entry and exit.
3267 * -------------------------------------------------------------------------- */
/*
 * detectBlackHoles: look for threads that are blocked on a black hole they
 * are themselves evaluating, and raise NonTermination in them.
 *
 * Every thread on all_threads is visited (ThreadRelocated entries are
 * followed).  Threads not in state BlockedOnBlackHole are tested for and
 * passed over.  For the others the stack is walked from tso->sp; when an
 * update frame is found whose updatee is the closure the thread is blocked
 * on, raiseAsync() is called with NonTermination_closure.
 *
 * Locks: sched_mutex is held upon entry and exit.
 */
3270 detectBlackHoles( void )
3272 StgTSO *tso = all_threads;
3274 StgClosure *blocked_on;
3275 StgRetInfoTable *info;
3277 for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3279 while (tso->what_next == ThreadRelocated) {
3281 ASSERT(get_itbl(tso)->type == TSO);
3284 if (tso->why_blocked != BlockedOnBlackHole) {
3287 blocked_on = tso->block_info.closure;
3289 frame = (StgClosure *)tso->sp;
3292 info = get_ret_itbl(frame);
3293 switch (info->i.type) {
3295 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3296 /* We are blocking on one of our own computations, so
3297 * send this thread the NonTermination exception.
3300 sched_belch("thread %d is blocked on itself", tso->id));
3301 raiseAsync(tso, (StgClosure *)NonTermination_closure);
3305 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3311 // normal stack frames; do nothing except advance the pointer
// Advance by the frame size in words.  Written as a plain assignment
// because a cast is not an lvalue in ISO C (the old GCC extension that
// allowed it has been removed).
3313 frame = (StgClosure *)((StgPtr)frame + stack_frame_sizeW(frame));
3320 /* ----------------------------------------------------------------------------
3321 * Debugging: why is a thread blocked
3322 * [Also provides useful information when debugging threaded programs
3323 * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3324 ------------------------------------------------------------------------- */
/*
 * printThreadBlockage: write to stderr a short description of why tso is
 * blocked, selected by tso->why_blocked.  The messages cover I/O on a file
 * descriptor, delays, MVars, exception delivery, black holes, global
 * addresses and (with thread support) external calls, plus the case of a
 * thread that is not blocked.  An unrecognised value is reported through
 * barf().
 *
 * The barf() call prints the TSO address with %p and a void * argument:
 * tso is a pointer, and passing it for a %d conversion is undefined.
 */
3328 printThreadBlockage(StgTSO *tso)
3330 switch (tso->why_blocked) {
3332 fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3334 case BlockedOnWrite:
3335 fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3337 #if defined(mingw32_TARGET_OS)
3338 case BlockedOnDoProc:
3339 fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3342 case BlockedOnDelay:
3343 fprintf(stderr,"is blocked until %d", tso->block_info.target);
3346 fprintf(stderr,"is blocked on an MVar");
3348 case BlockedOnException:
3349 fprintf(stderr,"is blocked on delivering an exception to thread %d",
3350 tso->block_info.tso->id);
3352 case BlockedOnBlackHole:
3353 fprintf(stderr,"is blocked on a black hole");
3356 fprintf(stderr,"is not blocked");
3360 fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3361 tso->block_info.closure, info_type(tso->block_info.closure));
3363 case BlockedOnGA_NoSend:
3364 fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3365 tso->block_info.closure, info_type(tso->block_info.closure));
3368 #if defined(RTS_SUPPORTS_THREADS)
3369 case BlockedOnCCall:
3370 fprintf(stderr,"is blocked on an external call");
3372 case BlockedOnCCall_NoUnblockExc:
3373 fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3377 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
3378 tso->why_blocked, tso->id, (void *)tso);
/*
 * printThreadStatus: write to stderr a short status for tso, selected by
 * tso->what_next.  Killed and completed threads get a fixed message; for
 * other threads the blocking reason is printed via printThreadBlockage().
 */
3384 printThreadStatus(StgTSO *tso)
3386 switch (tso->what_next) {
3388 fprintf(stderr,"has been killed");
3390 case ThreadComplete:
3391 fprintf(stderr,"has completed");
3394 printThreadBlockage(tso);
/*
 * printAllThreads: dump one line per thread on all_threads to stderr.
 *
 * A heading is printed first; two of the three heading variants include a
 * formatted time stamp (TIME_ON_PROC(CurrentProc) or CURRENT_TIME).  Each
 * thread line shows the thread id and address, the thread label when
 * lookupThreadLabel() returns one, and the status from printThreadStatus().
 */
3399 printAllThreads(void)
3405 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3406 ullong_format_string(TIME_ON_PROC(CurrentProc),
3407 time_string, rtsFalse/*no commas!*/);
3409 fprintf(stderr, "all threads at [%s]:\n", time_string);
3411 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3412 ullong_format_string(CURRENT_TIME,
3413 time_string, rtsFalse/*no commas!*/);
3415 fprintf(stderr,"all threads at [%s]:\n", time_string);
3417 fprintf(stderr,"all threads:\n");
3420 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3421 fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3422 label = lookupThreadLabel(t->id);
3423 if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3424 printThreadStatus(t);
3425 fprintf(stderr,"\n");
3432 Print a whole blocking queue attached to node (debugging only).
/*
 * print_bq (GUM variant): print the blocking queue attached to node.
 * A heading with the address and info type of node is written to stderr,
 * node is asserted to be non-NULL and of a closure type that can carry a
 * blocking queue (BLACKHOLE_BQ, FETCH_ME_BQ, RBH or MVAR), and the queue
 * itself is printed by print_bqe().
 */
3436 print_bq (StgClosure *node)
3438 StgBlockingQueueElement *bqe;
3442 fprintf(stderr,"## BQ of closure %p (%s): ",
3443 node, info_type(node));
3445 /* should cover all closures that may have a blocking queue */
3446 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3447 get_itbl(node)->type == FETCH_ME_BQ ||
3448 get_itbl(node)->type == RBH ||
3449 get_itbl(node)->type == MVAR);
3451 ASSERT(node!=(StgClosure*)NULL); // sanity check
3453 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3457 Print a whole blocking queue starting with the element bqe.
/*
 * print_bqe: print the blocking queue that starts at element bqe.
 *
 * The walk ends after a CONSTR (RBH_Save) element or when the link of the
 * current element is END_BQ_QUEUE.  Each element is asserted to be a TSO,
 * a BLOCKED_FETCH or a CONSTR and is printed to stderr in a form that
 * depends on its type; any other closure type ends in barf().  A newline
 * is written at the end.
 *
 * The TSO address is printed with %p and a void * argument (as the GranSim
 * print_bq does); passing a pointer for a %x conversion is undefined.
 */
3460 print_bqe (StgBlockingQueueElement *bqe)
3465 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3467 for (end = (bqe==END_BQ_QUEUE);
3468 !end; // iterate until bqe points to a CONSTR
3469 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
3470 bqe = end ? END_BQ_QUEUE : bqe->link) {
3471 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3472 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3473 /* types of closures that may appear in a blocking queue */
3474 ASSERT(get_itbl(bqe)->type == TSO ||
3475 get_itbl(bqe)->type == BLOCKED_FETCH ||
3476 get_itbl(bqe)->type == CONSTR);
3477 /* only BQs of an RBH end with an RBH_Save closure */
3478 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3480 switch (get_itbl(bqe)->type) {
3482 fprintf(stderr," TSO %u (%p),",
3483 ((StgTSO *)bqe)->id, (void *)bqe);
3486 fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3487 ((StgBlockedFetch *)bqe)->node,
3488 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3489 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3490 ((StgBlockedFetch *)bqe)->ga.weight);
3493 fprintf(stderr," %s (IP %p),",
3494 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3495 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3496 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3497 "RBH_Save_?"), get_itbl(bqe));
3500 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3501 info_type((StgClosure *)bqe)); // , node, info_type(node));
3505 fputc('\n', stderr);
3507 # elif defined(GRAN)
/*
 * print_bq (GranSim variant): print the blocking queue attached to node,
 * including the PE on which node and each queued TSO are located.
 *
 * node is asserted to be non-NULL and of type BLACKHOLE_BQ, FETCH_ME_BQ or
 * RBH.  The walk ends after a CONSTR (RBH_Save) element or when the link of
 * the current element is END_BQ_QUEUE.  Each element is asserted to be a
 * TSO or a CONSTR, and a CONSTR is only accepted when node is an RBH; any
 * other closure type ends in barf().  A newline is written at the end.
 */
3509 print_bq (StgClosure *node)
3511 StgBlockingQueueElement *bqe;
3512 PEs node_loc, tso_loc;
3515 /* should cover all closures that may have a blocking queue */
3516 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3517 get_itbl(node)->type == FETCH_ME_BQ ||
3518 get_itbl(node)->type == RBH);
3520 ASSERT(node!=(StgClosure*)NULL); // sanity check
3521 node_loc = where_is(node);
3523 fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3524 node, info_type(node), node_loc);
3527 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3529 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3530 !end; // iterate until bqe points to a CONSTR
3531 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3532 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3533 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3534 /* types of closures that may appear in a blocking queue */
3535 ASSERT(get_itbl(bqe)->type == TSO ||
3536 get_itbl(bqe)->type == CONSTR);
3537 /* only BQs of an RBH end with an RBH_Save closure */
3538 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3540 tso_loc = where_is((StgClosure *)bqe);
3541 switch (get_itbl(bqe)->type) {
3543 fprintf(stderr," TSO %d (%p) on [PE %d],",
3544 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3547 fprintf(stderr," %s (IP %p),",
3548 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3549 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3550 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3551 "RBH_Save_?"), get_itbl(bqe));
3554 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3555 info_type((StgClosure *)bqe), node, info_type(node));
3559 fputc('\n', stderr);
3563 Nice and easy: only TSOs on the blocking queue
/*
 * print_bq (standard variant): print the blocking queue attached to node.
 * node is asserted to be non-NULL; every element up to END_TSO_QUEUE is
 * asserted to be a TSO and printed to stderr with its id and address,
 * followed by a final newline.
 */
3566 print_bq (StgClosure *node)
3570 ASSERT(node!=(StgClosure*)NULL); // sanity check
3571 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3572 tso != END_TSO_QUEUE;
3574 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3575 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3576 fprintf(stderr," TSO %d (%p),", tso->id, tso);
3578 fputc('\n', stderr);
3589 for (i=0, tso=run_queue_hd;
3590 tso != END_TSO_QUEUE;
3599 sched_belch(char *s, ...)
3603 #ifdef RTS_SUPPORTS_THREADS
3604 fprintf(stderr, "sched (task %p): ", osThreadId());
3606 fprintf(stderr, "== ");
3608 fprintf(stderr, "sched: ");
3610 vfprintf(stderr, s, ap);
3611 fprintf(stderr, "\n");