1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.124 2002/02/15 07:50:36 sof Exp $
4 * (c) The GHC Team, 1998-2000
8 * Different GHC ways use this scheduler quite differently (see comments below)
9 * Here is the global picture:
11 * WAY Name CPP flag What's it for
12 * --------------------------------------
13 * mp GUM PAR Parallel execution on a distributed memory machine
14 * s SMP SMP Parallel execution on a shared memory machine
15 * mg GranSim GRAN Simulation of parallel execution
16 * md GUM/GdH DIST Distributed execution (based on GUM)
18 * --------------------------------------------------------------------------*/
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
24 * Version with scheduler monitor support for SMPs (WAY=s):
26 This design provides a high-level API to create and schedule threads etc.
27 as documented in the SMP design document.
29 It uses a monitor design controlled by a single mutex to exercise control
30 over accesses to shared data structures, and builds on the Posix threads library.
33 The majority of state is shared. In order to keep essential per-task state,
34 there is a Capability structure, which contains all the information
35 needed to run a thread: its STG registers, a pointer to its TSO, a
36 nursery etc. During STG execution, a pointer to the capability is
37 kept in a register (BaseReg).
39 In a non-SMP build, there is one global capability, namely MainRegTable.
43 * Version with support for distributed memory parallelism aka GUM (WAY=mp):
45 The main scheduling loop in GUM iterates until a finish message is received.
46 In that case a global flag @receivedFinish@ is set and this instance of
47 the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48 for the handling of incoming messages, such as PP_FINISH.
49 Note that in the parallel case we have a system manager that coordinates
50 different PEs, each of which is running one instance of the RTS.
51 See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52 From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
54 * Version with support for simulating parallel execution aka GranSim (WAY=mg):
56 The main scheduling code in GranSim is quite different from that in std
57 (concurrent) Haskell: while concurrent Haskell just iterates over the
58 threads in the runnable queue, GranSim is event-driven, i.e. it iterates
59 over the events in the global event queue. -- HWL
64 //* Variables and Data structures::
65 //* Main scheduling loop::
66 //* Suspend and Resume::
68 //* Garbage Collextion Routines::
69 //* Blocking Queue Routines::
70 //* Exception Handling Routines::
71 //* Debugging Routines::
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
78 #include "PosixSource.h"
85 #include "StgStartup.h"
88 #include "StgMiscClosures.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
119 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
120 //@subsection Variables and Data structures
124 * These are the threads which clients have requested that we run.
126 * In a 'threaded' build, we might have several concurrent clients all
127 * waiting for results, and each one will wait on a condition variable
128 * until the result is available.
130 * In non-SMP, clients are strictly nested: the first client calls
131 * into the RTS, which might call out again to C with a _ccall_GC, and
132 * eventually re-enter the RTS.
134 * Main threads information is kept in a linked list:
136 //@cindex StgMainThread
137 typedef struct StgMainThread_ {
139 SchedulerStatus stat;
141 #if defined(RTS_SUPPORTS_THREADS)
144 struct StgMainThread_ *link;
147 /* Main thread queue.
148 * Locks required: sched_mutex.
150 static StgMainThread *main_threads;
153 * Locks required: sched_mutex.
157 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
158 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
161 In GranSim we have a runnable and a blocked queue for each processor.
162 In order to minimise code changes new arrays run_queue_hds/tls
163 are created. run_queue_hd is then a short cut (macro) for
164 run_queue_hds[CurrentProc] (see GranSim.h).
167 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
168 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
169 StgTSO *ccalling_threadss[MAX_PROC];
170 /* We use the same global list of threads (all_threads) in GranSim as in
171 the std RTS (i.e. we are cheating). However, we don't use this list in
172 the GranSim specific code at the moment (so we are only potentially
177 StgTSO *run_queue_hd, *run_queue_tl;
178 StgTSO *blocked_queue_hd, *blocked_queue_tl;
179 StgTSO *sleeping_queue; /* perhaps replace with a hash table? */
183 /* Linked list of all threads.
184 * Used for detecting garbage collected threads.
188 /* When a thread performs a safe C call (_ccall_GC, using old
189 * terminology), it gets put on the suspended_ccalling_threads
190 * list. Used by the garbage collector.
192 static StgTSO *suspended_ccalling_threads;
194 static StgTSO *threadStackOverflow(StgTSO *tso);
196 /* KH: The following two flags are shared memory locations. There is no need
197 to lock them, since they are only unset at the end of a scheduler
201 /* flag set by signal handler to precipitate a context switch */
202 //@cindex context_switch
205 /* if this flag is set as well, give up execution */
206 //@cindex interrupted
209 /* Next thread ID to allocate.
210 * Locks required: sched_mutex
212 //@cindex next_thread_id
213 StgThreadID next_thread_id = 1;
216 * Pointers to the state of the current thread.
217 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
218 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
221 /* The smallest stack size that makes any sense is:
222 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
223 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
224 * + 1 (the realworld token for an IO thread)
225 * + 1 (the closure to enter)
227 * A thread with this stack will bomb immediately with a stack
228 * overflow, which will increase its stack size.
231 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
238 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
239 * exists - earlier gccs apparently didn't.
246 void addToBlockedQueue ( StgTSO *tso );
248 static void schedule ( void );
249 void interruptStgRts ( void );
251 static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri );
253 static StgTSO * createThread_ ( nat size, rtsBool have_lock );
256 static void detectBlackHoles ( void );
259 static void sched_belch(char *s, ...);
262 #if defined(RTS_SUPPORTS_THREADS)
263 /* ToDo: carefully document the invariants that go together
264 * with these synchronisation objects.
266 Mutex sched_mutex = INIT_MUTEX_VAR;
267 Mutex term_mutex = INIT_MUTEX_VAR;
270 static Condition gc_pending_cond = INIT_COND_VAR;
274 #endif /* RTS_SUPPORTS_THREADS */
278 rtsTime TimeOfLastYield;
279 rtsBool emitSchedule = rtsTrue;
283 char *whatNext_strs[] = {
291 char *threadReturnCode_strs[] = {
292 "HeapOverflow", /* might also be StackOverflow */
301 StgTSO * createSparkThread(rtsSpark spark);
302 StgTSO * activateSpark (rtsSpark spark);
306 * The thread state for the main thread.
307 // ToDo: check whether not needed any more
311 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
312 static void taskStart(void);
323 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
324 //@subsection Main scheduling loop
326 /* ---------------------------------------------------------------------------
327 Main scheduling loop.
329 We use round-robin scheduling, each thread returning to the
330 scheduler loop when one of these conditions is detected:
333 * timer expires (thread yields)
338 Locking notes: we acquire the scheduler lock once at the beginning
339 of the scheduler loop, and release it when
341 * running a thread, or
342 * waiting for work, or
343 * waiting for a GC to complete.
346 In a GranSim setup this loop iterates over the global event queue,
347 which determines what to do next. This makes the GranSim loop more
348 complicated than either the concurrent or the parallel (GUM)
349 setup.
352 GUM iterates over incoming messages.
353 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
354 and sends out a fish whenever it has nothing to do; in-between
355 doing the actual reductions (shared code below) it processes the
356 incoming messages and deals with delayed operations
357 (see PendingFetches).
358 This is not the ugliest code you could imagine, but it's bloody close.
360 ------------------------------------------------------------------------ */
367 StgThreadReturnCode ret;
375 rtsBool receivedFinish = rtsFalse;
377 nat tp_size, sp_size; // stats only
380 rtsBool was_interrupted = rtsFalse;
382 ACQUIRE_LOCK(&sched_mutex);
384 #if defined(RTS_SUPPORTS_THREADS)
385 /* Check to see whether there are any worker threads
386 waiting to deposit external call results. If so,
387 yield our capability */
388 yieldToReturningWorker(&sched_mutex, cap);
390 waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
394 /* set up first event to get things going */
395 /* ToDo: assign costs for system setup and init MainTSO ! */
396 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
398 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
401 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
402 G_TSO(CurrentTSO, 5));
404 if (RtsFlags.GranFlags.Light) {
405 /* Save current time; GranSim Light only */
406 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
409 event = get_next_event();
411 while (event!=(rtsEvent*)NULL) {
412 /* Choose the processor with the next event */
413 CurrentProc = event->proc;
414 CurrentTSO = event->tso;
418 while (!receivedFinish) { /* set by processMessages */
419 /* when receiving PP_FINISH message */
426 IF_DEBUG(scheduler, printAllThreads());
428 /* If we're interrupted (the user pressed ^C, or some other
429 * termination condition occurred), kill all the currently running
433 IF_DEBUG(scheduler, sched_belch("interrupted"));
435 interrupted = rtsFalse;
436 was_interrupted = rtsTrue;
439 /* Go through the list of main threads and wake up any
440 * clients whose computations have finished. ToDo: this
441 * should be done more efficiently without a linear scan
442 * of the main threads list, somehow...
444 #if defined(RTS_SUPPORTS_THREADS)
446 StgMainThread *m, **prev;
447 prev = &main_threads;
448 for (m = main_threads; m != NULL; m = m->link) {
449 switch (m->tso->what_next) {
452 *(m->ret) = (StgClosure *)m->tso->sp[0];
456 broadcastCondition(&m->wakeup);
459 if (m->ret) *(m->ret) = NULL;
461 if (was_interrupted) {
462 m->stat = Interrupted;
466 broadcastCondition(&m->wakeup);
474 #else /* not threaded */
477 /* in GUM do this only on the Main PE */
480 /* If our main thread has finished or been killed, return.
483 StgMainThread *m = main_threads;
484 if (m->tso->what_next == ThreadComplete
485 || m->tso->what_next == ThreadKilled) {
486 main_threads = main_threads->link;
487 if (m->tso->what_next == ThreadComplete) {
488 /* we finished successfully, fill in the return value */
489 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
493 if (m->ret) { *(m->ret) = NULL; };
494 if (was_interrupted) {
495 m->stat = Interrupted;
505 /* Top up the run queue from our spark pool. We try to make the
506 * number of threads in the run queue equal to the number of
509 * Disable spark support in SMP for now, non-essential & requires
510 * a little bit of work to make it compile cleanly. -- sof 1/02.
512 #if 0 /* defined(SMP) */
514 nat n = getFreeCapabilities();
515 StgTSO *tso = run_queue_hd;
517 /* Count the run queue */
518 while (n > 0 && tso != END_TSO_QUEUE) {
525 spark = findSpark(rtsFalse);
527 break; /* no more sparks in the pool */
529 /* I'd prefer this to be done in activateSpark -- HWL */
530 /* tricky - it needs to hold the scheduler lock and
531 * not try to re-acquire it -- SDM */
532 createSparkThread(spark);
534 sched_belch("==^^ turning spark of closure %p into a thread",
535 (StgClosure *)spark));
538 /* We need to wake up the other tasks if we just created some
541 if (getFreeCapabilities() - n > 1) {
542 signalCondition( &thread_ready_cond );
547 /* check for signals each time around the scheduler */
548 #ifndef mingw32_TARGET_OS
549 if (signals_pending()) {
550 startSignalHandlers();
554 /* Check whether any waiting threads need to be woken up. If the
555 * run queue is empty, and there are no other tasks running, we
556 * can wait indefinitely for something to happen.
557 * ToDo: what if another client comes along & requests another
560 if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
561 awaitEvent( EMPTY_RUN_QUEUE()
563 && allFreeCapabilities()
567 /* we can be interrupted while waiting for I/O... */
568 if (interrupted) continue;
571 * Detect deadlock: when we have no threads to run, there are no
572 * threads waiting on I/O or sleeping, and all the other tasks are
573 * waiting for work, we must have a deadlock of some description.
575 * We first try to find threads blocked on themselves (ie. black
576 * holes), and generate NonTermination exceptions where necessary.
578 * If no threads are black holed, we have a deadlock situation, so
579 * inform all the main threads.
582 if ( EMPTY_RUN_QUEUE()
583 && EMPTY_QUEUE(blocked_queue_hd)
584 && EMPTY_QUEUE(sleeping_queue)
585 #if defined(RTS_SUPPORTS_THREADS)
586 && EMPTY_QUEUE(suspended_ccalling_threads)
589 && allFreeCapabilities()
593 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
594 #if defined(THREADED_RTS)
595 /* and SMP mode ..? */
596 releaseCapability(cap);
598 RELEASE_LOCK(&sched_mutex);
599 GarbageCollect(GetRoots,rtsTrue);
600 ACQUIRE_LOCK(&sched_mutex);
601 if ( EMPTY_QUEUE(blocked_queue_hd)
603 && EMPTY_QUEUE(sleeping_queue) ) {
605 IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
608 /* No black holes, so probably a real deadlock. Send the
609 * current main thread the Deadlock exception (or in the SMP
610 * build, send *all* main threads the deadlock exception,
611 * since none of them can make progress).
613 if ( EMPTY_RUN_QUEUE() ) {
615 #if defined(RTS_SUPPORTS_THREADS)
616 for (m = main_threads; m != NULL; m = m->link) {
617 switch (m->tso->why_blocked) {
618 case BlockedOnBlackHole:
619 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
621 case BlockedOnException:
623 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
626 barf("deadlock: main thread blocked in a strange way");
631 switch (m->tso->why_blocked) {
632 case BlockedOnBlackHole:
633 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
635 case BlockedOnException:
637 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
640 barf("deadlock: main thread blocked in a strange way");
644 #if defined(RTS_SUPPORTS_THREADS)
645 /* ToDo: revisit conditions (and mechanism) for shutting
646 down a multi-threaded world */
647 if ( EMPTY_RUN_QUEUE() ) {
648 IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
649 shutdownHaskellAndExit(0);
652 ASSERT( !EMPTY_RUN_QUEUE() );
656 /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
660 /* If there's a GC pending, don't do anything until it has
664 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
665 waitCondition( &gc_pending_cond, &sched_mutex );
669 #if defined(RTS_SUPPORTS_THREADS)
670 /* block until we've got a thread on the run queue and a free
674 if ( EMPTY_RUN_QUEUE() ) {
675 /* Give up our capability */
676 releaseCapability(cap);
677 IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
678 waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
679 IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
681 while ( EMPTY_RUN_QUEUE() ) {
682 waitForWorkCapability(&sched_mutex, &cap);
683 IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
690 if (RtsFlags.GranFlags.Light)
691 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
693 /* adjust time based on time-stamp */
694 if (event->time > CurrentTime[CurrentProc] &&
695 event->evttype != ContinueThread)
696 CurrentTime[CurrentProc] = event->time;
698 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
699 if (!RtsFlags.GranFlags.Light)
702 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
704 /* main event dispatcher in GranSim */
705 switch (event->evttype) {
706 /* Should just be continuing execution */
708 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
709 /* ToDo: check assertion
710 ASSERT(run_queue_hd != (StgTSO*)NULL &&
711 run_queue_hd != END_TSO_QUEUE);
713 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
714 if (!RtsFlags.GranFlags.DoAsyncFetch &&
715 procStatus[CurrentProc]==Fetching) {
716 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
717 CurrentTSO->id, CurrentTSO, CurrentProc);
720 /* Ignore ContinueThreads for completed threads */
721 if (CurrentTSO->what_next == ThreadComplete) {
722 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
723 CurrentTSO->id, CurrentTSO, CurrentProc);
726 /* Ignore ContinueThreads for threads that are being migrated */
727 if (PROCS(CurrentTSO)==Nowhere) {
728 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
729 CurrentTSO->id, CurrentTSO, CurrentProc);
732 /* The thread should be at the beginning of the run queue */
733 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
734 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
735 CurrentTSO->id, CurrentTSO, CurrentProc);
736 break; // run the thread anyway
739 new_event(proc, proc, CurrentTime[proc],
741 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
743 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
744 break; // now actually run the thread; DaH Qu'vam yImuHbej
747 do_the_fetchnode(event);
748 goto next_thread; /* handle next event in event queue */
751 do_the_globalblock(event);
752 goto next_thread; /* handle next event in event queue */
755 do_the_fetchreply(event);
756 goto next_thread; /* handle next event in event queue */
758 case UnblockThread: /* Move from the blocked queue to the tail of */
759 do_the_unblock(event);
760 goto next_thread; /* handle next event in event queue */
762 case ResumeThread: /* Move from the blocked queue to the tail of */
763 /* the runnable queue ( i.e. Qu' SImqa'lu') */
764 event->tso->gran.blocktime +=
765 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
766 do_the_startthread(event);
767 goto next_thread; /* handle next event in event queue */
770 do_the_startthread(event);
771 goto next_thread; /* handle next event in event queue */
774 do_the_movethread(event);
775 goto next_thread; /* handle next event in event queue */
778 do_the_movespark(event);
779 goto next_thread; /* handle next event in event queue */
782 do_the_findwork(event);
783 goto next_thread; /* handle next event in event queue */
786 barf("Illegal event type %u\n", event->evttype);
789 /* This point was scheduler_loop in the old RTS */
791 IF_DEBUG(gran, belch("GRAN: after main switch"));
793 TimeOfLastEvent = CurrentTime[CurrentProc];
794 TimeOfNextEvent = get_time_of_next_event();
795 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
796 // CurrentTSO = ThreadQueueHd;
798 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
801 if (RtsFlags.GranFlags.Light)
802 GranSimLight_leave_system(event, &ActiveTSO);
804 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
807 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
809 /* in a GranSim setup the TSO stays on the run queue */
811 /* Take a thread from the run queue. */
812 t = POP_RUN_QUEUE(); // take_off_run_queue(t);
815 fprintf(stderr, "GRAN: About to run current thread, which is\n");
818 context_switch = 0; // turned on via GranYield, checking events and time slice
821 DumpGranEvent(GR_SCHEDULE, t));
823 procStatus[CurrentProc] = Busy;
826 if (PendingFetches != END_BF_QUEUE) {
830 /* ToDo: phps merge with spark activation above */
831 /* check whether we have local work and send requests if we have none */
832 if (EMPTY_RUN_QUEUE()) { /* no runnable threads */
833 /* :-[ no local threads => look out for local sparks */
834 /* the spark pool for the current PE */
835 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
836 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
837 pool->hd < pool->tl) {
839 * ToDo: add GC code check that we really have enough heap afterwards!!
841 * If we're here (no runnable threads) and we have pending
842 * sparks, we must have a space problem. Get enough space
843 * to turn one of those pending sparks into a
847 spark = findSpark(rtsFalse); /* get a spark */
848 if (spark != (rtsSpark) NULL) {
849 tso = activateSpark(spark); /* turn the spark into a thread */
850 IF_PAR_DEBUG(schedule,
851 belch("==== schedule: Created TSO %d (%p); %d threads active",
852 tso->id, tso, advisory_thread_count));
854 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
855 belch("==^^ failed to activate spark");
857 } /* otherwise fall through & pick-up new tso */
859 IF_PAR_DEBUG(verbose,
860 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
861 spark_queue_len(pool)));
866 /* If we still have no work we need to send a FISH to get a spark
869 if (EMPTY_RUN_QUEUE()) {
870 /* =8-[ no local sparks => look for work on other PEs */
872 * We really have absolutely no work. Send out a fish
873 * (there may be some out there already), and wait for
874 * something to arrive. We clearly can't run any threads
875 * until a SCHEDULE or RESUME arrives, and so that's what
876 * we're hoping to see. (Of course, we still have to
877 * respond to other types of messages.)
879 TIME now = msTime() /*CURRENT_TIME*/;
880 IF_PAR_DEBUG(verbose,
881 belch("-- now=%ld", now));
882 IF_PAR_DEBUG(verbose,
883 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
884 (last_fish_arrived_at!=0 &&
885 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
886 belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
887 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
888 last_fish_arrived_at,
889 RtsFlags.ParFlags.fishDelay, now);
892 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
893 (last_fish_arrived_at==0 ||
894 (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
895 /* outstandingFishes is set in sendFish, processFish;
896 avoid flooding system with fishes via delay */
898 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
901 // Global statistics: count no. of fishes
902 if (RtsFlags.ParFlags.ParStats.Global &&
903 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
904 globalParStats.tot_fish_mess++;
908 receivedFinish = processMessages();
911 } else if (PacketsWaiting()) { /* Look for incoming messages */
912 receivedFinish = processMessages();
915 /* Now we are sure that we have some work available */
916 ASSERT(run_queue_hd != END_TSO_QUEUE);
918 /* Take a thread from the run queue, if we have work */
919 t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
920 IF_DEBUG(sanity,checkTSO(t));
922 /* ToDo: write something to the log-file
923 if (RTSflags.ParFlags.granSimStats && !sameThread)
924 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
928 /* the spark pool for the current PE */
929 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
932 belch("--=^ %d threads, %d sparks on [%#x]",
933 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
936 if (0 && RtsFlags.ParFlags.ParStats.Full &&
937 t && LastTSO && t->id != LastTSO->id &&
938 LastTSO->why_blocked == NotBlocked &&
939 LastTSO->what_next != ThreadComplete) {
940 // if previously scheduled TSO not blocked we have to record the context switch
941 DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
942 GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
945 if (RtsFlags.ParFlags.ParStats.Full &&
946 (emitSchedule /* forced emit */ ||
947 (t && LastTSO && t->id != LastTSO->id))) {
949 we are running a different TSO, so write a schedule event to log file
950 NB: If we use fair scheduling we also have to write a deschedule
951 event for LastTSO; with unfair scheduling we know that the
952 previous tso has blocked whenever we switch to another tso, so
953 we don't need it in GUM for now
955 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
956 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
957 emitSchedule = rtsFalse;
961 #else /* !GRAN && !PAR */
963 /* grab a thread from the run queue */
964 ASSERT(run_queue_hd != END_TSO_QUEUE);
966 // Sanity check the thread we're about to run. This can be
967 // expensive if there is lots of thread switching going on...
968 IF_DEBUG(sanity,checkTSO(t));
971 grabCapability(&cap);
972 cap->r.rCurrentTSO = t;
974 /* context switches are now initiated by the timer signal, unless
975 * the user specified "context switch as often as possible", with
980 RtsFlags.ProfFlags.profileInterval == 0 ||
982 (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
983 && (run_queue_hd != END_TSO_QUEUE
984 || blocked_queue_hd != END_TSO_QUEUE
985 || sleeping_queue != END_TSO_QUEUE)))
990 RELEASE_LOCK(&sched_mutex);
992 IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...",
993 t->id, t, whatNext_strs[t->what_next]));
996 startHeapProfTimer();
999 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1000 /* Run the current thread
1002 switch (cap->r.rCurrentTSO->what_next) {
1004 case ThreadComplete:
1005 /* Thread already finished, return to scheduler. */
1006 ret = ThreadFinished;
1008 case ThreadEnterGHC:
1009 ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1012 ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1014 case ThreadEnterInterp:
1015 ret = interpretBCO(cap);
1018 barf("schedule: invalid what_next field");
1020 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1022 /* Costs for the scheduler are assigned to CCS_SYSTEM */
1024 stopHeapProfTimer();
1028 ACQUIRE_LOCK(&sched_mutex);
1031 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1032 #elif !defined(GRAN) && !defined(PAR)
1033 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1035 t = cap->r.rCurrentTSO;
1038 /* HACK 675: if the last thread didn't yield, make sure to print a
1039 SCHEDULE event to the log file when StgRunning the next thread, even
1040 if it is the same one as before */
1042 TimeOfLastYield = CURRENT_TIME;
1048 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1049 globalGranStats.tot_heapover++;
1051 globalParStats.tot_heapover++;
1054 // did the task ask for a large block?
1055 if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1056 // if so, get one and push it on the front of the nursery.
1060 blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1062 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)",
1064 whatNext_strs[t->what_next], blocks));
1066 // don't do this if it would push us over the
1067 // alloc_blocks_lim limit; we'll GC first.
1068 if (alloc_blocks + blocks < alloc_blocks_lim) {
1070 alloc_blocks += blocks;
1071 bd = allocGroup( blocks );
1073 // link the new group into the list
1074 bd->link = cap->r.rCurrentNursery;
1075 bd->u.back = cap->r.rCurrentNursery->u.back;
1076 if (cap->r.rCurrentNursery->u.back != NULL) {
1077 cap->r.rCurrentNursery->u.back->link = bd;
1079 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1080 g0s0->blocks == cap->r.rNursery);
1081 cap->r.rNursery = g0s0->blocks = bd;
1083 cap->r.rCurrentNursery->u.back = bd;
1085 // initialise it as a nursery block
1089 bd->free = bd->start;
1091 // don't forget to update the block count in g0s0.
1092 g0s0->n_blocks += blocks;
1093 ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1095 // now update the nursery to point to the new block
1096 cap->r.rCurrentNursery = bd;
1098 // we might be unlucky and have another thread get on the
1099 // run queue before us and steal the large block, but in that
1100 // case the thread will just end up requesting another large
1102 PUSH_ON_RUN_QUEUE(t);
1107 /* make all the running tasks block on a condition variable,
1108 * maybe set context_switch and wait till they all pile in,
1109 * then have them wait on a GC condition variable.
1111 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow",
1112 t->id, t, whatNext_strs[t->what_next]));
1115 ASSERT(!is_on_queue(t,CurrentProc));
1117 /* Currently we emit a DESCHEDULE event before GC in GUM.
1118 ToDo: either add separate event to distinguish SYSTEM time from rest
1119 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1120 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1121 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1122 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1123 emitSchedule = rtsTrue;
1127 ready_to_gc = rtsTrue;
1128 context_switch = 1; /* stop other threads ASAP */
1129 PUSH_ON_RUN_QUEUE(t);
1130 /* actual GC is done at the end of the while loop */
1136 DumpGranEvent(GR_DESCHEDULE, t));
1137 globalGranStats.tot_stackover++;
1140 // DumpGranEvent(GR_DESCHEDULE, t);
1141 globalParStats.tot_stackover++;
1143 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow",
1144 t->id, t, whatNext_strs[t->what_next]));
1145 /* just adjust the stack for this thread, then pop it back
1151 /* enlarge the stack */
1152 StgTSO *new_t = threadStackOverflow(t);
1154 /* This TSO has moved, so update any pointers to it from the
1155 * main thread stack. It better not be on any other queues...
1156 * (it shouldn't be).
1158 for (m = main_threads; m != NULL; m = m->link) {
1163 threadPaused(new_t);
1164 PUSH_ON_RUN_QUEUE(new_t);
1168 case ThreadYielding:
1171 DumpGranEvent(GR_DESCHEDULE, t));
1172 globalGranStats.tot_yields++;
1175 // DumpGranEvent(GR_DESCHEDULE, t);
1176 globalParStats.tot_yields++;
1178 /* put the thread back on the run queue. Then, if we're ready to
1179 * GC, check whether this is the last task to stop. If so, wake
1180 * up the GC thread. getThread will block during a GC until the
1184 if (t->what_next == ThreadEnterInterp) {
1185 /* ToDo: or maybe a timer expired when we were in Hugs?
1186 * or maybe someone hit ctrl-C
1188 belch("--<< thread %ld (%p; %s) stopped to switch to Hugs",
1189 t->id, t, whatNext_strs[t->what_next]);
1191 belch("--<< thread %ld (%p; %s) stopped, yielding",
1192 t->id, t, whatNext_strs[t->what_next]);
1199 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1201 ASSERT(t->link == END_TSO_QUEUE);
1203 ASSERT(!is_on_queue(t,CurrentProc));
1206 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1207 checkThreadQsSanity(rtsTrue));
1210 if (RtsFlags.ParFlags.doFairScheduling) {
1211 /* this does round-robin scheduling; good for concurrency */
1212 APPEND_TO_RUN_QUEUE(t);
1214 /* this does unfair scheduling; good for parallelism */
1215 PUSH_ON_RUN_QUEUE(t);
1218 /* this does round-robin scheduling; good for concurrency */
1219 APPEND_TO_RUN_QUEUE(t);
1222 /* add a ContinueThread event to actually process the thread */
1223 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1225 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1227 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1236 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1237 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1238 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1240 // ??? needed; should emit block before
1242 DumpGranEvent(GR_DESCHEDULE, t));
1243 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1246 ASSERT(procStatus[CurrentProc]==Busy ||
1247 ((procStatus[CurrentProc]==Fetching) &&
1248 (t->block_info.closure!=(StgClosure*)NULL)));
1249 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1250 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1251 procStatus[CurrentProc]==Fetching))
1252 procStatus[CurrentProc] = Idle;
1256 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1257 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1260 if (t->block_info.closure!=(StgClosure*)NULL)
1261 print_bq(t->block_info.closure));
1263 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1266 /* whatever we schedule next, we must log that schedule */
1267 emitSchedule = rtsTrue;
1270 /* don't need to do anything. Either the thread is blocked on
1271 * I/O, in which case we'll have called addToBlockedQueue
1272 * previously, or it's blocked on an MVar or Blackhole, in which
1273 * case it'll be on the relevant queue already.
1276 fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1277 printThreadBlockage(t);
1278 fprintf(stderr, "\n"));
1280 /* Only for dumping event to log file
1281 ToDo: do I need this in GranSim, too?
1288 case ThreadFinished:
1289 /* Need to check whether this was a main thread, and if so, signal
1290 * the task that started it with the return value. If we have no
1291 * more main threads, we probably need to stop all the tasks until
1294 /* We also end up here if the thread kills itself with an
1295 * uncaught exception, see Exception.hc.
1297 IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1299 endThread(t, CurrentProc); // clean-up the thread
1301 /* For now all are advisory -- HWL */
1302 //if(t->priority==AdvisoryPriority) ??
1303 advisory_thread_count--;
1306 if(t->dist.priority==RevalPriority)
1310 if (RtsFlags.ParFlags.ParStats.Full &&
1311 !RtsFlags.ParFlags.ParStats.Suppressed)
1312 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1317 barf("schedule: invalid thread return code %d", (int)ret);
1320 #if defined(RTS_SUPPORTS_THREADS)
1321 /* I don't understand what this re-grab is doing -- sof */
1322 grabCapability(&cap);
1326 if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1327 GarbageCollect(GetRoots, rtsTrue);
1329 performHeapProfile = rtsFalse;
1330 ready_to_gc = rtsFalse; // we already GC'd
1336 && allFreeCapabilities()
1339 /* everybody back, start the GC.
1340 * Could do it in this thread, or signal a condition var
1341 * to do it in another thread. Either way, we need to
1342 * broadcast on gc_pending_cond afterward.
1344 #if defined(RTS_SUPPORTS_THREADS)
1345 IF_DEBUG(scheduler,sched_belch("doing GC"));
1347 GarbageCollect(GetRoots,rtsFalse);
1348 ready_to_gc = rtsFalse;
1350 broadcastCondition(&gc_pending_cond);
1353 /* add a ContinueThread event to continue execution of current thread */
1354 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1356 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1358 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1366 IF_GRAN_DEBUG(unused,
1367 print_eventq(EventHd));
1369 event = get_next_event();
1372 /* ToDo: wait for next message to arrive rather than busy wait */
1375 } /* end of while(1) */
1377 IF_PAR_DEBUG(verbose,
1378 belch("== Leaving schedule() after having received Finish"));
1381 /* ---------------------------------------------------------------------------
1382 * deleteAllThreads(): kill all the live threads.
1384 * This is used when we catch a user interrupt (^C), before performing
1385 * any necessary cleanups and running finalizers.
1386 * ------------------------------------------------------------------------- */
1388 void deleteAllThreads ( void )
// Walks the run, blocked and sleeping queues in turn. The loop bodies are
// not part of this listing; the `t = next` step implies each body saves t's
// successor before disposing of t -- confirm against the full source.
1391 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1392 for (t = run_queue_hd; t != END_TSO_QUEUE; t = next) {
1396 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = next) {
1400 for (t = sleeping_queue; t != END_TSO_QUEUE; t = next) {
// All three queues are then reset to empty.
// NOTE(review): no visible line touches suspended_ccalling_threads or
// main_threads here; confirm that is intended.
1404 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1405 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1406 sleeping_queue = END_TSO_QUEUE;
1409 /* startThread and insertThread are now in GranSim.c -- HWL */
1412 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1413 //@subsection Suspend and Resume
1415 /* ---------------------------------------------------------------------------
1416 * Suspending & resuming Haskell threads.
1418 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1419 * its capability before calling the C function. This allows another
1420 * task to pick up the capability and carry on running Haskell
1421 * threads. It also means that if the C call blocks, it won't lock
1424 * The Haskell thread making the C call is put to sleep for the
1425 * duration of the call, on the suspended_ccalling_threads queue. We
1426 * give out a token to the task, which it can use to resume the thread
1427 * on return from the C function.
1428 * ------------------------------------------------------------------------- */
1431 suspendThread( StgRegTable *reg )
// Called when a Haskell thread makes a "safe" C call. The current TSO is
// paused and pushed on suspended_ccalling_threads, the capability is handed
// back, and the TSO's id becomes the token for resumeThread() (the return
// statement itself is not shown in this listing).
1436 /* assume that *reg is a pointer to the StgRegTable part
1439 cap = (Capability *)((char *)reg - sizeof(StgFunTable));
// Byte-wise step back from the register table to the enclosing Capability.
// char* is used because pointer arithmetic on void* is a GNU extension, not ISO C.
1441 ACQUIRE_LOCK(&sched_mutex);
1444 sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1446 threadPaused(cap->r.rCurrentTSO);
1447 cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1448 suspended_ccalling_threads = cap->r.rCurrentTSO;
1450 #if defined(RTS_SUPPORTS_THREADS)
1451 cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1454 /* Use the thread ID as the token; it should be unique */
1455 tok = cap->r.rCurrentTSO->id;
1457 /* Hand back capability */
1458 releaseCapability(cap);
1460 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
1461 /* Preparing to leave the RTS, so ensure there's a native thread/task
1462 waiting to take over.
1464 ToDo: optimise this and only create a new task if there's a need
1465 for one (i.e., if there's only one Concurrent Haskell thread alive,
1466 there's no need to create a new task).
1468 IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1469 startTask(taskStart);
1473 RELEASE_LOCK(&sched_mutex);
1478 resumeThread( StgInt tok )
// Re-enter the RTS after a safe C call begun by suspendThread(). It regains a
// capability, finds the TSO whose id equals `tok` on
// suspended_ccalling_threads, resets its blocking status and installs it as
// the capability's current TSO. It calls barf() if no suspended TSO has that id.
1480 StgTSO *tso, **prev;
1483 #if defined(RTS_SUPPORTS_THREADS)
1484 /* Wait for permission to re-enter the RTS with the result. */
1485 grabReturnCapability(&sched_mutex, &cap);
// Non-threaded branch (its #else line is not shown in this listing).
1487 grabCapability(&cap);
1490 /* Remove the thread off of the suspended list */
// `prev` trails one link behind `tso` so a match can be unlinked in place.
// The unlinking statement itself is not shown in this listing.
1491 prev = &suspended_ccalling_threads;
1492 for (tso = suspended_ccalling_threads;
1493 tso != END_TSO_QUEUE;
1494 prev = &tso->link, tso = tso->link) {
1495 if (tso->id == (StgThreadID)tok) {
1500 if (tso == END_TSO_QUEUE) {
1501 barf("resumeThread: thread not found");
1503 tso->link = END_TSO_QUEUE;
1504 /* Reset blocking status */
1505 tso->why_blocked = NotBlocked;
// NOTE(review): no visible line in the non-threaded branch acquires
// sched_mutex; presumably the lock macros are no-ops there -- confirm.
1507 RELEASE_LOCK(&sched_mutex);
1509 cap->r.rCurrentTSO = tso;
1514 /* ---------------------------------------------------------------------------
1516 * ------------------------------------------------------------------------ */
// Forward declaration. As a static function, it is defined elsewhere in this file.
1517 static void unblockThread(StgTSO *tso);
1519 /* ---------------------------------------------------------------------------
1520 * Comparing Thread ids.
1522 * This is used from STG land in the implementation of the
1523 * instances of Eq/Ord for ThreadIds.
1524 * ------------------------------------------------------------------------ */
1526 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
// Three-way comparison on thread ids: returns -1 if tso1's id is smaller and
// 1 if it is larger. The equal case is on a line not shown in this listing
// (presumably `return 0;`).
1528 StgThreadID id1 = tso1->id;
1529 StgThreadID id2 = tso2->id;
1531 if (id1 < id2) return (-1);
1532 if (id1 > id2) return 1;
1536 /* ---------------------------------------------------------------------------
1537 * Fetching the ThreadID from an StgTSO.
1539 * This is used in the implementation of Show for ThreadIds.
1540 * ------------------------------------------------------------------------ */
1541 int rts_getThreadId(const StgTSO *tso)
// The body is not shown in this listing; presumably it returns tso->id.
1546 /* ---------------------------------------------------------------------------
1547 Create a new thread.
1549 The new thread starts with the given stack size. Before the
1550 scheduler can run, however, this thread needs to have a closure
1551 (and possibly some arguments) pushed on its stack. See
1552 pushClosure() in Schedule.h.
1554 createGenThread() and createIOThread() (in SchedAPI.h) are
1555 convenient packaged versions of this function.
1557 currently pri (priority) is only used in a GRAN setup -- HWL
1558 ------------------------------------------------------------------------ */
1559 //@cindex createThread
1561 /* currently pri (priority) is only used in a GRAN setup -- HWL */
// GranSim variant of createThread. `pri` is the priority (used only in a
// GRAN setup). It calls createThread_ with have_lock == rtsFalse, so
// createThread_ takes sched_mutex itself while allocating the thread id.
1563 createThread(nat stack_size, StgInt pri)
1565 return createThread_(stack_size, rtsFalse, pri);
1569 createThread_(nat size, rtsBool have_lock, StgInt pri)
// Non-GranSim variant of the same wrapper/worker pair.
1573 createThread(nat stack_size)
1575 return createThread_(stack_size, rtsFalse);
// createThread_ allocates a TSO of `size` words, clamped to
// MIN_STACK_WORDS + TSO_STRUCT_SIZEW. It then:
//   - gives the TSO a unique id, taking sched_mutex unless have_lock is set;
//   - pushes a stop frame;
//   - links the TSO on all_threads;
//   - initialises per-way bookkeeping.
// If advisory_thread_count has reached RtsFlags.ParFlags.maxThreads, it logs
// a message and returns END_TSO_QUEUE instead.
1579 createThread_(nat size, rtsBool have_lock)
1586 /* First check whether we should create a thread at all */
1588 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1589 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1591 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1592 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1593 return END_TSO_QUEUE;
1599 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1602 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1604 /* catch ridiculously small stack sizes */
1605 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1606 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1609 stack_size = size - TSO_STRUCT_SIZEW;
1611 tso = (StgTSO *)allocate(size);
1612 TICK_ALLOC_TSO(stack_size, 0);
1614 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1616 SET_GRAN_HDR(tso, ThisPE);
1618 tso->what_next = ThreadEnterGHC;
1620 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1621 * protect the increment operation on next_thread_id.
1622 * In future, we could use an atomic increment instead.
1624 if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1625 tso->id = next_thread_id++;
1626 if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1628 tso->why_blocked = NotBlocked;
1629 tso->blocked_exceptions = NULL;
1631 tso->stack_size = stack_size;
1632 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
1634 tso->sp = (P_)&(tso->stack) + stack_size;
1637 tso->prof.CCCS = CCS_MAIN;
1640 /* put a stop frame on the stack */
1641 tso->sp -= sizeofW(StgStopFrame);
1642 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1643 tso->su = (StgUpdateFrame*)tso->sp;
1647 tso->link = END_TSO_QUEUE;
1648 /* uses more flexible routine in GranSim */
1649 insertThread(tso, CurrentProc);
1651 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1657 if (RtsFlags.GranFlags.GranSimStats.Full)
1658 DumpGranEvent(GR_START,tso);
1660 if (RtsFlags.ParFlags.ParStats.Full)
1661 DumpGranEvent(GR_STARTQ,tso);
1662 /* HACk to avoid SCHEDULE
1666 /* Link the new thread on the global thread list.
1668 tso->global_link = all_threads;
1672 tso->dist.priority = MandatoryPriority; //by default that is...
1676 tso->gran.pri = pri;
1678 tso->gran.magic = TSO_MAGIC; // debugging only
1680 tso->gran.sparkname = 0;
1681 tso->gran.startedat = CURRENT_TIME;
1682 tso->gran.exported = 0;
1683 tso->gran.basicblocks = 0;
1684 tso->gran.allocs = 0;
1685 tso->gran.exectime = 0;
1686 tso->gran.fetchtime = 0;
1687 tso->gran.fetchcount = 0;
1688 tso->gran.blocktime = 0;
1689 tso->gran.blockcount = 0;
1690 tso->gran.blockedat = 0;
1691 tso->gran.globalsparks = 0;
1692 tso->gran.localsparks = 0;
1693 if (RtsFlags.GranFlags.Light)
1694 tso->gran.clock = Now; /* local clock */
1696 tso->gran.clock = 0;
1698 IF_DEBUG(gran,printTSO(tso));
1701 tso->par.magic = TSO_MAGIC; // debugging only
1703 tso->par.sparkname = 0;
1704 tso->par.startedat = CURRENT_TIME;
1705 tso->par.exported = 0;
1706 tso->par.basicblocks = 0;
1707 tso->par.allocs = 0;
1708 tso->par.exectime = 0;
1709 tso->par.fetchtime = 0;
1710 tso->par.fetchcount = 0;
1711 tso->par.blocktime = 0;
1712 tso->par.blockcount = 0;
1713 tso->par.blockedat = 0;
1714 tso->par.globalsparks = 0;
1715 tso->par.localsparks = 0;
1719 globalGranStats.tot_threads_created++;
1720 globalGranStats.threads_created_on_PE[CurrentProc]++;
1721 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1722 globalGranStats.tot_sq_probes++;
1724 // collect parallel global statistics (currently done together with GC stats)
1725 if (RtsFlags.ParFlags.ParStats.Global &&
1726 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1727 //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime());
1728 globalParStats.tot_threads_created++;
// The arguments now follow the conversions in order: id (%d), address (%p),
// then PE (%d). Previously CurrentProc/tso/tso->id were passed against "%d (%p)".
1734 belch("==__ schedule: Created TSO %d (%p) on PE %d;",
1735 tso->id, tso, CurrentProc));
1737 IF_PAR_DEBUG(verbose,
1738 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1739 tso->id, tso, advisory_thread_count));
1741 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1742 tso->id, tso->stack_size));
1749 all parallel thread creation calls should fall through the following routine.
1752 createSparkThread(rtsSpark spark)
// Wraps `spark` in a new advisory thread. It allocates a TSO via createThread_
// with have_lock == rtsTrue, so the caller is expected to hold sched_mutex.
// It then pushes the spark closure, puts the TSO on the run queue and
// increments advisory_thread_count.
1754 ASSERT(spark != (rtsSpark)NULL);
// NOTE(review): createThread_ logs and returns END_TSO_QUEUE in this
// situation, but this code calls barf(). If barf() does not return, the
// `return END_TSO_QUEUE` below is unreachable -- confirm which is intended.
1755 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
1757 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1758 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1759 return END_TSO_QUEUE;
1763 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1764 if (tso==END_TSO_QUEUE)
1765 barf("createSparkThread: Cannot create TSO");
// NOTE(review): createThread_ stores the priority in tso->dist.priority;
// confirm that tso->priority is the intended field here.
1767 tso->priority = AdvisoryPriority;
1769 pushClosure(tso,spark);
1770 PUSH_ON_RUN_QUEUE(tso);
1771 advisory_thread_count++;
1778 Turn a spark into a thread.
1779 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1782 //@cindex activateSpark
1784 activateSpark (rtsSpark spark)
// Turns `spark` into a runnable thread via createSparkThread(). When full
// parallel statistics are enabled, it also emits a verbose debug message;
// the rest of that branch is not shown in this listing.
1788 tso = createSparkThread(spark);
1789 if (RtsFlags.ParFlags.ParStats.Full) {
1790 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1791 IF_PAR_DEBUG(verbose,
1792 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1793 (StgClosure *)spark, info_type((StgClosure *)spark)));
1795 // ToDo: fwd info on local/global spark to thread -- HWL
1796 // tso->gran.exported = spark->exported;
1797 // tso->gran.locked = !spark->global;
1798 // tso->gran.sparkname = spark->name;
1804 /* ---------------------------------------------------------------------------
1807 * scheduleThread puts a thread on the head of the runnable queue.
1808 * This will usually be done immediately after a thread is created.
1809 * The caller of scheduleThread must create the thread using e.g.
1810 * createThread and push an appropriate closure
1811 * on this thread's stack before the scheduler is invoked.
1812 * ------------------------------------------------------------------------ */
// Forward declaration of the shared worker behind scheduleThread() and
// scheduleExtThread().
1814 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
// Puts `tso` on the run queue while holding sched_mutex. In THREADED_RTS
// builds, `createTask` presumably decides whether startTask() is called (the
// guarding condition is not shown). Outside THREADED_RTS the parameter sits
// under a `#if !defined(THREADED_RTS)` guard, presumably marking it unused.
1817 scheduleThread_(StgTSO *tso
1818 , rtsBool createTask
1819 #if !defined(THREADED_RTS)
1824 ACQUIRE_LOCK(&sched_mutex);
1826 /* Put the new thread on the head of the runnable queue. The caller
1827 * better push an appropriate closure on this thread's stack
1828 * beforehand. In the SMP case, the thread may start running as
1829 * soon as we release the scheduler lock below.
1831 PUSH_ON_RUN_QUEUE(tso);
1832 #if defined(THREADED_RTS)
1833 /* If main() is scheduling a thread, don't bother creating a
1837 startTask(taskStart);
1843 IF_DEBUG(scheduler,printTSO(tso));
1845 RELEASE_LOCK(&sched_mutex);
// Puts `tso` on the run queue with createTask == rtsFalse. scheduleThread_
// returns void, so it is called as a plain statement: ISO C forbids
// `return expr;` in a function whose return type is void.
1848 void scheduleThread(StgTSO* tso)
1850 scheduleThread_(tso, rtsFalse);
// Same as scheduleThread, but passes createTask == rtsTrue.
1853 void scheduleExtThread(StgTSO* tso)
1855 scheduleThread_(tso, rtsTrue);
1858 /* ---------------------------------------------------------------------------
1861 * Initialise the scheduler. This resets all the queues - if the
1862 * queues contained any threads, they'll be garbage collected at the
1865 * ------------------------------------------------------------------------ */
// Handler for SIGTERM, installed via sigaction() in the scheduler
// initialisation code. The statement run while term_mutex is held is not
// shown in this listing.
// NOTE(review): if ACQUIRE_LOCK maps to pthread_mutex_lock, that call is not
// async-signal-safe under POSIX -- confirm this is acceptable.
1869 term_handler(int sig STG_UNUSED)
1872 ACQUIRE_LOCK(&term_mutex);
1874 RELEASE_LOCK(&term_mutex);
// Scheduler initialisation (its function header is not shown in this listing).
// It empties every scheduler queue, clears the main-thread and all-thread lists,
// derives ctxtSwitchTicks, initialises locks and condition variables, installs
// the SIGTERM handler and starts the task manager.
// NOTE(review): the loop bound is `i<=MAX_PROC`. If these arrays are declared
// with MAX_PROC elements, this writes one element past the end. The
// declarations are not visible here -- confirm.
1885 for (i=0; i<=MAX_PROC; i++) {
1886 run_queue_hds[i] = END_TSO_QUEUE;
1887 run_queue_tls[i] = END_TSO_QUEUE;
1888 blocked_queue_hds[i] = END_TSO_QUEUE;
1889 blocked_queue_tls[i] = END_TSO_QUEUE;
1890 ccalling_threadss[i] = END_TSO_QUEUE;
1891 sleeping_queue = END_TSO_QUEUE;
1894 run_queue_hd = END_TSO_QUEUE;
1895 run_queue_tl = END_TSO_QUEUE;
1896 blocked_queue_hd = END_TSO_QUEUE;
1897 blocked_queue_tl = END_TSO_QUEUE;
1898 sleeping_queue = END_TSO_QUEUE;
1901 suspended_ccalling_threads = END_TSO_QUEUE;
1903 main_threads = NULL;
1904 all_threads = END_TSO_QUEUE;
// Convert the context-switch interval from milliseconds into timer ticks.
1909 RtsFlags.ConcFlags.ctxtSwitchTicks =
1910 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1912 #if defined(RTS_SUPPORTS_THREADS)
1913 /* Initialise the mutex and condition variables used by
1915 initMutex(&sched_mutex);
1916 initMutex(&term_mutex);
1918 initCondition(&thread_ready_cond);
1922 initCondition(&gc_pending_cond);
1925 #if defined(RTS_SUPPORTS_THREADS)
1926 ACQUIRE_LOCK(&sched_mutex);
1929 /* Install the SIGTERM handler */
1932 struct sigaction action,oact;
1934 action.sa_handler = term_handler;
1935 sigemptyset(&action.sa_mask);
1936 action.sa_flags = 0;
1937 if (sigaction(SIGTERM, &action, &oact) != 0) {
1938 barf("can't install TERM handler");
1943 /* A capability holds the state a native thread needs in
1944 * order to execute STG code. At least one capability is
1945 * floating around (only SMP builds have more than one).
1949 #if defined(RTS_SUPPORTS_THREADS)
1950 /* start our haskell execution tasks */
1952 startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
1954 startTaskManager(0,taskStart);
1958 #if /* defined(SMP) ||*/ defined(PAR)
1962 #if defined(RTS_SUPPORTS_THREADS)
1963 RELEASE_LOCK(&sched_mutex);
// Scheduler shutdown hook. Only its RTS_SUPPORTS_THREADS guard is visible in
// this listing; the guarded body is not shown.
1969 exitScheduler( void )
1971 #if defined(RTS_SUPPORTS_THREADS)
1976 /* -----------------------------------------------------------------------------
1977 Managing the per-task allocation areas.
1979 Each capability comes with an allocation area. These are
1980 fixed-length block lists into which allocation can be done.
1982 ToDo: no support for two-space collection at the moment???
1983 -------------------------------------------------------------------------- */
1985 /* -----------------------------------------------------------------------------
1986 * waitThread is the external interface for running a new computation
1987 * and waiting for the result.
1989 * In the non-SMP case, we create a new main thread, push it on the
1990 * main-thread stack, and invoke the scheduler to run it. The
1991 * scheduler will return when the top main thread on the stack has
1992 * completed or died, and fill in the necessary fields of the
1993 * main_thread structure.
1995 * In the SMP case, we create a main thread as before, but we then
1996 * create a new condition variable and sleep on it. When our new
1997 * main thread has completed, we'll be woken up and the status/result
1998 * will be in the main_thread struct.
1999 * -------------------------------------------------------------------------- */
// Walks the run, blocked and sleeping queues. The loop bodies and the return
// statement are not shown; the name suggests it counts the threads found --
// confirm against the full source.
2002 howManyThreadsAvail ( void )
2006 for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2008 for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2010 for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
// Drains every scheduler queue by repeatedly waiting on the head thread of
// the run, blocked and sleeping queues. The outer loop repeats until all
// three queues are empty, because finishing one thread may requeue others.
2016 finishAllThreads ( void )
2019 while (run_queue_hd != END_TSO_QUEUE) {
2020 waitThread ( run_queue_hd, NULL);
2022 while (blocked_queue_hd != END_TSO_QUEUE) {
2023 waitThread ( blocked_queue_hd, NULL);
2025 while (sleeping_queue != END_TSO_QUEUE) {
// Wait on the head of the queue this loop tests. blocked_queue_hd is
// normally END_TSO_QUEUE here, since the loop above has just drained it.
2026 waitThread ( sleeping_queue, NULL);
2029 (blocked_queue_hd != END_TSO_QUEUE ||
2030 run_queue_hd != END_TSO_QUEUE ||
2031 sleeping_queue != END_TSO_QUEUE);
// Public entry point: runs `tso` as a main thread via waitThread_ and
// forwards its result (the status, with the result closure in *ret). In
// THREADED_RTS builds it passes blockWaiting == rtsFalse.
2035 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2037 #if defined(THREADED_RTS)
2038 return waitThread_(tso,ret, rtsFalse);
2040 return waitThread_(tso,ret);
// Registers `tso` as a new main thread on a freshly malloc'd StgMainThread
// (linked on main_threads) and waits until its status is no longer NoStatus.
// In THREADED_RTS builds, blockWaiting == rtsFalse takes a separate path;
// its body is only partly shown. Otherwise the caller sleeps on m->wakeup.
2045 waitThread_(StgTSO *tso,
2046 /*out*/StgClosure **ret
2047 #if defined(THREADED_RTS)
2048 , rtsBool blockWaiting
2053 SchedulerStatus stat;
2055 ACQUIRE_LOCK(&sched_mutex);
2057 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2062 #if defined(RTS_SUPPORTS_THREADS)
2063 initCondition(&m->wakeup);
2066 m->link = main_threads;
2069 IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2071 #if defined(RTS_SUPPORTS_THREADS)
2073 # if defined(THREADED_RTS)
2074 if (!blockWaiting) {
2075 /* In the threaded case, the OS thread that called main()
2076 * gets to enter the RTS directly without going via another
2079 RELEASE_LOCK(&sched_mutex);
2081 ASSERT(m->stat != NoStatus);
2085 IF_DEBUG(scheduler, sched_belch("sfoo"));
// Sleep on this main thread's condition variable (releasing sched_mutex
// while asleep) until a status has been recorded.
2087 waitCondition(&m->wakeup, &sched_mutex);
2088 } while (m->stat == NoStatus);
2091 /* GranSim specific init */
2092 CurrentTSO = m->tso; // the TSO to run
2093 procStatus[MainProc] = Busy; // status of main PE
2094 CurrentProc = MainProc; // PE to run it on
2098 RELEASE_LOCK(&sched_mutex);
2100 ASSERT(m->stat != NoStatus);
2105 #if defined(RTS_SUPPORTS_THREADS)
2106 closeCondition(&m->wakeup);
2109 IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n",
2113 #if defined(THREADED_RTS)
2116 RELEASE_LOCK(&sched_mutex);
2121 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2122 //@subsection Run queue code
2126 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2127 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2128 implicit global variable that has to be correct when calling these
2132 /* Put the new thread on the head of the runnable queue.
2133 * The caller of createThread better push an appropriate closure
2134 * on this thread's stack before the scheduler is invoked.
// Inserts `tso` at the HEAD of the run queue: tso->link becomes the old head.
// If the queue was empty (tail == END_TSO_QUEUE), the tail is presumably set
// to tso. The head update and the body of that `if` are not shown here.
2136 static /* inline */ void
2137 add_to_run_queue(tso)
2140 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2141 tso->link = run_queue_hd;
2143 if (run_queue_tl == END_TSO_QUEUE) {
2148 /* Put the new thread at the end of the runnable queue. */
// Despite the name "push", this appends at the TAIL (run_queue_tl->link = tso);
// add_to_run_queue is the one that inserts at the head.
2149 static /* inline */ void
2150 push_on_run_queue(tso)
2153 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2154 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2155 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2156 if (run_queue_hd == END_TSO_QUEUE) {
2159 run_queue_tl->link = tso;
2165 Should be inlined because it's used very often in schedule. The tso
2166 argument is actually only needed in GranSim, where we want to have the
2167 possibility to schedule *any* TSO on the run queue, irrespective of the
2168 actual ordering. Therefore, if tso is not the nil TSO then we traverse
2169 the run queue and dequeue the tso, adjusting the links in the queue.
2171 //@cindex take_off_run_queue
// Dequeues a TSO from the run queue.
// - If `tso` is END_TSO_QUEUE, it pops the head (standard concurrent path).
// - Otherwise it searches for `tso` and unlinks it wherever it is, fixing
//   run_queue_hd/run_queue_tl when it is at either end (GranSim path).
// The dequeued TSO's link is reset to END_TSO_QUEUE.
// NOTE(review): if `tso` is not on the queue, the search ends with
// t == END_TSO_QUEUE and the dequeue code dereferences t->link. A guard may
// exist on a line not shown -- confirm.
2172 static /* inline */ StgTSO*
2173 take_off_run_queue(StgTSO *tso) {
2177 Remove {tso} from the group containing the tasks that can run (the run queue)!
2179 if tso is specified, unlink that tso from the run_queue (doesn't have
2180 to be at the beginning of the queue); GranSim only
2182 if (tso!=END_TSO_QUEUE) {
2183 /* find tso in queue */
2184 for (t=run_queue_hd, prev=END_TSO_QUEUE;
2185 t!=END_TSO_QUEUE && t!=tso;
2189 /* now actually dequeue the tso */
2190 if (prev!=END_TSO_QUEUE) {
2191 ASSERT(run_queue_hd!=t);
2192 prev->link = t->link;
2194 /* t is at beginning of thread queue */
2195 ASSERT(run_queue_hd==t);
2196 run_queue_hd = t->link;
2198 /* t is at end of thread queue */
2199 if (t->link==END_TSO_QUEUE) {
2200 ASSERT(t==run_queue_tl);
2201 run_queue_tl = prev;
2203 ASSERT(run_queue_tl!=t);
2205 t->link = END_TSO_QUEUE;
2207 /* take tso from the beginning of the queue; std concurrent code */
2209 if (t != END_TSO_QUEUE) {
2210 run_queue_hd = t->link;
2211 t->link = END_TSO_QUEUE;
2212 if (run_queue_hd == END_TSO_QUEUE) {
2213 run_queue_tl = END_TSO_QUEUE;
2222 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2223 //@subsection Garbage Collextion Routines
2225 /* ---------------------------------------------------------------------------
2226 Where are the roots that we know about?
2228 - all the threads on the runnable queue
2229 - all the threads on the blocked queue
2230 - all the threads on the sleeping queue
2231 - all the thread currently executing a _ccall_GC
2232 - all the "main threads"
2234 ------------------------------------------------------------------------ */
2236 /* This has to be protected either by the scheduler monitor, or by the
2237 garbage collection monitor (probably the latter).
2242 GetRoots(evac_fn evac)
// Passes every scheduler-owned TSO root to `evac`:
//   - in GranSim, the per-PE run/blocked/ccalling queue arrays;
//   - otherwise, the global run, blocked and sleeping queues
//     (the #if/#else choosing between these two is not shown);
//   - then the main threads and the threads suspended in C calls;
//   - finally, in PAR/GRAN builds, the spark queue.
2249 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2250 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2251 evac((StgClosure **)&run_queue_hds[i]);
2252 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2253 evac((StgClosure **)&run_queue_tls[i]);
2255 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2256 evac((StgClosure **)&blocked_queue_hds[i]);
2257 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2258 evac((StgClosure **)&blocked_queue_tls[i]);
2259 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
// Evacuate the same per-PE slot that the guard above tests.
2260 evac((StgClosure **)&ccalling_threadss[i]);
2267 if (run_queue_hd != END_TSO_QUEUE) {
2268 ASSERT(run_queue_tl != END_TSO_QUEUE);
2269 evac((StgClosure **)&run_queue_hd);
2270 evac((StgClosure **)&run_queue_tl);
2273 if (blocked_queue_hd != END_TSO_QUEUE) {
2274 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2275 evac((StgClosure **)&blocked_queue_hd);
2276 evac((StgClosure **)&blocked_queue_tl);
2279 if (sleeping_queue != END_TSO_QUEUE) {
2280 evac((StgClosure **)&sleeping_queue);
2284 for (m = main_threads; m != NULL; m = m->link) {
2285 evac((StgClosure **)&m->tso);
2287 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2288 evac((StgClosure **)&suspended_ccalling_threads);
2291 #if defined(PAR) || defined(GRAN)
2292 markSparkQueue(evac);
2296 /* -----------------------------------------------------------------------------
2299 This is the interface to the garbage collector from Haskell land.
2300 We provide this so that external C code can allocate and garbage
2301 collect when called from Haskell via _ccall_GC.
2303 It might be useful to provide an interface whereby the programmer
2304 can specify more roots (ToDo).
2306 This needs to be protected by the GC condition variable above. KH.
2307 -------------------------------------------------------------------------- */
// Caller-supplied root enumerator, set by performGCWithRoots() and invoked by AllRoots().
2309 void (*extra_roots)(evac_fn);
// performGC (its header is not shown): collects using only the scheduler's
// roots. The rtsFalse flag contrasts with performMajorGC's rtsTrue, so it
// presumably requests a non-major collection.
2314 GarbageCollect(GetRoots,rtsFalse);
// Collects using the scheduler's roots, passing rtsTrue as the second
// argument (presumably requesting a major collection, as the name says).
2318 performMajorGC(void)
2320 GarbageCollect(GetRoots,rtsTrue);
// Root enumerator used by performGCWithRoots(): the scheduler's roots, then
// the user-supplied ones. extra_roots must already be set.
2324 AllRoots(evac_fn evac)
2326 GetRoots(evac); // the scheduler's roots
2327 extra_roots(evac); // the user's roots
// Garbage-collects with extra caller-supplied roots. It records get_roots in
// the file-level extra_roots and collects via AllRoots.
2331 performGCWithRoots(void (*get_roots)(evac_fn))
2333 extra_roots = get_roots;
2334 GarbageCollect(AllRoots,rtsFalse);
2337 /* -----------------------------------------------------------------------------
2340 If the thread has reached its maximum stack size, then raise the
2341 StackOverflow exception in the offending thread. Otherwise
2342 relocate the TSO into a larger chunk of memory and adjust its stack
2344 -------------------------------------------------------------------------- */
2347 threadStackOverflow(StgTSO *tso)
2349 nat new_stack_size, new_tso_size, diff, stack_words;
2353 IF_DEBUG(sanity,checkTSO(tso));
2354 if (tso->stack_size >= tso->max_stack_size) {
2357 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2358 tso->id, tso, tso->stack_size, tso->max_stack_size);
2359 /* If we're debugging, just print out the top of the stack */
2360 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2363 /* Send this thread the StackOverflow exception */
2364 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2368 /* Try to double the current stack size. If that takes us over the
2369 * maximum stack size for this thread, then use the maximum instead.
2370 * Finally round up so the TSO ends up as a whole number of blocks.
2372 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2373 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2374 TSO_STRUCT_SIZE)/sizeof(W_);
2375 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2376 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2378 IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2380 dest = (StgTSO *)allocate(new_tso_size);
2381 TICK_ALLOC_TSO(new_stack_size,0);
2383 /* copy the TSO block and the old stack into the new area */
2384 memcpy(dest,tso,TSO_STRUCT_SIZE);
2385 stack_words = tso->stack + tso->stack_size - tso->sp;
2386 new_sp = (P_)dest + new_tso_size - stack_words;
2387 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2389 /* relocate the stack pointers... */
2390 diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2391 dest->su = (StgUpdateFrame *) ((P_)dest->su + diff);
2393 dest->stack_size = new_stack_size;
2395 /* and relocate the update frame list */
2396 relocate_stack(dest, diff);
2398 /* Mark the old TSO as relocated. We have to check for relocated
2399 * TSOs in the garbage collector and any primops that deal with TSOs.
2401 * It's important to set the sp and su values to just beyond the end
2402 * of the stack, so we don't attempt to scavenge any part of the
2405 tso->what_next = ThreadRelocated;
2407 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2408 tso->su = (StgUpdateFrame *)tso->sp;
2409 tso->why_blocked = NotBlocked;
2410 dest->mut_link = NULL;
2412 IF_PAR_DEBUG(verbose,
2413 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2414 tso->id, tso, tso->stack_size);
2415 /* If we're debugging, just print out the top of the stack */
2416 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2419 IF_DEBUG(sanity,checkTSO(tso));
2421 IF_DEBUG(scheduler,printTSO(dest));
2427 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2428 //@subsection Blocking Queue Routines
2430 /* ---------------------------------------------------------------------------
2431 Wake up a queue that was blocked on some resource.
2432 ------------------------------------------------------------------------ */
// Two variants follow; the #if lines separating them are not shown. The first
// variant's body is not visible.
2436 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
// Second variant: when full parallel statistics are on, it logs a GR_RESUMEQ
// event for the woken TSO. If the run queue is empty it sets emitSchedule. It
// then adds the time since par.blockedat to either fetchtime or blocktime,
// depending on the type of `node` (the case labels are not shown).
// Unexpected closure types abort via barf(). The message names
// unblockOneLocked, presumably because that is this function's caller.
2441 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2443 /* write RESUME events to log file and
2444 update blocked and fetch time (depending on type of the orig closure) */
2445 if (RtsFlags.ParFlags.ParStats.Full) {
2446 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2447 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2448 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2449 if (EMPTY_RUN_QUEUE())
2450 emitSchedule = rtsTrue;
2452 switch (get_itbl(node)->type) {
2454 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2459 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2466 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
// unblockOneLocked (GranSim): simulate waking the TSO `bqe` from the blocking
// queue of `node`. The locking wrapper unblockOne calls it with sched_mutex
// held.
//   - If IS_LOCAL_TO(PROCS(node), tso_loc) holds, it charges
//     Costs.lunblocktime to CurrentProc and posts an event via
//     new_event(tso_loc, tso_loc, ...).
//   - Otherwise (remote TSO), it charges mpacktime + gunblocktime + latency
//     and posts new_event(tso_loc, CurrentProc, ...).
// (The event kinds passed to new_event are on lines not visible here.)
// In both cases tso->link is reset to END_TSO_QUEUE and block_info.closure is
// cleared afterwards.
// NOTE(review): tso->link is overwritten before the bq debug trace prints it
// as "next", so that trace always shows END_TSO_QUEUE. The return statement
// is not visible in this extract. awakenBlockedQueue uses the result as the
// next queue element, so confirm the successor is captured before the
// overwrite.
2473 static StgBlockingQueueElement *
2474 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2477 PEs node_loc, tso_loc;
2479 node_loc = where_is(node); // should be lifted out of loop
2480 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2481 tso_loc = where_is((StgClosure *)tso);
2482 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2483 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2484 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2485 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2486 // insertThread(tso, node_loc);
2487 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2489 tso, node, (rtsSpark*)NULL);
2490 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2493 } else { // TSO is remote (actually should be FMBQ)
2494 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2495 RtsFlags.GranFlags.Costs.gunblocktime +
2496 RtsFlags.GranFlags.Costs.latency;
2497 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2499 tso, node, (rtsSpark*)NULL);
2500 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2503 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2505 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2506 (node_loc==tso_loc ? "Local" : "Global"),
2507 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2508 tso->block_info.closure = NULL;
2509 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
// unblockOneLocked (GUM): wake a single element `bqe` of the blocking queue
// of `node`, dispatching on its closure type:
//   - TSO: pushed onto the run queue, resume statistics recorded via
//     unblockCount, then why_blocked reset to NotBlocked.
//   - BLOCKED_FETCH: prepended to the PendingFetches list. This overwrites
//     bqe->link.
//   - RBHSave closure: only ASSERTed to be one of the stg_RBH_Save_N infos
//     (the comment says it can be ignored in a non-debugging setup).
//   - anything else: barf.
// NOTE(review): `next` is presumably read from bqe->link before the switch
// (that line is not visible). It must be, since the BLOCKED_FETCH case
// clobbers bqe->link.
// NOTE(review): the barf passes pointers (get_itbl(...)) for %#lx; %p would
// be the portable conversion.
2513 static StgBlockingQueueElement *
2514 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2516 StgBlockingQueueElement *next;
2518 switch (get_itbl(bqe)->type) {
2520 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2521 /* if it's a TSO just push it onto the run_queue */
2523 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2524 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
2526 unblockCount(bqe, node);
2527 /* reset blocking status after dumping event */
2528 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2532 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2534 bqe->link = (StgBlockingQueueElement *)PendingFetches;
2535 PendingFetches = (StgBlockedFetch *)bqe;
2539 /* can ignore this case in a non-debugging setup;
2540 see comments on RBHSave closures above */
2542 /* check that the closure is an RBHSave closure */
2543 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2544 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2545 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2549 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2550 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2554 IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2558 #else /* !GRAN && !PAR */
// unblockOneLocked (non-parallel build): assert that `tso` is a blocked TSO,
// mark it NotBlocked and push it onto the run queue. The caller holds
// sched_mutex (see unblockOne / awakenBlockedQueue).
// NOTE(review): the return statement is not visible in this extract.
// awakenBlockedQueue walks a queue with `tso = unblockOneLocked(tso)`, so the
// result is expected to be the successor of `tso`. Confirm it is read before
// PUSH_ON_RUN_QUEUE (presumably linking through tso->link) runs.
2560 unblockOneLocked(StgTSO *tso)
2564 ASSERT(get_itbl(tso)->type == TSO);
2565 ASSERT(tso->why_blocked != NotBlocked);
2566 tso->why_blocked = NotBlocked;
2568 PUSH_ON_RUN_QUEUE(tso);
2570 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2575 #if defined(GRAN) || defined(PAR)
// unblockOne: locking wrappers around unblockOneLocked, one per build
// flavour. Each holds sched_mutex only for the single unblockOneLocked call
// and stores its result back into the argument variable (bqe / tso). The
// return statements are on lines not visible in this extract.
2576 inline StgBlockingQueueElement *
2577 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2579 ACQUIRE_LOCK(&sched_mutex);
2580 bqe = unblockOneLocked(bqe, node);
2581 RELEASE_LOCK(&sched_mutex);
2586 unblockOne(StgTSO *tso)
2588 ACQUIRE_LOCK(&sched_mutex);
2589 tso = unblockOneLocked(tso);
2590 RELEASE_LOCK(&sched_mutex);
// awakenBlockedQueue (GranSim): wake every TSO on the blocking queue `q` of
// `node`.
//   1. If the node is not on CurrentProc, a "fake fetch" ORs CurrentProc into
//      node->header.gran.procs (and counts it in tot_fake_fetches).
//   2. TSOs are unblocked one at a time with unblockOneLocked until a non-TSO
//      element is reached.
//   3. If that element is not END_BQ_QUEUE, it must be the RBHSave closure
//      of an RBH. Its payload is copied back into the RBH's blocking_queue
//      and mut_link fields.
//   4. GranSim global statistics (tot_bq_len, tot_awbq) are updated.
// NOTE(review): the RBHSave asserts compare against RBH_Save_N_info, whereas
// the GUM code and print_bq use stg_RBH_Save_N_info. Confirm these names
// resolve in GranSim builds.
2597 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2599 StgBlockingQueueElement *bqe;
2604 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2605 node, CurrentProc, CurrentTime[CurrentProc],
2606 CurrentTSO->id, CurrentTSO));
2608 node_loc = where_is(node);
2610 ASSERT(q == END_BQ_QUEUE ||
2611 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2612 get_itbl(q)->type == CONSTR); // closure (type constructor)
2613 ASSERT(is_unique(node));
2615 /* FAKE FETCH: magically copy the node to the tso's proc;
2616 no Fetch necessary because in reality the node should not have been
2617 moved to the other PE in the first place
2619 if (CurrentProc!=node_loc) {
2621 belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2622 node, node_loc, CurrentProc, CurrentTSO->id,
2623 // CurrentTSO, where_is(CurrentTSO),
2624 node->header.gran.procs));
2625 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2627 belch("## new bitmask of node %p is %#x",
2628 node, node->header.gran.procs));
2629 if (RtsFlags.GranFlags.GranSimStats.Global) {
2630 globalGranStats.tot_fake_fetches++;
2635 // ToDo: check: ASSERT(CurrentProc==node_loc);
2636 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2639 bqe points to the current element in the queue
2640 next points to the next element in the queue
2642 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2643 //tso_loc = where_is(tso);
2645 bqe = unblockOneLocked(bqe, node);
2648 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2649 the closure to make room for the anchor of the BQ */
2650 if (bqe!=END_BQ_QUEUE) {
2651 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2653 ASSERT((info_ptr==&RBH_Save_0_info) ||
2654 (info_ptr==&RBH_Save_1_info) ||
2655 (info_ptr==&RBH_Save_2_info));
2657 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2658 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2659 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2662 belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2663 node, info_type(node)));
2666 /* statistics gathering */
2667 if (RtsFlags.GranFlags.GranSimStats.Global) {
2668 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2669 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2670 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2671 globalGranStats.tot_awbq++; // total no. of bqs awakened
2674 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2675 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
// awakenBlockedQueue (GUM): under sched_mutex, wake every TSO and
// BLOCKED_FETCH on the blocking queue `q` of `node` via unblockOneLocked.
// The walk stops at the first element of any other type. A queue that is
// END_BQ_QUEUE, or that starts with a CONSTR (an RBHSave closure), has
// nothing to wake and takes the early-exit branch.
// NOTE(review): that early exit happens after ACQUIRE_LOCK. Confirm the lines
// not visible here release sched_mutex before returning.
// NOTE(review): get_itbl(q) is evaluated before the q==END_BQ_QUEUE test, so
// this relies on END_BQ_QUEUE being a real closure with an info table.
2679 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2681 StgBlockingQueueElement *bqe;
2683 ACQUIRE_LOCK(&sched_mutex);
2685 IF_PAR_DEBUG(verbose,
2686 belch("##-_ AwBQ for node %p on [%x]: ",
2690 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2691 IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2696 ASSERT(q == END_BQ_QUEUE ||
2697 get_itbl(q)->type == TSO ||
2698 get_itbl(q)->type == BLOCKED_FETCH ||
2699 get_itbl(q)->type == CONSTR);
2702 while (get_itbl(bqe)->type==TSO ||
2703 get_itbl(bqe)->type==BLOCKED_FETCH) {
2704 bqe = unblockOneLocked(bqe, node);
2706 RELEASE_LOCK(&sched_mutex);
2709 #else /* !GRAN && !PAR */
// awakenBlockedQueue (non-parallel build): under sched_mutex, wake every TSO
// on the queue starting at `tso`. Each unblockOneLocked result is taken as
// the next element, so the walk ends at END_TSO_QUEUE.
2711 awakenBlockedQueue(StgTSO *tso)
2713 ACQUIRE_LOCK(&sched_mutex);
2714 while (tso != END_TSO_QUEUE) {
2715 tso = unblockOneLocked(tso);
2717 RELEASE_LOCK(&sched_mutex);
2721 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2722 //@subsection Exception Handling Routines
2724 /* ---------------------------------------------------------------------------
2726 - usually called inside a signal handler so it mustn't do anything fancy.
2727 ------------------------------------------------------------------------ */
// interruptStgRts: per the header comment, usually invoked from a signal
// handler, so its body must stay minimal (the body is not visible in this
// extract).
2730 interruptStgRts(void)
2736 /* -----------------------------------------------------------------------------
2739 This is for use when we raise an exception in another thread, which
2741 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2742 -------------------------------------------------------------------------- */
2744 #if defined(GRAN) || defined(PAR)
2746 NB: only the type of the blocking queue is different in GranSim and GUM
2747 the operations on the queue-elements are the same
2748 long live polymorphism!
// unblockThread (GranSim/GUM): remove `tso` from the queue named by its
// why_blocked field and make it runnable. The possible queues are:
//   - an MVar's head/tail list
//   - a BLACKHOLE_BQ's blocking_queue
//   - another thread's blocked_exceptions list
//   - blocked_queue_hd/tl (BlockedOnWrite and the I/O cases)
//   - sleeping_queue (BlockedOnDelay)
// Afterwards the TSO's link, why_blocked and block_info are cleared and it is
// pushed onto the run queue, all under sched_mutex. It barfs if the TSO is
// not on the queue its why_blocked names.
// NOTE(review): sched_mutex is acquired before the switch, but the
// not-blocked early return shows no RELEASE_LOCK. Confirm the lock is not
// leaked on that path.
2751 unblockThread(StgTSO *tso)
2753 StgBlockingQueueElement *t, **last;
2755 ACQUIRE_LOCK(&sched_mutex);
2756 switch (tso->why_blocked) {
2759 return; /* not blocked */
2762 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2764 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2765 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2767 last = (StgBlockingQueueElement **)&mvar->head;
2768 for (t = (StgBlockingQueueElement *)mvar->head;
2770 last = &t->link, last_tso = t, t = t->link) {
2771 if (t == (StgBlockingQueueElement *)tso) {
2772 *last = (StgBlockingQueueElement *)tso->link;
2773 if (mvar->tail == tso) {
2774 mvar->tail = (StgTSO *)last_tso;
2779 barf("unblockThread (MVAR): TSO not found");
2782 case BlockedOnBlackHole:
2783 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2785 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2787 last = &bq->blocking_queue;
2788 for (t = bq->blocking_queue;
2790 last = &t->link, t = t->link) {
2791 if (t == (StgBlockingQueueElement *)tso) {
2792 *last = (StgBlockingQueueElement *)tso->link;
2796 barf("unblockThread (BLACKHOLE): TSO not found");
2799 case BlockedOnException:
2801 StgTSO *target = tso->block_info.tso;
2803 ASSERT(get_itbl(target)->type == TSO);
// Follow relocations until a live TSO is reached, exactly as the
// non-parallel unblockThread and detectBlackHoles do. A single `if` would
// stop at an intermediate ThreadRelocated TSO whose blocked_exceptions list
// is no longer the live one.
2805 while (target->what_next == ThreadRelocated) {
2806 target = target->link;
2807 ASSERT(get_itbl(target)->type == TSO);
2810 ASSERT(target->blocked_exceptions != NULL);
2812 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2813 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2815 last = &t->link, t = t->link) {
2816 ASSERT(get_itbl(t)->type == TSO);
2817 if (t == (StgBlockingQueueElement *)tso) {
2818 *last = (StgBlockingQueueElement *)tso->link;
2822 barf("unblockThread (Exception): TSO not found");
2826 case BlockedOnWrite:
2828 /* take TSO off blocked_queue */
2829 StgBlockingQueueElement *prev = NULL;
2830 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2831 prev = t, t = t->link) {
2832 if (t == (StgBlockingQueueElement *)tso) {
2834 blocked_queue_hd = (StgTSO *)t->link;
2835 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2836 blocked_queue_tl = END_TSO_QUEUE;
2839 prev->link = t->link;
2840 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2841 blocked_queue_tl = (StgTSO *)prev;
2847 barf("unblockThread (I/O): TSO not found");
2850 case BlockedOnDelay:
2852 /* take TSO off sleeping_queue */
2853 StgBlockingQueueElement *prev = NULL;
2854 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
2855 prev = t, t = t->link) {
2856 if (t == (StgBlockingQueueElement *)tso) {
2858 sleeping_queue = (StgTSO *)t->link;
2860 prev->link = t->link;
2865 barf("unblockThread (I/O): TSO not found");
2869 barf("unblockThread");
2873 tso->link = END_TSO_QUEUE;
2874 tso->why_blocked = NotBlocked;
2875 tso->block_info.closure = NULL;
2876 PUSH_ON_RUN_QUEUE(tso);
2877 RELEASE_LOCK(&sched_mutex);
// unblockThread (non-parallel build): remove `tso` from the queue named by
// its why_blocked field and make it runnable. The possible queues are:
//   - an MVar's head/tail list (the tail is fixed up if tso was last)
//   - a BLACKHOLE_BQ's blocking_queue
//   - the blocked_exceptions list of the target thread, after following
//     ThreadRelocated links
//   - blocked_queue_hd/tl
//   - sleeping_queue
// Afterwards the TSO's link, why_blocked and block_info are cleared and it is
// pushed onto the run queue, all under sched_mutex. It barfs if the TSO is
// not found.
// NOTE(review): sched_mutex is acquired before the switch, but the
// not-blocked early return shows no RELEASE_LOCK. Confirm the lock is not
// leaked on that path.
// NOTE(review): the sleeping-queue case reuses the "(I/O)" barf message.
2881 unblockThread(StgTSO *tso)
2885 ACQUIRE_LOCK(&sched_mutex);
2886 switch (tso->why_blocked) {
2889 return; /* not blocked */
2892 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2894 StgTSO *last_tso = END_TSO_QUEUE;
2895 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2898 for (t = mvar->head; t != END_TSO_QUEUE;
2899 last = &t->link, last_tso = t, t = t->link) {
2902 if (mvar->tail == tso) {
2903 mvar->tail = last_tso;
2908 barf("unblockThread (MVAR): TSO not found");
2911 case BlockedOnBlackHole:
2912 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2914 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2916 last = &bq->blocking_queue;
2917 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2918 last = &t->link, t = t->link) {
2924 barf("unblockThread (BLACKHOLE): TSO not found");
2927 case BlockedOnException:
2929 StgTSO *target = tso->block_info.tso;
2931 ASSERT(get_itbl(target)->type == TSO);
2933 while (target->what_next == ThreadRelocated) {
2934 target = target->link;
2935 ASSERT(get_itbl(target)->type == TSO);
2938 ASSERT(target->blocked_exceptions != NULL);
2940 last = &target->blocked_exceptions;
2941 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2942 last = &t->link, t = t->link) {
2943 ASSERT(get_itbl(t)->type == TSO);
2949 barf("unblockThread (Exception): TSO not found");
2953 case BlockedOnWrite:
2955 StgTSO *prev = NULL;
2956 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2957 prev = t, t = t->link) {
2960 blocked_queue_hd = t->link;
2961 if (blocked_queue_tl == t) {
2962 blocked_queue_tl = END_TSO_QUEUE;
2965 prev->link = t->link;
2966 if (blocked_queue_tl == t) {
2967 blocked_queue_tl = prev;
2973 barf("unblockThread (I/O): TSO not found");
2976 case BlockedOnDelay:
2978 StgTSO *prev = NULL;
2979 for (t = sleeping_queue; t != END_TSO_QUEUE;
2980 prev = t, t = t->link) {
2983 sleeping_queue = t->link;
2985 prev->link = t->link;
2990 barf("unblockThread (I/O): TSO not found");
2994 barf("unblockThread");
2998 tso->link = END_TSO_QUEUE;
2999 tso->why_blocked = NotBlocked;
3000 tso->block_info.closure = NULL;
3001 PUSH_ON_RUN_QUEUE(tso);
3002 RELEASE_LOCK(&sched_mutex);
3006 /* -----------------------------------------------------------------------------
3009 * The following function implements the magic for raising an
3010 * asynchronous exception in an existing thread.
3012 * We first remove the thread from any queue on which it might be
3013 * blocked: an MVar, a BLACKHOLE_BQ, another thread's blocked_exceptions queue, the blocked I/O queue or the sleeping (delay) queue (see unblockThread).
3015 * We strip the stack down to the innermost CATCH_FRAME, building
3016 * thunks in the heap for all the active computations, so they can
3017 * be restarted if necessary. When we reach a CATCH_FRAME, we build
3018 * an application of the handler to the exception, and push it on
3019 * the top of the stack.
3021 * How exactly do we save all the active computations? We create an
3022 * AP_UPD for every UpdateFrame on the stack. Entering one of these
3023 * AP_UPDs pushes everything from the corresponding update frame
3024 * upwards onto the stack. (Actually, it pushes everything up to the
3025 * next update frame plus a pointer to the next AP_UPD object.)
3026 * Entering the next AP_UPD object pushes more onto the stack until we
3027 * reach the last AP_UPD object - at which point the stack should look
3028 * exactly as it did when we killed the TSO and we can continue
3029 * execution by entering the closure on top of the stack.
3031 * We can also kill a thread entirely - this happens if either (a) the
3032 * exception passed to raiseAsync is NULL, or (b) there's no
3033 * CATCH_FRAME on the stack. In either case, we strip the entire
3034 * stack and replace the thread with a zombie.
3036 * -------------------------------------------------------------------------- */
// deleteThread: kill `tso` outright. Passing a NULL exception makes
// raiseAsync strip the entire stack and mark the thread ThreadKilled.
3039 deleteThread(StgTSO *tso)
3041 raiseAsync(tso,NULL);
// raiseAsync: raise `exception` asynchronously in `tso`, or kill the thread
// when `exception` is NULL.
// Nothing further is done for a thread that is already ThreadComplete or
// ThreadKilled. Otherwise the thread is first taken off any blocking queue.
// The stack is then unwound frame by frame from tso->su, freezing each chunk
// between frames into a heap closure:
//   - update frame: the chunk becomes an AP_UPD, and the updatee is
//     overwritten with an indirection to it unless the updatee is already
//     an indirection.
//   - catch frame (no exception): the chunk becomes a PAP wrapped in a
//     catch closure.
//   - seq frame: the chunk becomes a PAP wrapped in a seq closure.
// Unwinding stops at the first CATCH_FRAME when exception != NULL. There the
// handler is applied to the exception (with async exceptions blocked while
// it runs) and the thread restarts in ThreadEnterGHC. If the stop frame is
// reached instead, the thread becomes ThreadKilled with `exception` saved in
// the top stack slot.
3045 raiseAsync(StgTSO *tso, StgClosure *exception)
3047 StgUpdateFrame* su = tso->su;
3048 StgPtr sp = tso->sp;
3050 /* Thread already dead? */
3051 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3055 IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3057 /* Remove it from any blocking queues */
3060 /* The stack freezing code assumes there's a closure pointer on
3061 * the top of the stack. This isn't always the case with compiled
3062 * code, so we have to push a dummy closure on the top which just
3063 * returns to the next return address on the stack.
3065 if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3066 *(--sp) = (W_)&stg_dummy_ret_closure;
3070 nat words = ((P_)su - (P_)sp) - 1;
3074 /* If we find a CATCH_FRAME, and we've got an exception to raise,
3075 * then build PAP(handler,exception,realworld#), and leave it on
3076 * top of the stack ready to enter.
3078 if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3079 StgCatchFrame *cf = (StgCatchFrame *)su;
3080 /* we've got an exception to raise, so let's pass it to the
3081 * handler in this frame.
3083 ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3084 TICK_ALLOC_UPD_PAP(3,0);
3085 SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3088 ap->fun = cf->handler; /* :: Exception -> IO a */
3089 ap->payload[0] = exception;
3090 ap->payload[1] = ARG_TAG(0); /* realworld token */
3092 /* throw away the stack from Sp up to and including the
3095 sp = (P_)su + sizeofW(StgCatchFrame) - 1;
3098 /* Restore the blocked/unblocked state for asynchronous exceptions
3099 * at the CATCH_FRAME.
3101 * If exceptions were unblocked at the catch, arrange that they
3102 * are unblocked again after executing the handler by pushing an
3103 * unblockAsyncExceptions_ret stack frame.
3105 if (!cf->exceptions_blocked) {
3106 *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3109 /* Ensure that async exceptions are blocked when running the handler.
3111 if (tso->blocked_exceptions == NULL) {
3112 tso->blocked_exceptions = END_TSO_QUEUE;
3115 /* Put the newly-built PAP on top of the stack, ready to execute
3116 * when the thread restarts.
3120 tso->what_next = ThreadEnterGHC;
3121 IF_DEBUG(sanity, checkTSO(tso));
3125 /* First build an AP_UPD consisting of the stack chunk above the
3126 * current update frame, with the top word on the stack as the
3129 ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3134 ap->fun = (StgClosure *)sp[0];
3136 for(i=0; i < (nat)words; ++i) {
3137 ap->payload[i] = (StgClosure *)*sp++;
3140 switch (get_itbl(su)->type) {
3144 SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */);
3145 TICK_ALLOC_UP_THK(words+1,0);
3148 fprintf(stderr, "scheduler: Updating ");
3149 printPtr((P_)su->updatee);
3150 fprintf(stderr, " with ");
3151 printObj((StgClosure *)ap);
3154 /* Replace the updatee with an indirection - happily
3155 * this will also wake up any threads currently
3156 * waiting on the result.
3158 * Warning: if we're in a loop, more than one update frame on
3159 * the stack may point to the same object. Be careful not to
3160 * overwrite an IND_OLDGEN in this case, because we'll screw
3161 * up the mutable lists. To be on the safe side, don't
3162 * overwrite any kind of indirection at all. See also
3163 * threadSqueezeStack in GC.c, where we have to make a similar
3166 if (!closure_IND(su->updatee)) {
3167 UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */
3170 sp += sizeofW(StgUpdateFrame) -1;
3171 sp[0] = (W_)ap; /* push onto stack */
3177 StgCatchFrame *cf = (StgCatchFrame *)su;
3180 /* We want a PAP, not an AP_UPD. Fortunately, the
3181 * layout's the same.
3183 SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3184 TICK_ALLOC_UPD_PAP(words+1,0);
3186 /* now build o = FUN(catch,ap,handler) */
3187 o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3188 TICK_ALLOC_FUN(2,0);
3189 SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3190 o->payload[0] = (StgClosure *)ap;
3191 o->payload[1] = cf->handler;
3194 fprintf(stderr, "scheduler: Built ");
3195 printObj((StgClosure *)o);
3198 /* pop the old handler and put o on the stack */
3200 sp += sizeofW(StgCatchFrame) - 1;
3207 StgSeqFrame *sf = (StgSeqFrame *)su;
3210 SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3211 TICK_ALLOC_UPD_PAP(words+1,0);
3213 /* now build o = FUN(seq,ap) */
3214 o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3215 TICK_ALLOC_SE_THK(1,0);
3216 SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3217 o->payload[0] = (StgClosure *)ap;
3220 fprintf(stderr, "scheduler: Built ");
3221 printObj((StgClosure *)o);
3224 /* pop the old seq frame and put o on the stack */
3226 sp += sizeofW(StgSeqFrame) - 1;
3232 /* We've stripped the entire stack, the thread is now dead. */
3233 sp += sizeofW(StgStopFrame) - 1;
3234 sp[0] = (W_)exception; /* save the exception */
3235 tso->what_next = ThreadKilled;
3236 tso->su = (StgUpdateFrame *)(sp+1);
3247 /* -----------------------------------------------------------------------------
3248 resurrectThreads is called after garbage collection on the list of
3249 threads found to be garbage. Each of these threads will be woken
3250 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3251 on an MVar, or NonTermination if the thread was blocked on a Black
3253 -------------------------------------------------------------------------- */
// resurrectThreads: for each TSO on the `threads` list (linked through
// global_link), relink it in front of all_threads and raise an exception in
// it according to why it was blocked:
//   - BlockedOnDeadMVar for exception-blocked threads (and presumably
//     MVar-blocked ones; that case label is not visible here),
//   - NonTermination for threads blocked on a black hole.
// A thread that is already unblocked can occur because an earlier raiseAsync
// may wake threads (see the comment below). Any other blocking reason barfs.
// NOTE(review): only `tso->global_link = all_threads` is visible. Confirm
// that all_threads is then updated to point at tso.
3256 resurrectThreads( StgTSO *threads )
3260 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3261 next = tso->global_link;
3262 tso->global_link = all_threads;
3264 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3266 switch (tso->why_blocked) {
3268 case BlockedOnException:
3269 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3271 case BlockedOnBlackHole:
3272 raiseAsync(tso,(StgClosure *)NonTermination_closure);
3275 /* This might happen if the thread was blocked on a black hole
3276 * belonging to a thread that we've just woken up (raiseAsync
3277 * can wake up threads, remember...).
3281 barf("resurrectThreads: thread blocked in a strange way");
3286 /* -----------------------------------------------------------------------------
3287 * Blackhole detection: if we reach a deadlock, test whether any
3288 * threads are blocked on themselves. Any threads which are found to
3289 * be self-blocked get sent a NonTermination exception.
3291 * This is only done in a deadlock situation in order to avoid
3292 * performance overhead in the normal case.
3293 * -------------------------------------------------------------------------- */
// detectBlackHoles: deadlock-time self-blocking check. For every thread on
// all_threads that is BlockedOnBlackHole, it resolves ThreadRelocated TSOs
// and then walks the update-frame chain from t->su. If a frame's updatee is
// the very closure the thread is blocked on, the thread is waiting on its
// own computation and receives NonTermination via raiseAsync.
// NOTE(review): `t` is replaced inside the relocation loop (loop body not
// visible), so the outer for loop would continue from the relocated TSO's
// global_link rather than the original one. Confirm that is intended.
// NOTE(review): the initialiser `t = all_threads` is redundant with the for
// loop's own initialisation.
3296 detectBlackHoles( void )
3298 StgTSO *t = all_threads;
3299 StgUpdateFrame *frame;
3300 StgClosure *blocked_on;
3302 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3304 while (t->what_next == ThreadRelocated) {
3306 ASSERT(get_itbl(t)->type == TSO);
3309 if (t->why_blocked != BlockedOnBlackHole) {
3313 blocked_on = t->block_info.closure;
3315 for (frame = t->su; ; frame = frame->link) {
3316 switch (get_itbl(frame)->type) {
3319 if (frame->updatee == blocked_on) {
3320 /* We are blocking on one of our own computations, so
3321 * send this thread the NonTermination exception.
3324 sched_belch("thread %d is blocked on itself", t->id));
3325 raiseAsync(t, (StgClosure *)NonTermination_closure);
3346 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3347 //@subsection Debugging Routines
3349 /* -----------------------------------------------------------------------------
3350 Debugging: why is a thread blocked
3351 -------------------------------------------------------------------------- */
// printThreadBlockage: debugging helper that writes a short description of
// why `tso` is blocked to stderr, without a trailing newline. GUM-only and
// threaded-RTS-only reasons are printed when those cases are compiled in.
// An unrecognised why_blocked value barfs. The TSO address in that message
// is printed with %p: it was %d, which is undefined behaviour for a pointer
// argument and truncates the address on LP64 targets.
3356 printThreadBlockage(StgTSO *tso)
3358 switch (tso->why_blocked) {
3360 fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3362 case BlockedOnWrite:
3363 fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3365 case BlockedOnDelay:
3366 fprintf(stderr,"is blocked until %d", tso->block_info.target);
3369 fprintf(stderr,"is blocked on an MVar");
3371 case BlockedOnException:
3372 fprintf(stderr,"is blocked on delivering an exception to thread %d",
3373 tso->block_info.tso->id);
3375 case BlockedOnBlackHole:
3376 fprintf(stderr,"is blocked on a black hole");
3379 fprintf(stderr,"is not blocked");
3383 fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3384 tso->block_info.closure, info_type(tso->block_info.closure));
3386 case BlockedOnGA_NoSend:
3387 fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3388 tso->block_info.closure, info_type(tso->block_info.closure));
3391 #if defined(RTS_SUPPORTS_THREADS)
3392 case BlockedOnCCall:
3393 fprintf(stderr,"is blocked on an external call");
3397 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
3398 tso->why_blocked, tso->id, tso);
// printThreadStatus: debugging helper that writes the state of `tso` to
// stderr: "has been killed", "has completed", or otherwise the blocking
// reason via printThreadBlockage.
3403 printThreadStatus(StgTSO *tso)
3405 switch (tso->what_next) {
3407 fprintf(stderr,"has been killed");
3409 case ThreadComplete:
3410 fprintf(stderr,"has completed");
3413 printThreadBlockage(tso);
// printAllThreads: debugging dump of every thread on all_threads, one line
// per thread ("thread <id> <status>"). In GranSim/GUM builds the header
// includes the current (simulated) time, formatted without commas.
// NOTE(review): node_str is declared but never used in the visible lines.
3418 printAllThreads(void)
3423 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3424 ullong_format_string(TIME_ON_PROC(CurrentProc),
3425 time_string, rtsFalse/*no commas!*/);
3427 sched_belch("all threads at [%s]:", time_string);
3429 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3430 ullong_format_string(CURRENT_TIME,
3431 time_string, rtsFalse/*no commas!*/);
3433 sched_belch("all threads at [%s]:", time_string);
3435 sched_belch("all threads:");
3438 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3439 fprintf(stderr, "\tthread %d ", t->id);
3440 printThreadStatus(t);
3441 fprintf(stderr,"\n");
3446 Print a whole blocking queue attached to node (debugging only).
// print_bq (GUM): debugging dump of the blocking queue attached to `node`.
// The node must be a BLACKHOLE_BQ, FETCH_ME_BQ, RBH or MVAR. The element
// list is printed by print_bqe.
// The NULL check now comes before the first dereference of `node`
// (info_type/get_itbl). Previously it ran only after those calls, so it
// could never fire.
3451 print_bq (StgClosure *node)
3453 StgBlockingQueueElement *bqe;
3466 ASSERT(node!=(StgClosure*)NULL); // sanity check
3457 fprintf(stderr,"## BQ of closure %p (%s): ",
3458 node, info_type(node));
3460 /* should cover all closures that may have a blocking queue */
3461 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3462 get_itbl(node)->type == FETCH_ME_BQ ||
3463 get_itbl(node)->type == RBH ||
3464 get_itbl(node)->type == MVAR);
3468 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3472 Print a whole blocking queue starting with the element bqe.
// print_bqe (GUM): debugging dump of a blocking queue starting at element
// `bqe`, written to stderr on one line. The walk stops after a CONSTR (the
// RBHSave closure that ends an RBH's queue) or when the next link is
// END_BQ_QUEUE. TSOs, BLOCKED_FETCHes and RBHSave closures are printed.
// Anything else barfs.
// TSO addresses are printed with %p, as the GranSim print_bq does. %x
// truncates pointers on 64-bit targets and is undefined behaviour for a
// pointer argument.
3475 print_bqe (StgBlockingQueueElement *bqe)
3480 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3482 for (end = (bqe==END_BQ_QUEUE);
3483 !end; // iterate until bqe points to a CONSTR
3484 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
3485 bqe = end ? END_BQ_QUEUE : bqe->link) {
3486 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3487 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3488 /* types of closures that may appear in a blocking queue */
3489 ASSERT(get_itbl(bqe)->type == TSO ||
3490 get_itbl(bqe)->type == BLOCKED_FETCH ||
3491 get_itbl(bqe)->type == CONSTR);
3492 /* only BQs of an RBH end with an RBH_Save closure */
3493 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3495 switch (get_itbl(bqe)->type) {
3497 fprintf(stderr," TSO %u (%p),",
3498 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3501 fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3502 ((StgBlockedFetch *)bqe)->node,
3503 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3504 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3505 ((StgBlockedFetch *)bqe)->ga.weight);
3508 fprintf(stderr," %s (IP %p),",
3509 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3510 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3511 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3512 "RBH_Save_?"), get_itbl(bqe));
3515 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3516 info_type((StgClosure *)bqe)); // , node, info_type(node));
3520 fputc('\n', stderr);
3522 # elif defined(GRAN)
// print_bq (GranSim): debugging dump of the blocking queue attached to
// `node`, including the PE of the node and of each queued TSO. The node must
// be a BLACKHOLE_BQ, FETCH_ME_BQ or RBH. The walk stops after a CONSTR (the
// RBHSave closure ending an RBH's queue) or at END_BQ_QUEUE.
// The NULL check now comes before the first dereference of `node`
// (get_itbl in the type assert). Previously it ran only after that call, so
// it could never fire.
3524 print_bq (StgClosure *node)
3526 StgBlockingQueueElement *bqe;
3527 PEs node_loc, tso_loc;
3535 ASSERT(node!=(StgClosure*)NULL); // sanity check
3530 /* should cover all closures that may have a blocking queue */
3531 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3532 get_itbl(node)->type == FETCH_ME_BQ ||
3533 get_itbl(node)->type == RBH);
3536 node_loc = where_is(node);
3538 fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3539 node, info_type(node), node_loc);
3542 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3544 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3545 !end; // iterate until bqe points to a CONSTR
3546 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3547 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3548 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3549 /* types of closures that may appear in a blocking queue */
3550 ASSERT(get_itbl(bqe)->type == TSO ||
3551 get_itbl(bqe)->type == CONSTR);
3552 /* only BQs of an RBH end with an RBH_Save closure */
3553 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3555 tso_loc = where_is((StgClosure *)bqe);
3556 switch (get_itbl(bqe)->type) {
3558 fprintf(stderr," TSO %d (%p) on [PE %d],",
3559 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3562 fprintf(stderr," %s (IP %p),",
3563 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3564 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3565 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3566 "RBH_Save_?"), get_itbl(bqe));
3569 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3570 info_type((StgClosure *)bqe), node, info_type(node));
3574 fputc('\n', stderr);
3578 Nice and easy: only TSOs on the blocking queue
// print_bq (non-parallel build): debugging dump of the TSOs on the blocking
// queue of `node`, written to stderr as " TSO <id> (<addr>)," entries
// followed by a newline.
3581 print_bq (StgClosure *node)
3585 ASSERT(node!=(StgClosure*)NULL); // sanity check
3586 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3587 tso != END_TSO_QUEUE;
3589 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3590 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3591 fprintf(stderr," TSO %d (%p),", tso->id, tso);
3593 fputc('\n', stderr);
// Walk the run queue from run_queue_hd to END_TSO_QUEUE with an index `i`
// starting at 0. The enclosing function header and loop body are not
// visible in this extract; presumably this counts run-queue entries.
3604 for (i=0, tso=run_queue_hd;
3605 tso != END_TSO_QUEUE;
// sched_belch: printf-style scheduler trace. It writes a prefix to stderr,
// then the message formatted with vfprintf, then a newline. The prefix is
// "scheduler (task <id>): ", "== " or "scheduler: " depending on the build;
// the selecting #if lines are not visible in this extract.
3614 sched_belch(char *s, ...)
3619 fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3621 fprintf(stderr, "== ");
3623 fprintf(stderr, "scheduler: ");
3625 vfprintf(stderr, s, ap);
3626 fprintf(stderr, "\n");
3632 //@node Index, , Debugging Routines, Main scheduling code
3636 //* StgMainThread:: @cindex\s-+StgMainThread
3637 //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue
3638 //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd
3639 //* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
3640 //* context_switch:: @cindex\s-+context_switch
3641 //* createThread:: @cindex\s-+createThread
3642 //* gc_pending_cond:: @cindex\s-+gc_pending_cond
3643 //* initScheduler:: @cindex\s-+initScheduler
3644 //* interrupted:: @cindex\s-+interrupted
3645 //* next_thread_id:: @cindex\s-+next_thread_id
3646 //* print_bq:: @cindex\s-+print_bq
3647 //* run_queue_hd:: @cindex\s-+run_queue_hd
3648 //* run_queue_tl:: @cindex\s-+run_queue_tl
3649 //* sched_mutex:: @cindex\s-+sched_mutex
3650 //* schedule:: @cindex\s-+schedule
3651 //* take_off_run_queue:: @cindex\s-+take_off_run_queue
3652 //* term_mutex:: @cindex\s-+term_mutex