1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.128 2002/02/15 20:58:14 sof Exp $
4 * (c) The GHC Team, 1998-2000
8 * Different GHC ways use this scheduler quite differently (see comments below)
9 * Here is the global picture:
11 * WAY Name CPP flag What's it for
12 * --------------------------------------
13 * mp GUM PAR Parallel execution on a distributed memory machine
14 * s SMP SMP Parallel execution on a shared memory machine
15 * mg GranSim GRAN Simulation of parallel execution
16 * md GUM/GdH DIST Distributed execution (based on GUM)
18 * --------------------------------------------------------------------------*/
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
24 * Version with scheduler monitor support for SMPs (WAY=s):
26 This design provides a high-level API to create and schedule threads etc.
27 as documented in the SMP design document.
29 It uses a monitor design controlled by a single mutex to exercise control
30 over accesses to shared data structures, and builds on the Posix threads
33 The majority of state is shared. In order to keep essential per-task state,
34 there is a Capability structure, which contains all the information
35 needed to run a thread: its STG registers, a pointer to its TSO, a
36 nursery etc. During STG execution, a pointer to the capability is
37 kept in a register (BaseReg).
39 In a non-SMP build, there is one global capability, namely MainRegTable.
43 * Version with support for distributed memory parallelism aka GUM (WAY=mp):
45 The main scheduling loop in GUM iterates until a finish message is received.
46 In that case a global flag @receivedFinish@ is set and this instance of
47 the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48 for the handling of incoming messages, such as PP_FINISH.
49 Note that in the parallel case we have a system manager that coordinates
50 different PEs, each of which are running one instance of the RTS.
51 See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52 From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
54 * Version with support for simulating parallel execution aka GranSim (WAY=mg):
56 The main scheduling code in GranSim is quite different from that in std
57 (concurrent) Haskell: while concurrent Haskell just iterates over the
58 threads in the runnable queue, GranSim is event driven, i.e. it iterates
59 over the events in the global event queue. -- HWL
64 //* Variables and Data structures::
65 //* Main scheduling loop::
66 //* Suspend and Resume::
68 //* Garbage Collection Routines::
69 //* Blocking Queue Routines::
70 //* Exception Handling Routines::
71 //* Debugging Routines::
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
78 #include "PosixSource.h"
85 #include "StgStartup.h"
88 #include "StgMiscClosures.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
119 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
120 //@subsection Variables and Data structures
124 * These are the threads which clients have requested that we run.
126 * In a 'threaded' build, we might have several concurrent clients all
127 * waiting for results, and each one will wait on a condition variable
128 * until the result is available.
130 * In non-SMP, clients are strictly nested: the first client calls
131 * into the RTS, which might call out again to C with a _ccall_GC, and
132 * eventually re-enter the RTS.
134 * Main threads information is kept in a linked list:
136 //@cindex StgMainThread
137 typedef struct StgMainThread_ {
139 SchedulerStatus stat;
141 #if defined(RTS_SUPPORTS_THREADS)
144 struct StgMainThread_ *link;
147 /* Main thread queue.
148 * Locks required: sched_mutex.
150 static StgMainThread *main_threads;
153 * Locks required: sched_mutex.
157 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
158 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
161 In GranSim we have a runnable and a blocked queue for each processor.
162 In order to minimise code changes new arrays run_queue_hds/tls
163 are created. run_queue_hd is then a short cut (macro) for
164 run_queue_hds[CurrentProc] (see GranSim.h).
167 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
168 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
169 StgTSO *ccalling_threadss[MAX_PROC];
170 /* We use the same global list of threads (all_threads) in GranSim as in
171 the std RTS (i.e. we are cheating). However, we don't use this list in
172 the GranSim specific code at the moment (so we are only potentially
/* Thread queues for the standard (non-GranSim) RTS; GranSim instead uses
 * the per-processor arrays run_queue_hds/tls etc. declared above.
 * NOTE(review): presumably protected by sched_mutex like the other
 * scheduler queues — confirm against all call sites.
 */
177 StgTSO *run_queue_hd, *run_queue_tl;
178 StgTSO *blocked_queue_hd, *blocked_queue_tl;
179 StgTSO *sleeping_queue; /* perhaps replace with a hash table? */
183 /* Linked list of all threads.
184 * Used for detecting garbage collected threads.
188 /* When a thread performs a safe C call (_ccall_GC, using old
189 * terminology), it gets put on the suspended_ccalling_threads
190 * list. Used by the garbage collector.
192 static StgTSO *suspended_ccalling_threads;
/* Enlarge the stack of a thread that returned with StackOverflow.
 * Returns a new TSO: the thread may have moved, so callers must update
 * any pointers to the old TSO (see the StackOverflow case in schedule()).
 */
194 static StgTSO *threadStackOverflow(StgTSO *tso);
196 /* KH: The following two flags are shared memory locations. There is no need
197 to lock them, since they are only unset at the end of a scheduler
201 /* flag set by signal handler to precipitate a context switch */
202 //@cindex context_switch
205 /* if this flag is set as well, give up execution */
206 //@cindex interrupted
209 /* Next thread ID to allocate.
210 * Locks required: sched_mutex
212 //@cindex next_thread_id
213 StgThreadID next_thread_id = 1;
216 * Pointers to the state of the current thread.
217 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
218 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
221 /* The smallest stack size that makes any sense is:
222 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
223 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
224 * + 1 (the realworld token for an IO thread)
225 * + 1 (the closure to enter)
227 * A thread with this stack will bomb immediately with a stack
228 * overflow, which will increase its stack size.
231 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
238 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
239 * exists - earlier gccs apparently didn't.
246 void addToBlockedQueue ( StgTSO *tso );
248 static void schedule ( void );
249 void interruptStgRts ( void );
251 static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri );
253 static StgTSO * createThread_ ( nat size, rtsBool have_lock );
256 static void detectBlackHoles ( void );
259 static void sched_belch(char *s, ...);
262 #if defined(RTS_SUPPORTS_THREADS)
263 /* ToDo: carefully document the invariants that go together
264 * with these synchronisation objects.
266 Mutex sched_mutex = INIT_MUTEX_VAR;
267 Mutex term_mutex = INIT_MUTEX_VAR;
270 static Condition gc_pending_cond = INIT_COND_VAR;
274 #endif /* RTS_SUPPORTS_THREADS */
/* Parallel-logging state. TimeOfLastYield records when the last thread
 * yielded (used when dumping GR_DESCHEDULE events; see "HACK 675" below).
 * emitSchedule forces a GR_SCHEDULE event to be written for the next
 * thread scheduled, even if it is the same thread as before.
 * NOTE(review): likely only compiled under PAR/GRAN — confirm the
 * surrounding #if when viewing the full file.
 */
278 rtsTime TimeOfLastYield;
279 rtsBool emitSchedule = rtsTrue;
283 char *whatNext_strs[] = {
291 char *threadReturnCode_strs[] = {
292 "HeapOverflow", /* might also be StackOverflow */
/* Turn a pending spark into a runnable thread (parallel builds).
 * createSparkThread is used when topping up the run queue from the spark
 * pool; activateSpark returns the new TSO (END_TSO_QUEUE on failure —
 * see the spark-activation code in schedule()).
 */
301 StgTSO * createSparkThread(rtsSpark spark);
302 StgTSO * activateSpark (rtsSpark spark);
306 * The thread state for the main thread.
307 // ToDo: check whether this is still needed
311 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
312 static void taskStart(void);
323 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
324 //@subsection Main scheduling loop
326 /* ---------------------------------------------------------------------------
327 Main scheduling loop.
329 We use round-robin scheduling, each thread returning to the
330 scheduler loop when one of these conditions is detected:
333 * timer expires (thread yields)
338 Locking notes: we acquire the scheduler lock once at the beginning
339 of the scheduler loop, and release it when
341 * running a thread, or
342 * waiting for work, or
343 * waiting for a GC to complete.
346 In a GranSim setup this loop iterates over the global event queue.
347 This revolves around the global event queue, which determines what
348 to do next. Therefore, it's more complicated than either the
349 concurrent or the parallel (GUM) setup.
352 GUM iterates over incoming messages.
353 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
354 and sends out a fish whenever it has nothing to do; in-between
355 doing the actual reductions (shared code below) it processes the
356 incoming messages and deals with delayed operations
357 (see PendingFetches).
358 This is not the ugliest code you could imagine, but it's bloody close.
360 ------------------------------------------------------------------------ */
367 StgThreadReturnCode ret;
375 rtsBool receivedFinish = rtsFalse;
377 nat tp_size, sp_size; // stats only
380 rtsBool was_interrupted = rtsFalse;
382 ACQUIRE_LOCK(&sched_mutex);
384 #if defined(RTS_SUPPORTS_THREADS)
385 /* Check to see whether there are any worker threads
386 waiting to deposit external call results. If so,
387 yield our capability */
388 yieldToReturningWorker(&sched_mutex, cap);
390 waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
394 /* set up first event to get things going */
395 /* ToDo: assign costs for system setup and init MainTSO ! */
396 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
398 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
401 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
402 G_TSO(CurrentTSO, 5));
404 if (RtsFlags.GranFlags.Light) {
405 /* Save current time; GranSim Light only */
406 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
409 event = get_next_event();
411 while (event!=(rtsEvent*)NULL) {
412 /* Choose the processor with the next event */
413 CurrentProc = event->proc;
414 CurrentTSO = event->tso;
418 while (!receivedFinish) { /* set by processMessages */
419 /* when receiving PP_FINISH message */
426 IF_DEBUG(scheduler, printAllThreads());
428 /* If we're interrupted (the user pressed ^C, or some other
429 * termination condition occurred), kill all the currently running
433 IF_DEBUG(scheduler, sched_belch("interrupted"));
435 interrupted = rtsFalse;
436 was_interrupted = rtsTrue;
439 /* Go through the list of main threads and wake up any
440 * clients whose computations have finished. ToDo: this
441 * should be done more efficiently without a linear scan
442 * of the main threads list, somehow...
444 #if defined(RTS_SUPPORTS_THREADS)
446 StgMainThread *m, **prev;
447 prev = &main_threads;
448 for (m = main_threads; m != NULL; m = m->link) {
449 switch (m->tso->what_next) {
452 *(m->ret) = (StgClosure *)m->tso->sp[0];
456 broadcastCondition(&m->wakeup);
459 if (m->ret) *(m->ret) = NULL;
461 if (was_interrupted) {
462 m->stat = Interrupted;
466 broadcastCondition(&m->wakeup);
474 #else /* not threaded */
477 /* in GUM do this only on the Main PE */
480 /* If our main thread has finished or been killed, return.
483 StgMainThread *m = main_threads;
484 if (m->tso->what_next == ThreadComplete
485 || m->tso->what_next == ThreadKilled) {
486 main_threads = main_threads->link;
487 if (m->tso->what_next == ThreadComplete) {
488 /* we finished successfully, fill in the return value */
489 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
493 if (m->ret) { *(m->ret) = NULL; };
494 if (was_interrupted) {
495 m->stat = Interrupted;
505 /* Top up the run queue from our spark pool. We try to make the
506 * number of threads in the run queue equal to the number of
509 * Disable spark support in SMP for now, non-essential & requires
510 * a little bit of work to make it compile cleanly. -- sof 1/02.
512 #if 0 /* defined(SMP) */
514 nat n = getFreeCapabilities();
515 StgTSO *tso = run_queue_hd;
517 /* Count the run queue */
518 while (n > 0 && tso != END_TSO_QUEUE) {
525 spark = findSpark(rtsFalse);
527 break; /* no more sparks in the pool */
529 /* I'd prefer this to be done in activateSpark -- HWL */
530 /* tricky - it needs to hold the scheduler lock and
531 * not try to re-acquire it -- SDM */
532 createSparkThread(spark);
534 sched_belch("==^^ turning spark of closure %p into a thread",
535 (StgClosure *)spark));
538 /* We need to wake up the other tasks if we just created some
541 if (getFreeCapabilities() - n > 1) {
542 signalCondition( &thread_ready_cond );
547 /* check for signals each time around the scheduler */
548 #ifndef mingw32_TARGET_OS
549 if (signals_pending()) {
550 startSignalHandlers();
554 /* Check whether any waiting threads need to be woken up. If the
555 * run queue is empty, and there are no other tasks running, we
556 * can wait indefinitely for something to happen.
557 * ToDo: what if another client comes along & requests another
560 if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
561 awaitEvent( EMPTY_RUN_QUEUE()
563 && allFreeCapabilities()
567 /* we can be interrupted while waiting for I/O... */
568 if (interrupted) continue;
571 * Detect deadlock: when we have no threads to run, there are no
572 * threads waiting on I/O or sleeping, and all the other tasks are
573 * waiting for work, we must have a deadlock of some description.
575 * We first try to find threads blocked on themselves (ie. black
576 * holes), and generate NonTermination exceptions where necessary.
578 * If no threads are black holed, we have a deadlock situation, so
579 * inform all the main threads.
582 if ( EMPTY_RUN_QUEUE()
583 && EMPTY_QUEUE(blocked_queue_hd)
584 && EMPTY_QUEUE(sleeping_queue)
585 #if defined(RTS_SUPPORTS_THREADS)
586 && EMPTY_QUEUE(suspended_ccalling_threads)
589 && allFreeCapabilities()
593 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
594 #if defined(THREADED_RTS)
595 /* and SMP mode ..? */
596 releaseCapability(cap);
598 RELEASE_LOCK(&sched_mutex);
599 GarbageCollect(GetRoots,rtsTrue);
600 ACQUIRE_LOCK(&sched_mutex);
601 if ( EMPTY_QUEUE(blocked_queue_hd)
603 && EMPTY_QUEUE(sleeping_queue) ) {
605 IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
608 /* No black holes, so probably a real deadlock. Send the
609 * current main thread the Deadlock exception (or in the SMP
610 * build, send *all* main threads the deadlock exception,
611 * since none of them can make progress).
613 if ( EMPTY_RUN_QUEUE() ) {
615 #if defined(RTS_SUPPORTS_THREADS)
616 for (m = main_threads; m != NULL; m = m->link) {
617 switch (m->tso->why_blocked) {
618 case BlockedOnBlackHole:
619 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
621 case BlockedOnException:
623 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
626 barf("deadlock: main thread blocked in a strange way");
631 switch (m->tso->why_blocked) {
632 case BlockedOnBlackHole:
633 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
635 case BlockedOnException:
637 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
640 barf("deadlock: main thread blocked in a strange way");
644 #if defined(RTS_SUPPORTS_THREADS)
645 /* ToDo: revisit conditions (and mechanism) for shutting
646 down a multi-threaded world */
647 if ( EMPTY_RUN_QUEUE() ) {
648 IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
649 shutdownHaskellAndExit(0);
652 ASSERT( !EMPTY_RUN_QUEUE() );
656 /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
660 /* If there's a GC pending, don't do anything until it has
664 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
665 waitCondition( &gc_pending_cond, &sched_mutex );
669 #if defined(RTS_SUPPORTS_THREADS)
670 /* block until we've got a thread on the run queue and a free
674 if ( EMPTY_RUN_QUEUE() ) {
675 /* Give up our capability */
676 releaseCapability(cap);
677 IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
678 waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
679 IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
681 while ( EMPTY_RUN_QUEUE() ) {
682 waitForWorkCapability(&sched_mutex, &cap);
683 IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
690 if (RtsFlags.GranFlags.Light)
691 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
693 /* adjust time based on time-stamp */
694 if (event->time > CurrentTime[CurrentProc] &&
695 event->evttype != ContinueThread)
696 CurrentTime[CurrentProc] = event->time;
698 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
699 if (!RtsFlags.GranFlags.Light)
702 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
704 /* main event dispatcher in GranSim */
705 switch (event->evttype) {
706 /* Should just be continuing execution */
708 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
709 /* ToDo: check assertion
710 ASSERT(run_queue_hd != (StgTSO*)NULL &&
711 run_queue_hd != END_TSO_QUEUE);
713 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
714 if (!RtsFlags.GranFlags.DoAsyncFetch &&
715 procStatus[CurrentProc]==Fetching) {
716 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
717 CurrentTSO->id, CurrentTSO, CurrentProc);
720 /* Ignore ContinueThreads for completed threads */
721 if (CurrentTSO->what_next == ThreadComplete) {
722 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
723 CurrentTSO->id, CurrentTSO, CurrentProc);
726 /* Ignore ContinueThreads for threads that are being migrated */
727 if (PROCS(CurrentTSO)==Nowhere) {
728 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
729 CurrentTSO->id, CurrentTSO, CurrentProc);
732 /* The thread should be at the beginning of the run queue */
733 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
734 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
735 CurrentTSO->id, CurrentTSO, CurrentProc);
736 break; // run the thread anyway
739 new_event(proc, proc, CurrentTime[proc],
741 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
743 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
744 break; // now actually run the thread; DaH Qu'vam yImuHbej
747 do_the_fetchnode(event);
748 goto next_thread; /* handle next event in event queue */
751 do_the_globalblock(event);
752 goto next_thread; /* handle next event in event queue */
755 do_the_fetchreply(event);
756 goto next_thread; /* handle next event in event queue */
758 case UnblockThread: /* Move from the blocked queue to the tail of */
759 do_the_unblock(event);
760 goto next_thread; /* handle next event in event queue */
762 case ResumeThread: /* Move from the blocked queue to the tail of */
763 /* the runnable queue ( i.e. Qu' SImqa'lu') */
764 event->tso->gran.blocktime +=
765 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
766 do_the_startthread(event);
767 goto next_thread; /* handle next event in event queue */
770 do_the_startthread(event);
771 goto next_thread; /* handle next event in event queue */
774 do_the_movethread(event);
775 goto next_thread; /* handle next event in event queue */
778 do_the_movespark(event);
779 goto next_thread; /* handle next event in event queue */
782 do_the_findwork(event);
783 goto next_thread; /* handle next event in event queue */
786 barf("Illegal event type %u\n", event->evttype);
789 /* This point was scheduler_loop in the old RTS */
791 IF_DEBUG(gran, belch("GRAN: after main switch"));
793 TimeOfLastEvent = CurrentTime[CurrentProc];
794 TimeOfNextEvent = get_time_of_next_event();
795 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
796 // CurrentTSO = ThreadQueueHd;
798 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
801 if (RtsFlags.GranFlags.Light)
802 GranSimLight_leave_system(event, &ActiveTSO);
804 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
807 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
809 /* in a GranSim setup the TSO stays on the run queue */
811 /* Take a thread from the run queue. */
812 t = POP_RUN_QUEUE(); // take_off_run_queue(t);
815 fprintf(stderr, "GRAN: About to run current thread, which is\n");
818 context_switch = 0; // turned on via GranYield, checking events and time slice
821 DumpGranEvent(GR_SCHEDULE, t));
823 procStatus[CurrentProc] = Busy;
826 if (PendingFetches != END_BF_QUEUE) {
830 /* ToDo: perhaps merge with spark activation above */
831 /* check whether we have local work and send requests if we have none */
832 if (EMPTY_RUN_QUEUE()) { /* no runnable threads */
833 /* :-[ no local threads => look out for local sparks */
834 /* the spark pool for the current PE */
835 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
836 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
837 pool->hd < pool->tl) {
839 * ToDo: add GC code check that we really have enough heap afterwards!!
841 * If we're here (no runnable threads) and we have pending
842 * sparks, we must have a space problem. Get enough space
843 * to turn one of those pending sparks into a
847 spark = findSpark(rtsFalse); /* get a spark */
848 if (spark != (rtsSpark) NULL) {
849 tso = activateSpark(spark); /* turn the spark into a thread */
850 IF_PAR_DEBUG(schedule,
851 belch("==== schedule: Created TSO %d (%p); %d threads active",
852 tso->id, tso, advisory_thread_count));
854 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
855 belch("==^^ failed to activate spark");
857 } /* otherwise fall through & pick-up new tso */
859 IF_PAR_DEBUG(verbose,
860 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
861 spark_queue_len(pool)));
866 /* If we still have no work we need to send a FISH to get a spark
869 if (EMPTY_RUN_QUEUE()) {
870 /* =8-[ no local sparks => look for work on other PEs */
872 * We really have absolutely no work. Send out a fish
873 * (there may be some out there already), and wait for
874 * something to arrive. We clearly can't run any threads
875 * until a SCHEDULE or RESUME arrives, and so that's what
876 * we're hoping to see. (Of course, we still have to
877 * respond to other types of messages.)
879 TIME now = msTime() /*CURRENT_TIME*/;
880 IF_PAR_DEBUG(verbose,
881 belch("-- now=%ld", now));
882 IF_PAR_DEBUG(verbose,
883 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
884 (last_fish_arrived_at!=0 &&
885 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
886 belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
887 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
888 last_fish_arrived_at,
889 RtsFlags.ParFlags.fishDelay, now);
892 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
893 (last_fish_arrived_at==0 ||
894 (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
895 /* outstandingFishes is set in sendFish, processFish;
896 avoid flooding system with fishes via delay */
898 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
901 // Global statistics: count no. of fishes
902 if (RtsFlags.ParFlags.ParStats.Global &&
903 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
904 globalParStats.tot_fish_mess++;
908 receivedFinish = processMessages();
911 } else if (PacketsWaiting()) { /* Look for incoming messages */
912 receivedFinish = processMessages();
915 /* Now we are sure that we have some work available */
916 ASSERT(run_queue_hd != END_TSO_QUEUE);
918 /* Take a thread from the run queue, if we have work */
919 t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
920 IF_DEBUG(sanity,checkTSO(t));
922 /* ToDo: write something to the log-file
923 if (RTSflags.ParFlags.granSimStats && !sameThread)
924 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
928 /* the spark pool for the current PE */
929 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
932 belch("--=^ %d threads, %d sparks on [%#x]",
933 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
936 if (0 && RtsFlags.ParFlags.ParStats.Full &&
937 t && LastTSO && t->id != LastTSO->id &&
938 LastTSO->why_blocked == NotBlocked &&
939 LastTSO->what_next != ThreadComplete) {
940 // if previously scheduled TSO not blocked we have to record the context switch
941 DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
942 GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
945 if (RtsFlags.ParFlags.ParStats.Full &&
946 (emitSchedule /* forced emit */ ||
947 (t && LastTSO && t->id != LastTSO->id))) {
949 we are running a different TSO, so write a schedule event to log file
950 NB: If we use fair scheduling we also have to write a deschedule
951 event for LastTSO; with unfair scheduling we know that the
952 previous tso has blocked whenever we switch to another tso, so
953 we don't need it in GUM for now
955 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
956 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
957 emitSchedule = rtsFalse;
961 #else /* !GRAN && !PAR */
963 /* grab a thread from the run queue */
964 ASSERT(run_queue_hd != END_TSO_QUEUE);
966 // Sanity check the thread we're about to run. This can be
967 // expensive if there is lots of thread switching going on...
968 IF_DEBUG(sanity,checkTSO(t));
971 grabCapability(&cap);
972 cap->r.rCurrentTSO = t;
974 /* context switches are now initiated by the timer signal, unless
975 * the user specified "context switch as often as possible", with
980 RtsFlags.ProfFlags.profileInterval == 0 ||
982 (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
983 && (run_queue_hd != END_TSO_QUEUE
984 || blocked_queue_hd != END_TSO_QUEUE
985 || sleeping_queue != END_TSO_QUEUE)))
990 RELEASE_LOCK(&sched_mutex);
992 IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...",
993 t->id, t, whatNext_strs[t->what_next]));
996 startHeapProfTimer();
999 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1000 /* Run the current thread
1002 switch (cap->r.rCurrentTSO->what_next) {
1004 case ThreadComplete:
1005 /* Thread already finished, return to scheduler. */
1006 ret = ThreadFinished;
1008 case ThreadEnterGHC:
1009 ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1012 ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1014 case ThreadEnterInterp:
1015 ret = interpretBCO(cap);
1018 barf("schedule: invalid what_next field");
1020 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1022 /* Costs for the scheduler are assigned to CCS_SYSTEM */
1024 stopHeapProfTimer();
1028 ACQUIRE_LOCK(&sched_mutex);
1031 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1032 #elif !defined(GRAN) && !defined(PAR)
1033 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1035 t = cap->r.rCurrentTSO;
1038 /* HACK 675: if the last thread didn't yield, make sure to print a
1039 SCHEDULE event to the log file when StgRunning the next thread, even
1040 if it is the same one as before */
1042 TimeOfLastYield = CURRENT_TIME;
1048 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1049 globalGranStats.tot_heapover++;
1051 globalParStats.tot_heapover++;
1054 // did the task ask for a large block?
1055 if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1056 // if so, get one and push it on the front of the nursery.
1060 blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1062 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)",
1064 whatNext_strs[t->what_next], blocks));
1066 // don't do this if it would push us over the
1067 // alloc_blocks_lim limit; we'll GC first.
1068 if (alloc_blocks + blocks < alloc_blocks_lim) {
1070 alloc_blocks += blocks;
1071 bd = allocGroup( blocks );
1073 // link the new group into the list
1074 bd->link = cap->r.rCurrentNursery;
1075 bd->u.back = cap->r.rCurrentNursery->u.back;
1076 if (cap->r.rCurrentNursery->u.back != NULL) {
1077 cap->r.rCurrentNursery->u.back->link = bd;
1079 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1080 g0s0->blocks == cap->r.rNursery);
1081 cap->r.rNursery = g0s0->blocks = bd;
1083 cap->r.rCurrentNursery->u.back = bd;
1085 // initialise it as a nursery block
1089 bd->free = bd->start;
1091 // don't forget to update the block count in g0s0.
1092 g0s0->n_blocks += blocks;
1093 ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1095 // now update the nursery to point to the new block
1096 cap->r.rCurrentNursery = bd;
1098 // we might be unlucky and have another thread get on the
1099 // run queue before us and steal the large block, but in that
1100 // case the thread will just end up requesting another large
1102 PUSH_ON_RUN_QUEUE(t);
1107 /* make all the running tasks block on a condition variable,
1108 * maybe set context_switch and wait till they all pile in,
1109 * then have them wait on a GC condition variable.
1111 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow",
1112 t->id, t, whatNext_strs[t->what_next]));
1115 ASSERT(!is_on_queue(t,CurrentProc));
1117 /* Currently we emit a DESCHEDULE event before GC in GUM.
1118 ToDo: either add separate event to distinguish SYSTEM time from rest
1119 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1120 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1121 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1122 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1123 emitSchedule = rtsTrue;
1127 ready_to_gc = rtsTrue;
1128 context_switch = 1; /* stop other threads ASAP */
1129 PUSH_ON_RUN_QUEUE(t);
1130 /* actual GC is done at the end of the while loop */
1136 DumpGranEvent(GR_DESCHEDULE, t));
1137 globalGranStats.tot_stackover++;
1140 // DumpGranEvent(GR_DESCHEDULE, t);
1141 globalParStats.tot_stackover++;
1143 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow",
1144 t->id, t, whatNext_strs[t->what_next]));
1145 /* just adjust the stack for this thread, then pop it back
1151 /* enlarge the stack */
1152 StgTSO *new_t = threadStackOverflow(t);
1154 /* This TSO has moved, so update any pointers to it from the
1155 * main thread stack. It better not be on any other queues...
1156 * (it shouldn't be).
1158 for (m = main_threads; m != NULL; m = m->link) {
1163 threadPaused(new_t);
1164 PUSH_ON_RUN_QUEUE(new_t);
1168 case ThreadYielding:
1171 DumpGranEvent(GR_DESCHEDULE, t));
1172 globalGranStats.tot_yields++;
1175 // DumpGranEvent(GR_DESCHEDULE, t);
1176 globalParStats.tot_yields++;
1178 /* put the thread back on the run queue. Then, if we're ready to
1179 * GC, check whether this is the last task to stop. If so, wake
1180 * up the GC thread. getThread will block during a GC until the
1184 if (t->what_next == ThreadEnterInterp) {
1185 /* ToDo: or maybe a timer expired when we were in Hugs?
1186 * or maybe someone hit ctrl-C
1188 belch("--<< thread %ld (%p; %s) stopped to switch to Hugs",
1189 t->id, t, whatNext_strs[t->what_next]);
1191 belch("--<< thread %ld (%p; %s) stopped, yielding",
1192 t->id, t, whatNext_strs[t->what_next]);
1199 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1201 ASSERT(t->link == END_TSO_QUEUE);
1203 ASSERT(!is_on_queue(t,CurrentProc));
1206 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1207 checkThreadQsSanity(rtsTrue));
1210 if (RtsFlags.ParFlags.doFairScheduling) {
1211 /* this does round-robin scheduling; good for concurrency */
1212 APPEND_TO_RUN_QUEUE(t);
1214 /* this does unfair scheduling; good for parallelism */
1215 PUSH_ON_RUN_QUEUE(t);
1218 /* this does round-robin scheduling; good for concurrency */
1219 APPEND_TO_RUN_QUEUE(t);
1222 /* add a ContinueThread event to actually process the thread */
1223 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1225 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1227 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1236 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1237 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1238 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1240 // ??? needed; should emit block before
1242 DumpGranEvent(GR_DESCHEDULE, t));
1243 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1246 ASSERT(procStatus[CurrentProc]==Busy ||
1247 ((procStatus[CurrentProc]==Fetching) &&
1248 (t->block_info.closure!=(StgClosure*)NULL)));
1249 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1250 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1251 procStatus[CurrentProc]==Fetching))
1252 procStatus[CurrentProc] = Idle;
1256 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1257 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1260 if (t->block_info.closure!=(StgClosure*)NULL)
1261 print_bq(t->block_info.closure));
1263 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1266 /* whatever we schedule next, we must log that schedule */
1267 emitSchedule = rtsTrue;
1270 /* don't need to do anything. Either the thread is blocked on
1271 * I/O, in which case we'll have called addToBlockedQueue
1272 * previously, or it's blocked on an MVar or Blackhole, in which
1273 * case it'll be on the relevant queue already.
1276 fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1277 printThreadBlockage(t);
1278 fprintf(stderr, "\n"));
1280 /* Only for dumping event to log file
1281 ToDo: do I need this in GranSim, too?
1288 case ThreadFinished:
1289 /* Need to check whether this was a main thread, and if so, signal
1290 * the task that started it with the return value. If we have no
1291 * more main threads, we probably need to stop all the tasks until
1294 /* We also end up here if the thread kills itself with an
1295 * uncaught exception, see Exception.hc.
1297 IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1299 endThread(t, CurrentProc); // clean-up the thread
1301 /* For now all are advisory -- HWL */
1302 //if(t->priority==AdvisoryPriority) ??
1303 advisory_thread_count--;
1306 if(t->dist.priority==RevalPriority)
1310 if (RtsFlags.ParFlags.ParStats.Full &&
1311 !RtsFlags.ParFlags.ParStats.Suppressed)
1312 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1317 barf("schedule: invalid thread return code %d", (int)ret);
1320 #if defined(RTS_SUPPORTS_THREADS)
1321 /* I don't understand what this re-grab is doing -- sof */
1322 grabCapability(&cap);
1326 if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1327 GarbageCollect(GetRoots, rtsTrue);
1329 performHeapProfile = rtsFalse;
1330 ready_to_gc = rtsFalse; // we already GC'd
1336 && allFreeCapabilities()
1339 /* everybody back, start the GC.
1340 * Could do it in this thread, or signal a condition var
1341 * to do it in another thread. Either way, we need to
1342 * broadcast on gc_pending_cond afterward.
1344 #if defined(RTS_SUPPORTS_THREADS)
1345 IF_DEBUG(scheduler,sched_belch("doing GC"));
1347 GarbageCollect(GetRoots,rtsFalse);
1348 ready_to_gc = rtsFalse;
1350 broadcastCondition(&gc_pending_cond);
1353 /* add a ContinueThread event to continue execution of current thread */
1354 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1356 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1358 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1366 IF_GRAN_DEBUG(unused,
1367 print_eventq(EventHd));
1369 event = get_next_event();
1372 /* ToDo: wait for next message to arrive rather than busy wait */
1375 } /* end of while(1) */
1377 IF_PAR_DEBUG(verbose,
1378 belch("== Leaving schedule() after having received Finish"));
1381 /* ---------------------------------------------------------------------------
1382 * deleteAllThreads(): kill all the live threads.
1384 * This is used when we catch a user interrupt (^C), before performing
1385 * any necessary cleanups and running finalizers.
1386 * ------------------------------------------------------------------------- */
/* deleteAllThreads(): kill every live thread on the run, blocked and
 * sleeping queues, then reset all three queues to empty.  Called when a
 * user interrupt (^C) is caught, before cleanups/finalizers run.
 * NOTE(review): the loop bodies (which must capture 'next = t->link'
 * before deleting t) are not visible in this extract -- confirm they do.
 */
1388 void deleteAllThreads ( void )
1391 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
/* walk each queue in turn, deleting every TSO on it */
1392 for (t = run_queue_hd; t != END_TSO_QUEUE; t = next) {
1396 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = next) {
1400 for (t = sleeping_queue; t != END_TSO_QUEUE; t = next) {
/* finally mark all three queues empty */
1404 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1405 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1406 sleeping_queue = END_TSO_QUEUE;
1409 /* startThread and insertThread are now in GranSim.c -- HWL */
1412 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1413 //@subsection Suspend and Resume
1415 /* ---------------------------------------------------------------------------
1416 * Suspending & resuming Haskell threads.
1418 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1419 * its capability before calling the C function. This allows another
1420 * task to pick up the capability and carry on running Haskell
1421 * threads. It also means that if the C call blocks, it won't lock
1424 * The Haskell thread making the C call is put to sleep for the
1425 * duration of the call, on the suspended_ccalling_threads queue. We
1426 * give out a token to the task, which it can use to resume the thread
1427 * on return from the C function.
1428 * ------------------------------------------------------------------------- */
/* suspendThread(reg): called on entry to a "safe" C call (_ccall_GC).
 * Parks the current Haskell thread on suspended_ccalling_threads,
 * releases the Capability so other tasks can run Haskell code, and
 * returns a token (the thread id) used by resumeThread() afterwards.
 */
1431 suspendThread( StgRegTable *reg )
1436 /* assume that *reg is a pointer to the StgRegTable part
/* NOTE(review): arithmetic on 'void *' is a GNU C extension; ISO C
 * would require casting to a char pointer first. */
1439 cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1441 ACQUIRE_LOCK(&sched_mutex);
1444 sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
/* push the current TSO onto the suspended-ccalling list */
1446 threadPaused(cap->r.rCurrentTSO);
1447 cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1448 suspended_ccalling_threads = cap->r.rCurrentTSO;
1450 #if defined(RTS_SUPPORTS_THREADS)
1451 cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1454 /* Use the thread ID as the token; it should be unique */
1455 tok = cap->r.rCurrentTSO->id;
1457 /* Hand back capability */
1458 releaseCapability(cap);
1460 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
1461 /* Preparing to leave the RTS, so ensure there's a native thread/task
1462 waiting to take over.
1464 ToDo: optimise this and only create a new task if there's a need
1465 for one (i.e., if there's only one Concurrent Haskell thread alive,
1466 there's no need to create a new task).
1468 IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1469 startTask(taskStart);
1472 /* Other threads _might_ be available for execution; signal this */
1474 RELEASE_LOCK(&sched_mutex);
/* resumeThread(tok): re-enter the RTS after a safe C call.  'tok' is
 * the thread id handed out by suspendThread(); the matching TSO is
 * unlinked from suspended_ccalling_threads, its blocking status is
 * cleared, and it becomes the Capability's current thread again.
 * barfs if no suspended thread carries the token.
 */
1479 resumeThread( StgInt tok )
1481 StgTSO *tso, **prev;
1484 #if defined(RTS_SUPPORTS_THREADS)
1485 /* Wait for permission to re-enter the RTS with the result. */
1486 grabReturnCapability(&sched_mutex, &cap);
1488 grabCapability(&cap);
1491 /* Remove the thread off of the suspended list */
/* 'prev' tracks the address of the link we may need to rewrite */
1492 prev = &suspended_ccalling_threads;
1493 for (tso = suspended_ccalling_threads;
1494 tso != END_TSO_QUEUE;
1495 prev = &tso->link, tso = tso->link) {
1496 if (tso->id == (StgThreadID)tok) {
1501 if (tso == END_TSO_QUEUE) {
1502 barf("resumeThread: thread not found");
1504 tso->link = END_TSO_QUEUE;
1505 /* Reset blocking status */
1506 tso->why_blocked = NotBlocked;
1508 RELEASE_LOCK(&sched_mutex);
/* make the resumed thread the capability's current TSO */
1510 cap->r.rCurrentTSO = tso;
1515 /* ---------------------------------------------------------------------------
1517 * ------------------------------------------------------------------------ */
1518 static void unblockThread(StgTSO *tso);
1520 /* ---------------------------------------------------------------------------
1521 * Comparing Thread ids.
1523 * This is used from STG land in the implementation of the
1524 * instances of Eq/Ord for ThreadIds.
1525 * ------------------------------------------------------------------------ */
1527 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
1529 StgThreadID id1 = tso1->id;
1530 StgThreadID id2 = tso2->id;
1532 if (id1 < id2) return (-1);
1533 if (id1 > id2) return 1;
1537 /* ---------------------------------------------------------------------------
1538 * Fetching the ThreadID from an StgTSO.
1540 * This is used in the implementation of Show for ThreadIds.
1541 * ------------------------------------------------------------------------ */
/* rts_getThreadId: fetch the ThreadID from an StgTSO; used by the Show
 * instance for ThreadIds.  (Body -- presumably 'return tso->id;' -- is
 * outside this extract.)
 */
1542 int rts_getThreadId(const StgTSO *tso)
1547 /* ---------------------------------------------------------------------------
1548 Create a new thread.
1550 The new thread starts with the given stack size. Before the
1551 scheduler can run, however, this thread needs to have a closure
1552 (and possibly some arguments) pushed on its stack. See
1553 pushClosure() in Schedule.h.
1555 createGenThread() and createIOThread() (in SchedAPI.h) are
1556 convenient packaged versions of this function.
1558 currently pri (priority) is only used in a GRAN setup -- HWL
1559 ------------------------------------------------------------------------ */
1560 //@cindex createThread
1562 /* currently pri (priority) is only used in a GRAN setup -- HWL */
/* createThread / createThread_: allocate and initialise a new TSO with
 * the requested stack size (in words, including the TSO header).  The
 * thread is NOT made runnable here; the caller pushes a closure and
 * then calls scheduleThread().  'pri' is used only in GRAN builds;
 * 'have_lock' says whether the caller already holds sched_mutex.
 * Returns the new TSO, or END_TSO_QUEUE if the PAR thread limit is hit.
 */
1564 createThread(nat stack_size, StgInt pri)
1566 return createThread_(stack_size, rtsFalse, pri);
1570 createThread_(nat size, rtsBool have_lock, StgInt pri)
1574 createThread(nat stack_size)
1576 return createThread_(stack_size, rtsFalse);
1580 createThread_(nat size, rtsBool have_lock)
1587 /* First check whether we should create a thread at all */
1589 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1590 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1592 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1593 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1594 return END_TSO_QUEUE;
1600 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1603 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1605 /* catch ridiculously small stack sizes */
1606 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1607 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
/* 'size' counts the whole TSO; the usable stack is what's left over */
1610 stack_size = size - TSO_STRUCT_SIZEW;
1612 tso = (StgTSO *)allocate(size);
1613 TICK_ALLOC_TSO(stack_size, 0);
1615 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1617 SET_GRAN_HDR(tso, ThisPE);
1619 tso->what_next = ThreadEnterGHC;
1621 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1622 * protect the increment operation on next_thread_id.
1623 * In future, we could use an atomic increment instead.
1625 if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1626 tso->id = next_thread_id++;
1627 if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1629 tso->why_blocked = NotBlocked;
1630 tso->blocked_exceptions = NULL;
1632 tso->stack_size = stack_size;
1633 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
/* stack grows downwards: sp starts one past the top of the stack */
1635 tso->sp = (P_)&(tso->stack) + stack_size;
1638 tso->prof.CCCS = CCS_MAIN;
1641 /* put a stop frame on the stack */
1642 tso->sp -= sizeofW(StgStopFrame);
1643 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1644 tso->su = (StgUpdateFrame*)tso->sp;
1648 tso->link = END_TSO_QUEUE;
1649 /* uses more flexible routine in GranSim */
1650 insertThread(tso, CurrentProc);
1652 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1658 if (RtsFlags.GranFlags.GranSimStats.Full)
1659 DumpGranEvent(GR_START,tso);
1661 if (RtsFlags.ParFlags.ParStats.Full)
1662 DumpGranEvent(GR_STARTQ,tso);
1663 /* HACk to avoid SCHEDULE
1667 /* Link the new thread on the global thread list.
1669 tso->global_link = all_threads;
1673 tso->dist.priority = MandatoryPriority; //by default that is...
1677 tso->gran.pri = pri;
1679 tso->gran.magic = TSO_MAGIC; // debugging only
/* zero all GranSim per-thread statistics */
1681 tso->gran.sparkname = 0;
1682 tso->gran.startedat = CURRENT_TIME;
1683 tso->gran.exported = 0;
1684 tso->gran.basicblocks = 0;
1685 tso->gran.allocs = 0;
1686 tso->gran.exectime = 0;
1687 tso->gran.fetchtime = 0;
1688 tso->gran.fetchcount = 0;
1689 tso->gran.blocktime = 0;
1690 tso->gran.blockcount = 0;
1691 tso->gran.blockedat = 0;
1692 tso->gran.globalsparks = 0;
1693 tso->gran.localsparks = 0;
1694 if (RtsFlags.GranFlags.Light)
1695 tso->gran.clock = Now; /* local clock */
1697 tso->gran.clock = 0;
1699 IF_DEBUG(gran,printTSO(tso));
1702 tso->par.magic = TSO_MAGIC; // debugging only
/* zero all GUM per-thread statistics */
1704 tso->par.sparkname = 0;
1705 tso->par.startedat = CURRENT_TIME;
1706 tso->par.exported = 0;
1707 tso->par.basicblocks = 0;
1708 tso->par.allocs = 0;
1709 tso->par.exectime = 0;
1710 tso->par.fetchtime = 0;
1711 tso->par.fetchcount = 0;
1712 tso->par.blocktime = 0;
1713 tso->par.blockcount = 0;
1714 tso->par.blockedat = 0;
1715 tso->par.globalsparks = 0;
1716 tso->par.localsparks = 0;
1720 globalGranStats.tot_threads_created++;
1721 globalGranStats.threads_created_on_PE[CurrentProc]++;
1722 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1723 globalGranStats.tot_sq_probes++;
1725 // collect parallel global statistics (currently done together with GC stats)
1726 if (RtsFlags.ParFlags.ParStats.Global &&
1727 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1728 //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime());
1729 globalParStats.tot_threads_created++;
/* NOTE(review): format string has two conversions (%d, %p) but three
 * arguments (CurrentProc, tso, tso->id) -- the message prints the PE
 * number where the TSO id is expected; verify against -Wformat. */
1735 belch("==__ schedule: Created TSO %d (%p);",
1736 CurrentProc, tso, tso->id));
1738 IF_PAR_DEBUG(verbose,
1739 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1740 tso->id, tso, advisory_thread_count));
1742 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1743 tso->id, tso->stack_size));
1750 all parallel thread creation calls should fall through the following routine.
/* createSparkThread(spark): turn a spark into a real advisory-priority
 * thread -- all parallel thread creation funnels through here.  Creates
 * a TSO (caller already holds sched_mutex: have_lock=rtsTrue), pushes
 * the spark closure onto its stack and makes it runnable.
 */
1753 createSparkThread(rtsSpark spark)
1755 ASSERT(spark != (rtsSpark)NULL);
1756 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
/* NOTE(review): barf() aborts the RTS, so the 'return END_TSO_QUEUE'
 * below it is unreachable; belch() was probably intended here. */
1758 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1759 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1760 return END_TSO_QUEUE;
1764 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1765 if (tso==END_TSO_QUEUE)
1766 barf("createSparkThread: Cannot create TSO");
1768 tso->priority = AdvisoryPriority;
/* hand the spark closure to the new thread and schedule it */
1770 pushClosure(tso,spark);
1771 PUSH_ON_RUN_QUEUE(tso);
1772 advisory_thread_count++;
1779 Turn a spark into a thread.
1780 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1783 //@cindex activateSpark
/* activateSpark(spark): wrapper around createSparkThread that also
 * emits logging when full parallel stats are on.
 * ToDo (original): fix for SMP -- needs to acquire sched_mutex.
 */
1785 activateSpark (rtsSpark spark)
1789 tso = createSparkThread(spark);
1790 if (RtsFlags.ParFlags.ParStats.Full) {
1791 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1792 IF_PAR_DEBUG(verbose,
1793 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1794 (StgClosure *)spark, info_type((StgClosure *)spark)));
/* propagation of spark provenance to the thread is still unimplemented */
1796 // ToDo: fwd info on local/global spark to thread -- HWL
1797 // tso->gran.exported = spark->exported;
1798 // tso->gran.locked = !spark->global;
1799 // tso->gran.sparkname = spark->name;
1805 /* ---------------------------------------------------------------------------
1808 * scheduleThread puts a thread on the head of the runnable queue.
1809 * This will usually be done immediately after a thread is created.
1810 * The caller of scheduleThread must create the thread using e.g.
1811 * createThread and push an appropriate closure
1812 * on this thread's stack before the scheduler is invoked.
1813 * ------------------------------------------------------------------------ */
/* scheduleThread_(tso, createTask): make 'tso' runnable by pushing it
 * on the head of the run queue, under sched_mutex.  'createTask' is
 * only meaningful in THREADED_RTS builds, where it requests a fresh
 * worker task to run the thread.
 */
1815 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
1818 scheduleThread_(StgTSO *tso
1819 , rtsBool createTask
/* in non-threaded builds the parameter is unused (STG_UNUSED) */
1820 #if !defined(THREADED_RTS)
1825 ACQUIRE_LOCK(&sched_mutex);
1827 /* Put the new thread on the head of the runnable queue. The caller
1828 * better push an appropriate closure on this thread's stack
1829 * beforehand. In the SMP case, the thread may start running as
1830 * soon as we release the scheduler lock below.
1832 PUSH_ON_RUN_QUEUE(tso);
1833 #if defined(THREADED_RTS)
1834 /* If main() is scheduling a thread, don't bother creating a
1838 startTask(taskStart);
1844 IF_DEBUG(scheduler,printTSO(tso));
1846 RELEASE_LOCK(&sched_mutex);
1849 void scheduleThread(StgTSO* tso)
1851 return scheduleThread_(tso, rtsFalse);
1854 void scheduleExtThread(StgTSO* tso)
1856 return scheduleThread_(tso, rtsTrue);
1859 /* ---------------------------------------------------------------------------
1862 * Initialise the scheduler. This resets all the queues - if the
1863 * queues contained any threads, they'll be garbage collected at the
1866 * ------------------------------------------------------------------------ */
/* term_handler: SIGTERM handler installed by initScheduler; sets the
 * termination flag under term_mutex (flag update not visible here).
 * NOTE(review): taking a pthread mutex inside a signal handler is not
 * async-signal-safe (POSIX) -- can deadlock if the signal interrupts a
 * holder of term_mutex; worth confirming/reworking.
 */
1870 term_handler(int sig STG_UNUSED)
1873 ACQUIRE_LOCK(&term_mutex);
1875 RELEASE_LOCK(&term_mutex);
/* Body of initScheduler() (its header lies outside this extract):
 * reset every scheduler queue, set up mutexes/condvars, install the
 * SIGTERM handler, create the initial Capability and start the task
 * manager.  Must run before any thread is created.
 */
/* GRAN: per-PE queues.  NOTE(review): verify the loop bound -- if the
 * queue arrays have MAX_PROC elements, 'i<=MAX_PROC' writes one past
 * the end.  Also sleeping_queue is (redundantly) re-assigned on every
 * iteration. */
1886 for (i=0; i<=MAX_PROC; i++) {
1887 run_queue_hds[i] = END_TSO_QUEUE;
1888 run_queue_tls[i] = END_TSO_QUEUE;
1889 blocked_queue_hds[i] = END_TSO_QUEUE;
1890 blocked_queue_tls[i] = END_TSO_QUEUE;
1891 ccalling_threadss[i] = END_TSO_QUEUE;
1892 sleeping_queue = END_TSO_QUEUE;
/* non-GRAN: single global queues */
1895 run_queue_hd = END_TSO_QUEUE;
1896 run_queue_tl = END_TSO_QUEUE;
1897 blocked_queue_hd = END_TSO_QUEUE;
1898 blocked_queue_tl = END_TSO_QUEUE;
1899 sleeping_queue = END_TSO_QUEUE;
1902 suspended_ccalling_threads = END_TSO_QUEUE;
1904 main_threads = NULL;
1905 all_threads = END_TSO_QUEUE;
/* derive the context-switch interval in ticks from the ms setting */
1910 RtsFlags.ConcFlags.ctxtSwitchTicks =
1911 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1913 #if defined(RTS_SUPPORTS_THREADS)
1914 /* Initialise the mutex and condition variables used by
1916 initMutex(&sched_mutex);
1917 initMutex(&term_mutex);
1919 initCondition(&thread_ready_cond);
1923 initCondition(&gc_pending_cond);
1926 #if defined(RTS_SUPPORTS_THREADS)
1927 ACQUIRE_LOCK(&sched_mutex);
1930 /* Install the SIGTERM handler (NB: the code below handles SIGTERM, not SIGHUP as the old comment said) */
1933 struct sigaction action,oact;
1935 action.sa_handler = term_handler;
1936 sigemptyset(&action.sa_mask);
1937 action.sa_flags = 0;
1938 if (sigaction(SIGTERM, &action, &oact) != 0) {
1939 barf("can't install TERM handler");
1944 /* A capability holds the state a native thread needs in
1945 * order to execute STG code. At least one capability is
1946 * floating around (only SMP builds have more than one).
1950 #if defined(RTS_SUPPORTS_THREADS)
1951 /* start our haskell execution tasks */
1953 startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
1955 startTaskManager(0,taskStart);
1959 #if /* defined(SMP) ||*/ defined(PAR)
1963 #if defined(RTS_SUPPORTS_THREADS)
1964 RELEASE_LOCK(&sched_mutex);
/* exitScheduler(): shut the scheduler down; in threaded builds this
 * presumably stops the task manager (body outside this extract).
 */
1970 exitScheduler( void )
1972 #if defined(RTS_SUPPORTS_THREADS)
1977 /* -----------------------------------------------------------------------------
1978 Managing the per-task allocation areas.
1980 Each capability comes with an allocation area. These are
1981 fixed-length block lists into which allocation can be done.
1983 ToDo: no support for two-space collection at the moment???
1984 -------------------------------------------------------------------------- */
1986 /* -----------------------------------------------------------------------------
1987 * waitThread is the external interface for running a new computation
1988 * and waiting for the result.
1990 * In the non-SMP case, we create a new main thread, push it on the
1991 * main-thread stack, and invoke the scheduler to run it. The
1992 * scheduler will return when the top main thread on the stack has
1993 * completed or died, and fill in the necessary fields of the
1994 * main_thread structure.
1996 * In the SMP case, we create a main thread as before, but we then
1997 * create a new condition variable and sleep on it. When our new
1998 * main thread has completed, we'll be woken up and the status/result
1999 * will be in the main_thread struct.
2000 * -------------------------------------------------------------------------- */
/* howManyThreadsAvail(): count the threads on the run, blocked and
 * sleeping queues (the 'i++' loop bodies are outside this extract).
 * Used to decide whether more work is available.
 */
2003 howManyThreadsAvail ( void )
2007 for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2009 for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2011 for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2017 finishAllThreads ( void )
2020 while (run_queue_hd != END_TSO_QUEUE) {
2021 waitThread ( run_queue_hd, NULL);
2023 while (blocked_queue_hd != END_TSO_QUEUE) {
2024 waitThread ( blocked_queue_hd, NULL);
2026 while (sleeping_queue != END_TSO_QUEUE) {
2027 waitThread ( blocked_queue_hd, NULL);
2030 (blocked_queue_hd != END_TSO_QUEUE ||
2031 run_queue_hd != END_TSO_QUEUE ||
2032 sleeping_queue != END_TSO_QUEUE);
/* waitThread(tso, ret): run 'tso' as a main thread and block until it
 * finishes; the result closure is written through 'ret'.  Thin wrapper
 * over waitThread_; THREADED_RTS adds a blockWaiting flag (false here:
 * the calling OS thread enters the RTS directly).
 */
2036 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2038 #if defined(THREADED_RTS)
2039 return waitThread_(tso,ret, rtsFalse);
2041 return waitThread_(tso,ret);
/* waitThread_: allocate an StgMainThread record for 'tso', link it on
 * main_threads, then wait for completion.  Three strategies:
 *  - THREADED_RTS, !blockWaiting: this OS thread enters the scheduler
 *    itself (schedule() call sits in an elided line);
 *  - RTS_SUPPORTS_THREADS otherwise: sleep on m->wakeup until a worker
 *    fills in m->stat;
 *  - single-threaded: call the scheduler directly (GRAN sets up
 *    CurrentTSO/CurrentProc first).
 * On return, m->stat carries the SchedulerStatus for the caller.
 */
2046 waitThread_(StgTSO *tso,
2047 /*out*/StgClosure **ret
2048 #if defined(THREADED_RTS)
2049 , rtsBool blockWaiting
2054 SchedulerStatus stat;
2056 ACQUIRE_LOCK(&sched_mutex);
/* stgMallocBytes aborts on OOM, so 'm' is always valid here */
2058 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2063 #if defined(RTS_SUPPORTS_THREADS)
2064 initCondition(&m->wakeup);
2067 m->link = main_threads;
2070 IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2072 #if defined(RTS_SUPPORTS_THREADS)
2074 # if defined(THREADED_RTS)
2075 if (!blockWaiting) {
2076 /* In the threaded case, the OS thread that called main()
2077 * gets to enter the RTS directly without going via another
2080 RELEASE_LOCK(&sched_mutex);
2082 ASSERT(m->stat != NoStatus);
2086 IF_DEBUG(scheduler, sched_belch("sfoo"));
/* sleep until a worker thread reports our thread's completion */
2088 waitCondition(&m->wakeup, &sched_mutex);
2089 } while (m->stat == NoStatus);
2092 /* GranSim specific init */
2093 CurrentTSO = m->tso; // the TSO to run
2094 procStatus[MainProc] = Busy; // status of main PE
2095 CurrentProc = MainProc; // PE to run it on
2099 RELEASE_LOCK(&sched_mutex);
2101 ASSERT(m->stat != NoStatus);
2106 #if defined(RTS_SUPPORTS_THREADS)
2107 closeCondition(&m->wakeup);
2110 IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n",
2114 #if defined(THREADED_RTS)
2117 RELEASE_LOCK(&sched_mutex);
2122 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2123 //@subsection Run queue code
2127 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2128 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2129 implicit global variable that has to be correct when calling these
2133 /* Put the new thread on the head of the runnable queue.
2134 * The caller of createThread better push an appropriate closure
2135 * on this thread's stack before the scheduler is invoked.
/* add_to_run_queue(tso): push 'tso' on the HEAD of the run queue
 * (despite the name suggesting append), fixing up run_queue_tl when
 * the queue was empty.  K&R-style parameter declaration; the
 * 'StgTSO *tso;' line is outside this extract.
 */
2137 static /* inline */ void
2138 add_to_run_queue(tso)
2141 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2142 tso->link = run_queue_hd;
2144 if (run_queue_tl == END_TSO_QUEUE) {
/* push_on_run_queue(tso): append 'tso' at the TAIL of the run queue
 * (note the naming is the opposite of add_to_run_queue, which pushes
 * at the head).  Handles the empty-queue case separately.
 */
2149 /* Put the new thread at the end of the runnable queue. */
2150 static /* inline */ void
2151 push_on_run_queue(tso)
/* sanity: must be a real TSO and not already first/last in the queue */
2154 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2155 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2156 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2157 if (run_queue_hd == END_TSO_QUEUE) {
2160 run_queue_tl->link = tso;
/* take_off_run_queue(tso): dequeue a thread from the run queue.
 * If 'tso' is END_TSO_QUEUE, pop the head (the common concurrent
 * path); otherwise (GranSim only) search the queue for that exact TSO
 * and unlink it wherever it sits, patching hd/tl as needed.
 */
2166 Should be inlined because it's used very often in schedule. The tso
2167 argument is actually only needed in GranSim, where we want to have the
2168 possibility to schedule *any* TSO on the run queue, irrespective of the
2169 actual ordering. Therefore, if tso is not the nil TSO then we traverse
2170 the run queue and dequeue the tso, adjusting the links in the queue.
2172 //@cindex take_off_run_queue
2173 static /* inline */ StgTSO*
2174 take_off_run_queue(StgTSO *tso) {
/* (Klingon, roughly: "remove the job {tso} contained in the queue!") */
2178 qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2180 if tso is specified, unlink that tso from the run_queue (doesn't have
2181 to be at the beginning of the queue); GranSim only
2183 if (tso!=END_TSO_QUEUE) {
2184 /* find tso in queue */
2185 for (t=run_queue_hd, prev=END_TSO_QUEUE;
2186 t!=END_TSO_QUEUE && t!=tso;
2190 /* now actually dequeue the tso */
2191 if (prev!=END_TSO_QUEUE) {
2192 ASSERT(run_queue_hd!=t);
2193 prev->link = t->link;
2195 /* t is at beginning of thread queue */
2196 ASSERT(run_queue_hd==t);
2197 run_queue_hd = t->link;
2199 /* t is at end of thread queue */
2200 if (t->link==END_TSO_QUEUE) {
2201 ASSERT(t==run_queue_tl);
2202 run_queue_tl = prev;
2204 ASSERT(run_queue_tl!=t);
/* detach the dequeued TSO from the list entirely */
2206 t->link = END_TSO_QUEUE;
2208 /* take tso from the beginning of the queue; std concurrent code */
2210 if (t != END_TSO_QUEUE) {
2211 run_queue_hd = t->link;
2212 t->link = END_TSO_QUEUE;
/* queue became empty: reset the tail pointer too */
2213 if (run_queue_hd == END_TSO_QUEUE) {
2214 run_queue_tl = END_TSO_QUEUE;
2223 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2224 //@subsection Garbage Collextion Routines
2226 /* ---------------------------------------------------------------------------
2227 Where are the roots that we know about?
2229 - all the threads on the runnable queue
2230 - all the threads on the blocked queue
2231 - all the threads on the sleeping queue
2232 - all the thread currently executing a _ccall_GC
2233 - all the "main threads"
2235 ------------------------------------------------------------------------ */
2237 /* This has to be protected either by the scheduler monitor, or by the
2238 garbage collection monitor (probably the latter).
2243 GetRoots(evac_fn evac)
2250 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2251 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2252 evac((StgClosure **)&run_queue_hds[i]);
2253 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2254 evac((StgClosure **)&run_queue_tls[i]);
2256 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2257 evac((StgClosure **)&blocked_queue_hds[i]);
2258 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2259 evac((StgClosure **)&blocked_queue_tls[i]);
2260 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2261 evac((StgClosure **)&ccalling_threads[i]);
2268 if (run_queue_hd != END_TSO_QUEUE) {
2269 ASSERT(run_queue_tl != END_TSO_QUEUE);
2270 evac((StgClosure **)&run_queue_hd);
2271 evac((StgClosure **)&run_queue_tl);
2274 if (blocked_queue_hd != END_TSO_QUEUE) {
2275 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2276 evac((StgClosure **)&blocked_queue_hd);
2277 evac((StgClosure **)&blocked_queue_tl);
2280 if (sleeping_queue != END_TSO_QUEUE) {
2281 evac((StgClosure **)&sleeping_queue);
2285 for (m = main_threads; m != NULL; m = m->link) {
2286 evac((StgClosure **)&m->tso);
2288 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2289 evac((StgClosure **)&suspended_ccalling_threads);
2292 #if defined(PAR) || defined(GRAN)
2293 markSparkQueue(evac);
2297 /* -----------------------------------------------------------------------------
2300 This is the interface to the garbage collector from Haskell land.
2301 We provide this so that external C code can allocate and garbage
2302 collect when called from Haskell via _ccall_GC.
2304 It might be useful to provide an interface whereby the programmer
2305 can specify more roots (ToDo).
2307 This needs to be protected by the GC condition variable above. KH.
2308 -------------------------------------------------------------------------- */
/* GC entry points for Haskell land / external C code:
 * performGC       -- minor collection with the scheduler's roots;
 * performMajorGC  -- full collection;
 * performGCWithRoots -- collection with caller-supplied extra roots,
 *   smuggled to AllRoots through the file-scope 'extra_roots' pointer.
 * NOTE(review): 'extra_roots' is a mutable global that is neither
 * static nor reset after use -- not reentrant; confirm callers are
 * serialised by the GC condition variable as the header comment says.
 */
2310 void (*extra_roots)(evac_fn);
2315 GarbageCollect(GetRoots,rtsFalse);
2319 performMajorGC(void)
2321 GarbageCollect(GetRoots,rtsTrue);
/* AllRoots: scheduler roots plus the user's registered extra roots */
2325 AllRoots(evac_fn evac)
2327 GetRoots(evac); // the scheduler's roots
2328 extra_roots(evac); // the user's roots
2332 performGCWithRoots(void (*get_roots)(evac_fn))
2334 extra_roots = get_roots;
2335 GarbageCollect(AllRoots,rtsFalse);
2338 /* -----------------------------------------------------------------------------
2341 If the thread has reached its maximum stack size, then raise the
2342 StackOverflow exception in the offending thread. Otherwise
2343 relocate the TSO into a larger chunk of memory and adjust its stack
2345 -------------------------------------------------------------------------- */
/* threadStackOverflow(tso): grow a thread's stack.  If the thread is
 * already at its max stack size, raise the StackOverflow exception in
 * it instead.  Otherwise allocate a larger TSO, copy the old TSO
 * header and live stack across, relocate sp/su and the update-frame
 * chain, and mark the old TSO as ThreadRelocated so the GC and TSO
 * primops can forward to the new one.  Returns the new TSO ('dest',
 * via the return outside this extract).
 */
2348 threadStackOverflow(StgTSO *tso)
2350 nat new_stack_size, new_tso_size, diff, stack_words;
2354 IF_DEBUG(sanity,checkTSO(tso));
2355 if (tso->stack_size >= tso->max_stack_size) {
2358 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2359 tso->id, tso, tso->stack_size, tso->max_stack_size);
2360 /* If we're debugging, just print out the top of the stack */
2361 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2364 /* Send this thread the StackOverflow exception */
2365 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2369 /* Try to double the current stack size. If that takes us over the
2370 * maximum stack size for this thread, then use the maximum instead.
2371 * Finally round up so the TSO ends up as a whole number of blocks.
2373 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2374 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2375 TSO_STRUCT_SIZE)/sizeof(W_);
2376 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2377 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2379 IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2381 dest = (StgTSO *)allocate(new_tso_size);
2382 TICK_ALLOC_TSO(new_stack_size,0);
2384 /* copy the TSO block and the old stack into the new area */
2385 memcpy(dest,tso,TSO_STRUCT_SIZE);
/* live stack occupies the top of the old area; copy it to the top of
 * the new, larger area (stacks grow downwards) */
2386 stack_words = tso->stack + tso->stack_size - tso->sp;
2387 new_sp = (P_)dest + new_tso_size - stack_words;
2388 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2390 /* relocate the stack pointers... */
2391 diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2392 dest->su = (StgUpdateFrame *) ((P_)dest->su + diff);
2394 dest->stack_size = new_stack_size;
2396 /* and relocate the update frame list */
2397 relocate_stack(dest, diff);
2399 /* Mark the old TSO as relocated. We have to check for relocated
2400 * TSOs in the garbage collector and any primops that deal with TSOs.
2402 * It's important to set the sp and su values to just beyond the end
2403 * of the stack, so we don't attempt to scavenge any part of the
2406 tso->what_next = ThreadRelocated;
2408 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2409 tso->su = (StgUpdateFrame *)tso->sp;
2410 tso->why_blocked = NotBlocked;
2411 dest->mut_link = NULL;
/* NOTE(review): this debug message reports the OLD tso pointer/fields
 * after relocation ("now at %p" prints tso, not dest) -- verify. */
2413 IF_PAR_DEBUG(verbose,
2414 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2415 tso->id, tso, tso->stack_size);
2416 /* If we're debugging, just print out the top of the stack */
2417 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2420 IF_DEBUG(sanity,checkTSO(tso));
2422 IF_DEBUG(scheduler,printTSO(dest));
2428 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2429 //@subsection Blocking Queue Routines
2431 /* ---------------------------------------------------------------------------
2432 Wake up a queue that was blocked on some resource.
2433 ------------------------------------------------------------------------ */
/* unblockCount(bqe, node): PAR/GRAN bookkeeping when a blocked thread
 * is woken from node's blocking queue -- logs a GR_RESUMEQ event and
 * charges the blocked interval to fetchtime or blocktime depending on
 * what kind of closure the thread was blocked on.  The two identical
 * signatures below are the GRAN and PAR variants; their enclosing
 * #if/#else lines are outside this extract.
 */
2437 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2442 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2444 /* write RESUME events to log file and
2445 update blocked and fetch time (depending on type of the orig closure) */
2446 if (RtsFlags.ParFlags.ParStats.Full) {
2447 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2448 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2449 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2450 if (EMPTY_RUN_QUEUE())
2451 emitSchedule = rtsTrue;
/* attribute the blocked time by the closure type we were blocked on
 * (case labels such as FETCH_ME_BQ / BLACKHOLE_BQ lie in elided lines) */
2453 switch (get_itbl(node)->type) {
2455 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2460 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2467 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
/* GranSim variant: wake one element of a blocking queue, simulating the
 * communication/unblock costs.  Distinguishes a TSO local to the node's
 * PE (cheap local unblock) from a remote one (pack + global unblock +
 * latency costs).  Caller must hold sched_mutex (hence "Locked").
 * NOTE(review): the declaration of `tso` and the closing braces of the
 * if/else are missing from this fragment (lines dropped in extraction).
 */
2474 static StgBlockingQueueElement *
2475 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2478 PEs node_loc, tso_loc;
2480 node_loc = where_is(node); // should be lifted out of loop
2481 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2482 tso_loc = where_is((StgClosure *)tso);
2483 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2484 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2485 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
/* charge the (cheap) local unblock cost to the current PE's clock */
2486 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2487 // insertThread(tso, node_loc);
2488 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2490 tso, node, (rtsSpark*)NULL);
2491 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2494 } else { // TSO is remote (actually should be FMBQ)
/* remote wake-up: charge message packing, global unblock and latency */
2495 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2496 RtsFlags.GranFlags.Costs.gunblocktime +
2497 RtsFlags.GranFlags.Costs.latency;
2498 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2500 tso, node, (rtsSpark*)NULL);
2501 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2504 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2506 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2507 (node_loc==tso_loc ? "Local" : "Global"),
2508 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
/* clear the record of what the thread was blocked on */
2509 tso->block_info.closure = NULL;
2510 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
/* GUM (PAR) variant: wake one blocking-queue element.  A TSO goes onto
 * the run queue; a BLOCKED_FETCH goes onto the PendingFetches list; an
 * RBHSave CONSTR closure (legitimate queue terminator for RBHs) is
 * passed over.  Caller must hold sched_mutex.
 * NOTE(review): case labels, the `next` assignments and the return are
 * missing from this fragment (lines dropped in extraction).
 */
2514 static StgBlockingQueueElement *
2515 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2517 StgBlockingQueueElement *next;
2519 switch (get_itbl(bqe)->type) {
2521 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2522 /* if it's a TSO just push it onto the run_queue */
2524 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2525 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
/* log the RESUME event and update the blocked-time statistics */
2527 unblockCount(bqe, node);
2528 /* reset blocking status after dumping event */
2529 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2533 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2535 bqe->link = (StgBlockingQueueElement *)PendingFetches;
2536 PendingFetches = (StgBlockedFetch *)bqe;
2540 /* can ignore this case in a non-debugging setup;
2541 see comments on RBHSave closures above */
2543 /* check that the closure is an RBHSave closure */
2544 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2545 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2546 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2550 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2551 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2555 IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2559 #else /* !GRAN && !PAR */
/* Sequential variant: wake a single TSO by marking it NotBlocked and
 * pushing it onto the run queue.  Caller must hold sched_mutex.
 * NOTE(review): the return-type line, `next = tso->link` and the return
 * statement are missing from this fragment.
 */
2561 unblockOneLocked(StgTSO *tso)
2565 ASSERT(get_itbl(tso)->type == TSO);
2566 ASSERT(tso->why_blocked != NotBlocked);
2567 tso->why_blocked = NotBlocked;
2569 PUSH_ON_RUN_QUEUE(tso);
2571 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2576 #if defined(GRAN) || defined(PAR)
/* Public wrapper (GRAN/PAR): take sched_mutex around unblockOneLocked. */
2577 inline StgBlockingQueueElement *
2578 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2580 ACQUIRE_LOCK(&sched_mutex);
2581 bqe = unblockOneLocked(bqe, node);
2582 RELEASE_LOCK(&sched_mutex);
/* Public wrapper (sequential): take sched_mutex around unblockOneLocked. */
2587 unblockOne(StgTSO *tso)
2589 ACQUIRE_LOCK(&sched_mutex);
2590 tso = unblockOneLocked(tso);
2591 RELEASE_LOCK(&sched_mutex);
/* GranSim variant: wake every TSO on the blocking queue `q` attached to
 * closure `node`.  Handles the "fake fetch" case where the node is not
 * on the current PE, restores the RBHSave payload into an RBH when the
 * queue terminator is reached, and updates global GranSim statistics.
 * NOTE(review): several declarations (node_loc, len) and closing braces
 * are missing from this fragment (lines dropped in extraction).
 */
2598 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2600 StgBlockingQueueElement *bqe;
2605 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2606 node, CurrentProc, CurrentTime[CurrentProc],
2607 CurrentTSO->id, CurrentTSO));
2609 node_loc = where_is(node);
2611 ASSERT(q == END_BQ_QUEUE ||
2612 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2613 get_itbl(q)->type == CONSTR); // closure (type constructor)
2614 ASSERT(is_unique(node));
2616 /* FAKE FETCH: magically copy the node to the tso's proc;
2617 no Fetch necessary because in reality the node should not have been
2618 moved to the other PE in the first place
2620 if (CurrentProc!=node_loc) {
2622 belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2623 node, node_loc, CurrentProc, CurrentTSO->id,
2624 // CurrentTSO, where_is(CurrentTSO),
2625 node->header.gran.procs));
/* mark the node as now also present on the current PE */
2626 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2628 belch("## new bitmask of node %p is %#x",
2629 node, node->header.gran.procs));
2630 if (RtsFlags.GranFlags.GranSimStats.Global) {
2631 globalGranStats.tot_fake_fetches++;
2636 // ToDo: check: ASSERT(CurrentProc==node_loc);
/* walk the queue, waking each TSO until a non-TSO terminator is hit */
2637 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2640 bqe points to the current element in the queue
2641 next points to the next element in the queue
2643 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2644 //tso_loc = where_is(tso);
2646 bqe = unblockOneLocked(bqe, node);
2649 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2650 the closure to make room for the anchor of the BQ */
2651 if (bqe!=END_BQ_QUEUE) {
2652 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2654 ASSERT((info_ptr==&RBH_Save_0_info) ||
2655 (info_ptr==&RBH_Save_1_info) ||
2656 (info_ptr==&RBH_Save_2_info));
2658 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2659 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2660 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2663 belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2664 node, info_type(node)));
2667 /* statistics gathering */
2668 if (RtsFlags.GranFlags.GranSimStats.Global) {
2669 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2670 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2671 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2672 globalGranStats.tot_awbq++; // total no. of bqs awakened
2675 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2676 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
/* GUM (PAR) variant: wake every TSO / BLOCKED_FETCH on the blocking
 * queue `q` attached to `node`, under sched_mutex.  A CONSTR (RBHSave)
 * or empty queue means there is nothing to unblock.
 */
2680 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2682 StgBlockingQueueElement *bqe;
2684 ACQUIRE_LOCK(&sched_mutex);
2686 IF_PAR_DEBUG(verbose,
2687 belch("##-_ AwBQ for node %p on [%x]: ",
/* empty queue or bare RBHSave terminator: nothing to wake */
2691 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2692 IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2697 ASSERT(q == END_BQ_QUEUE ||
2698 get_itbl(q)->type == TSO ||
2699 get_itbl(q)->type == BLOCKED_FETCH ||
2700 get_itbl(q)->type == CONSTR);
/* wake each element until a terminator (CONSTR / END_BQ_QUEUE) is hit */
2703 while (get_itbl(bqe)->type==TSO ||
2704 get_itbl(bqe)->type==BLOCKED_FETCH) {
2705 bqe = unblockOneLocked(bqe, node);
2707 RELEASE_LOCK(&sched_mutex);
2710 #else /* !GRAN && !PAR */
/* Sequential variant: wake every TSO on the queue, under sched_mutex. */
2712 awakenBlockedQueue(StgTSO *tso)
2714 ACQUIRE_LOCK(&sched_mutex);
2715 while (tso != END_TSO_QUEUE) {
2716 tso = unblockOneLocked(tso);
2718 RELEASE_LOCK(&sched_mutex);
2722 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2723 //@subsection Exception Handling Routines
2725 /* ---------------------------------------------------------------------------
2727 - usually called inside a signal handler so it mustn't do anything fancy.
2728 ------------------------------------------------------------------------ */
/* Interrupt the RTS; per the header comment it is usually called from a
 * signal handler, so it must not do anything fancy.
 * NOTE(review): the function body is missing from this fragment.
 */
2731 interruptStgRts(void)
2737 /* -----------------------------------------------------------------------------
2740 This is for use when we raise an exception in another thread, which
2742 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2743 -------------------------------------------------------------------------- */
2745 #if defined(GRAN) || defined(PAR)
2747 NB: only the type of the blocking queue is different in GranSim and GUM
2748 the operations on the queue-elements are the same
2749 long live polymorphism!
/* GRAN/PAR variant: remove `tso` from whichever queue it is blocked on
 * (MVar queue, black-hole queue, another thread's blocked-exceptions
 * list, the I/O blocked_queue, or the sleeping_queue), then mark it
 * runnable and push it on the run queue.  Used when raising an async
 * exception in a blocked thread.  Works on StgBlockingQueueElement so
 * the same code serves GranSim and GUM queues.
 * NOTE(review): `goto done` jumps, some case labels and closing braces
 * are missing from this fragment (lines dropped in extraction).
 */
2752 unblockThread(StgTSO *tso)
2754 StgBlockingQueueElement *t, **last;
2756 ACQUIRE_LOCK(&sched_mutex);
2757 switch (tso->why_blocked) {
2760 return; /* not blocked */
2763 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2765 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2766 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
/* unlink tso from the MVar's singly-linked waiter list, fixing up
 * the tail pointer if tso was the last waiter */
2768 last = (StgBlockingQueueElement **)&mvar->head;
2769 for (t = (StgBlockingQueueElement *)mvar->head;
2771 last = &t->link, last_tso = t, t = t->link) {
2772 if (t == (StgBlockingQueueElement *)tso) {
2773 *last = (StgBlockingQueueElement *)tso->link;
2774 if (mvar->tail == tso) {
2775 mvar->tail = (StgTSO *)last_tso;
2780 barf("unblockThread (MVAR): TSO not found");
2783 case BlockedOnBlackHole:
2784 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2786 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2788 last = &bq->blocking_queue;
2789 for (t = bq->blocking_queue;
2791 last = &t->link, t = t->link) {
2792 if (t == (StgBlockingQueueElement *)tso) {
2793 *last = (StgBlockingQueueElement *)tso->link;
2797 barf("unblockThread (BLACKHOLE): TSO not found");
2800 case BlockedOnException:
2802 StgTSO *target = tso->block_info.tso;
2804 ASSERT(get_itbl(target)->type == TSO);
/* the target may itself have been relocated by a stack overflow;
 * chase the forwarding link to the live TSO */
2806 if (target->what_next == ThreadRelocated) {
2807 target = target->link;
2808 ASSERT(get_itbl(target)->type == TSO);
2811 ASSERT(target->blocked_exceptions != NULL);
2813 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2814 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2816 last = &t->link, t = t->link) {
2817 ASSERT(get_itbl(t)->type == TSO);
2818 if (t == (StgBlockingQueueElement *)tso) {
2819 *last = (StgBlockingQueueElement *)tso->link;
2823 barf("unblockThread (Exception): TSO not found");
2827 case BlockedOnWrite:
2829 /* take TSO off blocked_queue */
2830 StgBlockingQueueElement *prev = NULL;
2831 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2832 prev = t, t = t->link) {
2833 if (t == (StgBlockingQueueElement *)tso) {
2835 blocked_queue_hd = (StgTSO *)t->link;
2836 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2837 blocked_queue_tl = END_TSO_QUEUE;
2840 prev->link = t->link;
2841 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2842 blocked_queue_tl = (StgTSO *)prev;
2848 barf("unblockThread (I/O): TSO not found");
2851 case BlockedOnDelay:
2853 /* take TSO off sleeping_queue */
2854 StgBlockingQueueElement *prev = NULL;
2855 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
2856 prev = t, t = t->link) {
2857 if (t == (StgBlockingQueueElement *)tso) {
2859 sleeping_queue = (StgTSO *)t->link;
2861 prev->link = t->link;
/* NOTE(review): message says "(I/O)" but this is the sleeping (delay)
 * queue -- looks like a copy-paste of the BlockedOnWrite message. */
2866 barf("unblockThread (I/O): TSO not found");
2870 barf("unblockThread");
/* common exit: thread is off its queue; make it runnable */
2874 tso->link = END_TSO_QUEUE;
2875 tso->why_blocked = NotBlocked;
2876 tso->block_info.closure = NULL;
2877 PUSH_ON_RUN_QUEUE(tso);
2878 RELEASE_LOCK(&sched_mutex);
/* Sequential variant of unblockThread: same queue-surgery as the
 * GRAN/PAR version above but working directly on StgTSO lists.  Removes
 * `tso` from the queue named by tso->why_blocked, then makes it
 * runnable.
 * NOTE(review): the declarations of `t`/`last`, the `if (t == tso)`
 * tests inside several loops, `goto done` jumps and closing braces are
 * missing from this fragment (lines dropped in extraction).
 */
2882 unblockThread(StgTSO *tso)
2886 ACQUIRE_LOCK(&sched_mutex);
2887 switch (tso->why_blocked) {
2890 return; /* not blocked */
2893 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2895 StgTSO *last_tso = END_TSO_QUEUE;
2896 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
/* unlink tso from the MVar waiter list, fixing the tail pointer */
2899 for (t = mvar->head; t != END_TSO_QUEUE;
2900 last = &t->link, last_tso = t, t = t->link) {
2903 if (mvar->tail == tso) {
2904 mvar->tail = last_tso;
2909 barf("unblockThread (MVAR): TSO not found");
2912 case BlockedOnBlackHole:
2913 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2915 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2917 last = &bq->blocking_queue;
2918 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2919 last = &t->link, t = t->link) {
2925 barf("unblockThread (BLACKHOLE): TSO not found");
2928 case BlockedOnException:
2930 StgTSO *target = tso->block_info.tso;
2932 ASSERT(get_itbl(target)->type == TSO);
/* chase ThreadRelocated forwarding links to the live target TSO */
2934 while (target->what_next == ThreadRelocated) {
2935 target = target->link;
2936 ASSERT(get_itbl(target)->type == TSO);
2939 ASSERT(target->blocked_exceptions != NULL);
2941 last = &target->blocked_exceptions;
2942 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2943 last = &t->link, t = t->link) {
2944 ASSERT(get_itbl(t)->type == TSO);
2950 barf("unblockThread (Exception): TSO not found");
2954 case BlockedOnWrite:
2956 StgTSO *prev = NULL;
2957 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2958 prev = t, t = t->link) {
2961 blocked_queue_hd = t->link;
2962 if (blocked_queue_tl == t) {
2963 blocked_queue_tl = END_TSO_QUEUE;
2966 prev->link = t->link;
2967 if (blocked_queue_tl == t) {
2968 blocked_queue_tl = prev;
2974 barf("unblockThread (I/O): TSO not found");
2977 case BlockedOnDelay:
2979 StgTSO *prev = NULL;
2980 for (t = sleeping_queue; t != END_TSO_QUEUE;
2981 prev = t, t = t->link) {
2984 sleeping_queue = t->link;
2986 prev->link = t->link;
/* NOTE(review): "(I/O)" in the message below looks copy-pasted from the
 * BlockedOnWrite case; this is the sleeping (delay) queue. */
2991 barf("unblockThread (I/O): TSO not found");
2995 barf("unblockThread");
/* common exit: thread is off its queue; make it runnable */
2999 tso->link = END_TSO_QUEUE;
3000 tso->why_blocked = NotBlocked;
3001 tso->block_info.closure = NULL;
3002 PUSH_ON_RUN_QUEUE(tso);
3003 RELEASE_LOCK(&sched_mutex);
3007 /* -----------------------------------------------------------------------------
3010 * The following function implements the magic for raising an
3011 * asynchronous exception in an existing thread.
3013 * We first remove the thread from any queue on which it might be
3014 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
3016 * We strip the stack down to the innermost CATCH_FRAME, building
3017 * thunks in the heap for all the active computations, so they can
3018 * be restarted if necessary. When we reach a CATCH_FRAME, we build
3019 * an application of the handler to the exception, and push it on
3020 * the top of the stack.
3022 * How exactly do we save all the active computations? We create an
3023 * AP_UPD for every UpdateFrame on the stack. Entering one of these
3024 * AP_UPDs pushes everything from the corresponding update frame
3025 * upwards onto the stack. (Actually, it pushes everything up to the
3026 * next update frame plus a pointer to the next AP_UPD object.
3027 * Entering the next AP_UPD object pushes more onto the stack until we
3028 * reach the last AP_UPD object - at which point the stack should look
3029 * exactly as it did when we killed the TSO and we can continue
3030 * execution by entering the closure on top of the stack.
3032 * We can also kill a thread entirely - this happens if either (a) the
3033 * exception passed to raiseAsync is NULL, or (b) there's no
3034 * CATCH_FRAME on the stack. In either case, we strip the entire
3035 * stack and replace the thread with a zombie.
3037 * -------------------------------------------------------------------------- */
/* Kill a thread outright: raiseAsync with a NULL exception strips the
 * whole stack and leaves the thread a zombie (see comment above). */
3040 deleteThread(StgTSO *tso)
3042 raiseAsync(tso,NULL);
/* Raise an asynchronous exception in `tso` (see the long comment above
 * for the design).  Walks the update-frame chain from tso->su, freezing
 * each stack chunk into an AP_UPD/PAP so suspended computations can be
 * resumed, until it finds a CATCH_FRAME (apply the handler) or the
 * STOP_FRAME (kill the thread).  `exception == NULL` means delete the
 * thread unconditionally.
 * NOTE(review): the enclosing loop header, several case labels and
 * closing braces are missing from this fragment (lines dropped in
 * extraction).
 */
3046 raiseAsync(StgTSO *tso, StgClosure *exception)
3048 StgUpdateFrame* su = tso->su;
3049 StgPtr sp = tso->sp;
3051 /* Thread already dead? */
3052 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3056 IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3058 /* Remove it from any blocking queues */
3061 /* The stack freezing code assumes there's a closure pointer on
3062 * the top of the stack. This isn't always the case with compiled
3063 * code, so we have to push a dummy closure on the top which just
3064 * returns to the next return address on the stack.
3066 if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3067 *(--sp) = (W_)&stg_dummy_ret_closure;
/* number of stack words between sp and the next update frame */
3071 nat words = ((P_)su - (P_)sp) - 1;
3075 /* If we find a CATCH_FRAME, and we've got an exception to raise,
3076 * then build the THUNK raise(exception), and leave it on
3077 * top of the CATCH_FRAME ready to enter.
3079 if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3080 StgCatchFrame *cf = (StgCatchFrame *)su;
3083 /* we've got an exception to raise, so let's pass it to the
3084 * handler in this frame.
3086 raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3087 TICK_ALLOC_SE_THK(1,0);
3088 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3089 raise->payload[0] = exception;
3091 /* throw away the stack from Sp up to the CATCH_FRAME.
3095 /* Ensure that async exceptions are blocked now, so we don't get
3096 * a surprise exception before we get around to executing the
3099 if (tso->blocked_exceptions == NULL) {
3100 tso->blocked_exceptions = END_TSO_QUEUE;
3103 /* Put the newly-built THUNK on top of the stack, ready to execute
3104 * when the thread restarts.
3109 tso->what_next = ThreadEnterGHC;
3110 IF_DEBUG(sanity, checkTSO(tso));
3114 /* First build an AP_UPD consisting of the stack chunk above the
3115 * current update frame, with the top word on the stack as the
3118 ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3123 ap->fun = (StgClosure *)sp[0];
/* copy the frozen stack words into the AP's payload */
3125 for(i=0; i < (nat)words; ++i) {
3126 ap->payload[i] = (StgClosure *)*sp++;
3129 switch (get_itbl(su)->type) {
3133 SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */);
3134 TICK_ALLOC_UP_THK(words+1,0);
3137 fprintf(stderr, "scheduler: Updating ");
3138 printPtr((P_)su->updatee);
3139 fprintf(stderr, " with ");
3140 printObj((StgClosure *)ap);
3143 /* Replace the updatee with an indirection - happily
3144 * this will also wake up any threads currently
3145 * waiting on the result.
3147 * Warning: if we're in a loop, more than one update frame on
3148 * the stack may point to the same object. Be careful not to
3149 * overwrite an IND_OLDGEN in this case, because we'll screw
3150 * up the mutable lists. To be on the safe side, don't
3151 * overwrite any kind of indirection at all. See also
3152 * threadSqueezeStack in GC.c, where we have to make a similar
3155 if (!closure_IND(su->updatee)) {
3156 UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */
/* pop the update frame and push the AP in its place */
3159 sp += sizeofW(StgUpdateFrame) -1;
3160 sp[0] = (W_)ap; /* push onto stack */
3166 StgCatchFrame *cf = (StgCatchFrame *)su;
3169 /* We want a PAP, not an AP_UPD. Fortunately, the
3170 * layout's the same.
3172 SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3173 TICK_ALLOC_UPD_PAP(words+1,0);
3175 /* now build o = FUN(catch,ap,handler) */
3176 o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3177 TICK_ALLOC_FUN(2,0);
3178 SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3179 o->payload[0] = (StgClosure *)ap;
3180 o->payload[1] = cf->handler;
3183 fprintf(stderr, "scheduler: Built ");
3184 printObj((StgClosure *)o);
3187 /* pop the old handler and put o on the stack */
3189 sp += sizeofW(StgCatchFrame) - 1;
3196 StgSeqFrame *sf = (StgSeqFrame *)su;
3199 SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3200 TICK_ALLOC_UPD_PAP(words+1,0);
3202 /* now build o = FUN(seq,ap) */
3203 o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3204 TICK_ALLOC_SE_THK(1,0);
3205 SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3206 o->payload[0] = (StgClosure *)ap;
3209 fprintf(stderr, "scheduler: Built ");
3210 printObj((StgClosure *)o);
3213 /* pop the old handler and put o on the stack */
3215 sp += sizeofW(StgSeqFrame) - 1;
3221 /* We've stripped the entire stack, the thread is now dead. */
3222 sp += sizeofW(StgStopFrame) - 1;
3223 sp[0] = (W_)exception; /* save the exception */
3224 tso->what_next = ThreadKilled;
3225 tso->su = (StgUpdateFrame *)(sp+1);
3236 /* -----------------------------------------------------------------------------
3237 resurrectThreads is called after garbage collection on the list of
3238 threads found to be garbage. Each of these threads will be woken
3239 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3240 on an MVar, or NonTermination if the thread was blocked on a Black
3242 -------------------------------------------------------------------------- */
/* Walk the list of threads found dead by the GC, relink each onto
 * all_threads, and send it the appropriate exception (see comment
 * above): BlockedOnDeadMVar for MVar/exception blockage, NonTermination
 * for a black hole.
 * NOTE(review): the declarations of `tso`/`next`, the MVar case label
 * and the break statements are missing from this fragment.
 */
3245 resurrectThreads( StgTSO *threads )
3249 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3250 next = tso->global_link;
/* put the thread back on the global thread list */
3251 tso->global_link = all_threads;
3253 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3255 switch (tso->why_blocked) {
3257 case BlockedOnException:
3258 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3260 case BlockedOnBlackHole:
3261 raiseAsync(tso,(StgClosure *)NonTermination_closure);
3264 /* This might happen if the thread was blocked on a black hole
3265 * belonging to a thread that we've just woken up (raiseAsync
3266 * can wake up threads, remember...).
3270 barf("resurrectThreads: thread blocked in a strange way");
3275 /* -----------------------------------------------------------------------------
3276 * Blackhole detection: if we reach a deadlock, test whether any
3277 * threads are blocked on themselves. Any threads which are found to
3278 * be self-blocked get sent a NonTermination exception.
3280 * This is only done in a deadlock situation in order to avoid
3281 * performance overhead in the normal case.
3282 * -------------------------------------------------------------------------- */
/* Deadlock recovery (see comment above): for every thread blocked on a
 * black hole, walk its own update-frame chain; if one of its update
 * frames points at the very closure it is blocked on, the thread is
 * blocked on itself and gets a NonTermination exception.
 * NOTE(review): loop-exit cases (STOP_FRAME etc.) and closing braces
 * are missing from this fragment.
 */
3285 detectBlackHoles( void )
3287 StgTSO *t = all_threads;
3288 StgUpdateFrame *frame;
3289 StgClosure *blocked_on;
3291 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
/* skip over relocated TSOs to the live copy */
3293 while (t->what_next == ThreadRelocated) {
3295 ASSERT(get_itbl(t)->type == TSO);
3298 if (t->why_blocked != BlockedOnBlackHole) {
3302 blocked_on = t->block_info.closure;
3304 for (frame = t->su; ; frame = frame->link) {
3305 switch (get_itbl(frame)->type) {
3308 if (frame->updatee == blocked_on) {
3309 /* We are blocking on one of our own computations, so
3310 * send this thread the NonTermination exception.
3313 sched_belch("thread %d is blocked on itself", t->id));
3314 raiseAsync(t, (StgClosure *)NonTermination_closure);
3335 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3336 //@subsection Debugging Routines
3338 /* -----------------------------------------------------------------------------
3339 Debugging: why is a thread blocked
3340 -------------------------------------------------------------------------- */
/* Debugging: print a human-readable description of why `tso` is
 * blocked, keyed on tso->why_blocked.
 * NOTE(review): several case labels (BlockedOnRead, BlockedOnMVar,
 * NotBlocked, BlockedOnGA) and break statements are missing from this
 * fragment.
 */
3345 printThreadBlockage(StgTSO *tso)
3347 switch (tso->why_blocked) {
3349 fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3351 case BlockedOnWrite:
3352 fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3354 case BlockedOnDelay:
3355 fprintf(stderr,"is blocked until %d", tso->block_info.target);
3358 fprintf(stderr,"is blocked on an MVar");
3360 case BlockedOnException:
3361 fprintf(stderr,"is blocked on delivering an exception to thread %d",
3362 tso->block_info.tso->id);
3364 case BlockedOnBlackHole:
3365 fprintf(stderr,"is blocked on a black hole");
3368 fprintf(stderr,"is not blocked");
/* parallel-only blockages: blocked on a global address (GUM) */
3372 fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3373 tso->block_info.closure, info_type(tso->block_info.closure));
3375 case BlockedOnGA_NoSend:
3376 fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3377 tso->block_info.closure, info_type(tso->block_info.closure));
3380 #if defined(RTS_SUPPORTS_THREADS)
3381 case BlockedOnCCall:
3382 fprintf(stderr,"is blocked on an external call");
3386 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3387 tso->why_blocked, tso->id, tso);
/* Debugging: print the thread's status (killed / completed / why it is
 * blocked, via printThreadBlockage) keyed on tso->what_next.
 * NOTE(review): the ThreadKilled case label and the default branch are
 * missing from this fragment.
 */
3392 printThreadStatus(StgTSO *tso)
3394 switch (tso->what_next) {
3396 fprintf(stderr,"has been killed");
3398 case ThreadComplete:
3399 fprintf(stderr,"has completed");
3402 printThreadBlockage(tso);
/* Debugging: print the id and status of every thread on all_threads.
 * The three header lines correspond to the GRAN / PAR / sequential
 * builds (time-stamped in the first two); the #if lines are missing
 * from this fragment.
 */
3407 printAllThreads(void)
3412 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3413 ullong_format_string(TIME_ON_PROC(CurrentProc),
3414 time_string, rtsFalse/*no commas!*/);
3416 sched_belch("all threads at [%s]:", time_string);
3418 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3419 ullong_format_string(CURRENT_TIME,
3420 time_string, rtsFalse/*no commas!*/);
3422 sched_belch("all threads at [%s]:", time_string);
3424 sched_belch("all threads:");
3427 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3428 fprintf(stderr, "\tthread %d ", t->id);
3429 printThreadStatus(t);
3430 fprintf(stderr,"\n");
3435 Print a whole blocking queue attached to node (debugging only).
/* Debugging (PAR): print the whole blocking queue attached to `node`
 * by delegating to print_bqe below.  Asserts that `node` is one of the
 * closure types that can carry a blocking queue.
 */
3440 print_bq (StgClosure *node)
3442 StgBlockingQueueElement *bqe;
3446 fprintf(stderr,"## BQ of closure %p (%s): ",
3447 node, info_type(node));
3449 /* should cover all closures that may have a blocking queue */
3450 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3451 get_itbl(node)->type == FETCH_ME_BQ ||
3452 get_itbl(node)->type == RBH ||
3453 get_itbl(node)->type == MVAR);
3455 ASSERT(node!=(StgClosure*)NULL); // sanity check
3457 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3461 Print a whole blocking queue starting with the element bqe.
/* Debugging (PAR): print every element of a blocking queue starting at
 * `bqe`, stopping at END_BQ_QUEUE or at a CONSTR (an RBHSave closure,
 * the legal terminator of an RBH's queue).  Elements may be TSOs,
 * BLOCKED_FETCHes, or the terminating RBHSave.
 * NOTE(review): the declaration of `end` and some case labels are
 * missing from this fragment.
 */
3464 print_bqe (StgBlockingQueueElement *bqe)
3469 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3471 for (end = (bqe==END_BQ_QUEUE);
3472 !end; // iterate until bqe points to a CONSTR
3473 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
3474 bqe = end ? END_BQ_QUEUE : bqe->link) {
3475 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3476 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3477 /* types of closures that may appear in a blocking queue */
3478 ASSERT(get_itbl(bqe)->type == TSO ||
3479 get_itbl(bqe)->type == BLOCKED_FETCH ||
3480 get_itbl(bqe)->type == CONSTR);
3481 /* only BQs of an RBH end with an RBH_Save closure */
3482 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3484 switch (get_itbl(bqe)->type) {
3486 fprintf(stderr," TSO %u (%x),",
3487 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3490 fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3491 ((StgBlockedFetch *)bqe)->node,
3492 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3493 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3494 ((StgBlockedFetch *)bqe)->ga.weight);
/* a CONSTR element: identify which of the RBH_Save info tables it is */
3497 fprintf(stderr," %s (IP %p),",
3498 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3499 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3500 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3501 "RBH_Save_?"), get_itbl(bqe));
3504 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3505 info_type((StgClosure *)bqe)); // , node, info_type(node));
3509 fputc('\n', stderr);
3511 # elif defined(GRAN)
/* Debugging (GranSim): print the blocking queue of `node`, annotating
 * each TSO with the PE it lives on.  The queue may end with an RBHSave
 * CONSTR if `node` is an RBH.
 * NOTE(review): the declaration of `end` is missing from this fragment.
 */
3513 print_bq (StgClosure *node)
3515 StgBlockingQueueElement *bqe;
3516 PEs node_loc, tso_loc;
3519 /* should cover all closures that may have a blocking queue */
3520 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3521 get_itbl(node)->type == FETCH_ME_BQ ||
3522 get_itbl(node)->type == RBH);
3524 ASSERT(node!=(StgClosure*)NULL); // sanity check
3525 node_loc = where_is(node);
3527 fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3528 node, info_type(node), node_loc);
3531 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3533 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3534 !end; // iterate until bqe points to a CONSTR
3535 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3536 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3537 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3538 /* types of closures that may appear in a blocking queue */
3539 ASSERT(get_itbl(bqe)->type == TSO ||
3540 get_itbl(bqe)->type == CONSTR);
3541 /* only BQs of an RBH end with an RBH_Save closure */
3542 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3544 tso_loc = where_is((StgClosure *)bqe);
3545 switch (get_itbl(bqe)->type) {
3547 fprintf(stderr," TSO %d (%p) on [PE %d],",
3548 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
/* CONSTR element: identify which RBH_Save info table terminates the BQ */
3551 fprintf(stderr," %s (IP %p),",
3552 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3553 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3554 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3555 "RBH_Save_?"), get_itbl(bqe));
3558 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3559 info_type((StgClosure *)bqe), node, info_type(node));
3563 fputc('\n', stderr);
3567 Nice and easy: only TSOs on the blocking queue
/* Debugging (sequential): print the blocking queue of `node`; in this
 * build the queue can only contain TSOs (see comment above).
 * NOTE(review): the declaration of `tso` and the loop's increment
 * clause are missing from this fragment.
 */
3570 print_bq (StgClosure *node)
3574 ASSERT(node!=(StgClosure*)NULL); // sanity check
3575 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3576 tso != END_TSO_QUEUE;
3578 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3579 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3580 fprintf(stderr," TSO %d (%p),", tso->id, tso);
3582 fputc('\n', stderr);
3593 for (i=0, tso=run_queue_hd;
3594 tso != END_TSO_QUEUE;
/* Varargs debug output: printf-style message to stderr, prefixed per
 * build flavour (task id under SMP, "==" or "scheduler:" otherwise).
 * NOTE(review): the va_start/va_end calls and the #if lines selecting
 * between the three prefixes are missing from this fragment.
 */
3603 sched_belch(char *s, ...)
3608 fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3610 fprintf(stderr, "== ");
3612 fprintf(stderr, "scheduler: ");
3614 vfprintf(stderr, s, ap);
3615 fprintf(stderr, "\n");
3621 //@node Index, , Debugging Routines, Main scheduling code
3625 //* StgMainThread:: @cindex\s-+StgMainThread
3626 //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue
3627 //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd
3628 //* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
3629 //* context_switch:: @cindex\s-+context_switch
3630 //* createThread:: @cindex\s-+createThread
3631 //* gc_pending_cond:: @cindex\s-+gc_pending_cond
3632 //* initScheduler:: @cindex\s-+initScheduler
3633 //* interrupted:: @cindex\s-+interrupted
3634 //* next_thread_id:: @cindex\s-+next_thread_id
3635 //* print_bq:: @cindex\s-+print_bq
3636 //* run_queue_hd:: @cindex\s-+run_queue_hd
3637 //* run_queue_tl:: @cindex\s-+run_queue_tl
3638 //* sched_mutex:: @cindex\s-+sched_mutex
3639 //* schedule:: @cindex\s-+schedule
3640 //* take_off_run_queue:: @cindex\s-+take_off_run_queue
3641 //* term_mutex:: @cindex\s-+term_mutex