1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.133 2002/03/12 11:51:06 simonmar Exp $
4 * (c) The GHC Team, 1998-2000
8 * Different GHC ways use this scheduler quite differently (see comments below)
9 * Here is the global picture:
11 * WAY Name CPP flag What's it for
12 * --------------------------------------
13 * mp GUM PAR Parallel execution on a distributed memory machine
14 * s SMP SMP Parallel execution on a shared memory machine
15 * mg GranSim GRAN Simulation of parallel execution
16 * md GUM/GdH DIST Distributed execution (based on GUM)
18 * --------------------------------------------------------------------------*/
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
24 * Version with scheduler monitor support for SMPs (WAY=s):
26 This design provides a high-level API to create and schedule threads etc.
27 as documented in the SMP design document.
29 It uses a monitor design controlled by a single mutex to exercise control
30 over accesses to shared data structures, and builds on the Posix threads
33 The majority of state is shared. In order to keep essential per-task state,
34 there is a Capability structure, which contains all the information
35 needed to run a thread: its STG registers, a pointer to its TSO, a
36 nursery etc. During STG execution, a pointer to the capability is
37 kept in a register (BaseReg).
39 In a non-SMP build, there is one global capability, namely MainRegTable.
43 * Version with support for distributed memory parallelism aka GUM (WAY=mp):
45 The main scheduling loop in GUM iterates until a finish message is received.
46 In that case a global flag @receivedFinish@ is set and this instance of
47 the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48 for the handling of incoming messages, such as PP_FINISH.
49 Note that in the parallel case we have a system manager that coordinates
50 different PEs, each of which is running one instance of the RTS.
51 See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52 From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
54 * Version with support for simulating parallel execution aka GranSim (WAY=mg):
56 The main scheduling code in GranSim is quite different from that in standard
57 (concurrent) Haskell: while concurrent Haskell just iterates over the
58 threads in the runnable queue, GranSim is event driven, i.e. it iterates
59 over the events in the global event queue. -- HWL
64 //* Variables and Data structures::
65 //* Main scheduling loop::
66 //* Suspend and Resume::
68 //* Garbage Collextion Routines::
69 //* Blocking Queue Routines::
70 //* Exception Handling Routines::
71 //* Debugging Routines::
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
78 #include "PosixSource.h"
85 #include "StgStartup.h"
88 #include "StgMiscClosures.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
119 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
120 //@subsection Variables and Data structures
122 /* Main thread queue.
123 * Locks required: sched_mutex.
125 StgMainThread *main_threads;
128 * Locks required: sched_mutex.
132 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
133 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
136 In GranSim we have a runnable and a blocked queue for each processor.
137 In order to minimise code changes, new arrays run_queue_hds/tls
138 are created. run_queue_hd is then a shortcut (macro) for
139 run_queue_hds[CurrentProc] (see GranSim.h).
142 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
143 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
144 StgTSO *ccalling_threadss[MAX_PROC];
145 /* We use the same global list of threads (all_threads) in GranSim as in
146 the std RTS (i.e. we are cheating). However, we don't use this list in
147 the GranSim specific code at the moment (so we are only potentially
152 StgTSO *run_queue_hd, *run_queue_tl;
153 StgTSO *blocked_queue_hd, *blocked_queue_tl;
154 StgTSO *sleeping_queue; /* perhaps replace with a hash table? */
158 /* Linked list of all threads.
159 * Used for detecting garbage collected threads.
163 /* When a thread performs a safe C call (_ccall_GC, using old
164 * terminology), it gets put on the suspended_ccalling_threads
165 * list. Used by the garbage collector.
167 static StgTSO *suspended_ccalling_threads;
169 static StgTSO *threadStackOverflow(StgTSO *tso);
171 /* KH: The following two flags are shared memory locations. There is no need
172 to lock them, since they are only unset at the end of a scheduler
176 /* flag set by signal handler to precipitate a context switch */
177 //@cindex context_switch
180 /* if this flag is set as well, give up execution */
181 //@cindex interrupted
184 /* Next thread ID to allocate.
185 * Locks required: sched_mutex
187 //@cindex next_thread_id
188 StgThreadID next_thread_id = 1;
191 * Pointers to the state of the current thread.
192 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
193 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
196 /* The smallest stack size that makes any sense is:
197 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
198 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
199 * + 1 (the realworld token for an IO thread)
200 * + 1 (the closure to enter)
202 * A thread with this stack will bomb immediately with a stack
203 * overflow, which will increase its stack size.
206 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
213 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
214 * exists - earlier gccs apparently didn't.
221 void addToBlockedQueue ( StgTSO *tso );
223 static void schedule ( void );
224 void interruptStgRts ( void );
226 static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri );
228 static StgTSO * createThread_ ( nat size, rtsBool have_lock );
231 static void detectBlackHoles ( void );
234 static void sched_belch(char *s, ...);
237 #if defined(RTS_SUPPORTS_THREADS)
238 /* ToDo: carefully document the invariants that go together
239 * with these synchronisation objects.
241 Mutex sched_mutex = INIT_MUTEX_VAR;
242 Mutex term_mutex = INIT_MUTEX_VAR;
245 static Condition gc_pending_cond = INIT_COND_VAR;
249 #endif /* RTS_SUPPORTS_THREADS */
253 rtsTime TimeOfLastYield;
254 rtsBool emitSchedule = rtsTrue;
258 char *whatNext_strs[] = {
266 char *threadReturnCode_strs[] = {
267 "HeapOverflow", /* might also be StackOverflow */
276 StgTSO * createSparkThread(rtsSpark spark);
277 StgTSO * activateSpark (rtsSpark spark);
281 * The thread state for the main thread.
282 // ToDo: check whether not needed any more
286 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
287 static void taskStart(void);
298 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
299 //@subsection Main scheduling loop
301 /* ---------------------------------------------------------------------------
302 Main scheduling loop.
304 We use round-robin scheduling, each thread returning to the
305 scheduler loop when one of these conditions is detected:
308 * timer expires (thread yields), heap or stack overflow, thread blocks, or thread finishes
313 Locking notes: we acquire the scheduler lock once at the beginning
314 of the scheduler loop, and release it when
316 * running a thread, or
317 * waiting for work, or
318 * waiting for a GC to complete.
321 In a GranSim setup this loop iterates over the global event queue.
322 This revolves around the global event queue, which determines what
323 to do next. Therefore, it's more complicated than either the
324 concurrent or the parallel (GUM) setup.
327 GUM iterates over incoming messages.
328 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
329 and sends out a fish whenever it has nothing to do; in-between
330 doing the actual reductions (shared code below) it processes the
331 incoming messages and deals with delayed operations
332 (see PendingFetches).
333 This is not the ugliest code you could imagine, but it's bloody close.
335 ------------------------------------------------------------------------ */
342 StgThreadReturnCode ret;
350 rtsBool receivedFinish = rtsFalse;
352 nat tp_size, sp_size; // stats only
355 rtsBool was_interrupted = rtsFalse;
357 ACQUIRE_LOCK(&sched_mutex);
359 #if defined(RTS_SUPPORTS_THREADS)
360 /* Check to see whether there are any worker threads
361 waiting to deposit external call results. If so,
362 yield our capability */
363 yieldToReturningWorker(&sched_mutex, cap);
365 waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
369 /* set up first event to get things going */
370 /* ToDo: assign costs for system setup and init MainTSO ! */
371 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
373 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
376 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
377 G_TSO(CurrentTSO, 5));
379 if (RtsFlags.GranFlags.Light) {
380 /* Save current time; GranSim Light only */
381 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
384 event = get_next_event();
386 while (event!=(rtsEvent*)NULL) {
387 /* Choose the processor with the next event */
388 CurrentProc = event->proc;
389 CurrentTSO = event->tso;
393 while (!receivedFinish) { /* set by processMessages */
394 /* when receiving PP_FINISH message */
401 IF_DEBUG(scheduler, printAllThreads());
403 /* If we're interrupted (the user pressed ^C, or some other
404 * termination condition occurred), kill all the currently running
408 IF_DEBUG(scheduler, sched_belch("interrupted"));
410 interrupted = rtsFalse;
411 was_interrupted = rtsTrue;
414 /* Go through the list of main threads and wake up any
415 * clients whose computations have finished. ToDo: this
416 * should be done more efficiently without a linear scan
417 * of the main threads list, somehow...
419 #if defined(RTS_SUPPORTS_THREADS)
421 StgMainThread *m, **prev;
422 prev = &main_threads;
423 for (m = main_threads; m != NULL; m = m->link) {
424 switch (m->tso->what_next) {
427 *(m->ret) = (StgClosure *)m->tso->sp[0];
431 broadcastCondition(&m->wakeup);
434 if (m->ret) *(m->ret) = NULL;
436 if (was_interrupted) {
437 m->stat = Interrupted;
441 broadcastCondition(&m->wakeup);
449 #else /* not threaded */
452 /* in GUM do this only on the Main PE */
455 /* If our main thread has finished or been killed, return.
458 StgMainThread *m = main_threads;
459 if (m->tso->what_next == ThreadComplete
460 || m->tso->what_next == ThreadKilled) {
461 main_threads = main_threads->link;
462 if (m->tso->what_next == ThreadComplete) {
463 /* we finished successfully, fill in the return value */
464 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
468 if (m->ret) { *(m->ret) = NULL; };
469 if (was_interrupted) {
470 m->stat = Interrupted;
480 /* Top up the run queue from our spark pool. We try to make the
481 * number of threads in the run queue equal to the number of
484 * Disable spark support in SMP for now, non-essential & requires
485 * a little bit of work to make it compile cleanly. -- sof 1/02.
487 #if 0 /* defined(SMP) */
489 nat n = getFreeCapabilities();
490 StgTSO *tso = run_queue_hd;
492 /* Count the run queue */
493 while (n > 0 && tso != END_TSO_QUEUE) {
500 spark = findSpark(rtsFalse);
502 break; /* no more sparks in the pool */
504 /* I'd prefer this to be done in activateSpark -- HWL */
505 /* tricky - it needs to hold the scheduler lock and
506 * not try to re-acquire it -- SDM */
507 createSparkThread(spark);
509 sched_belch("==^^ turning spark of closure %p into a thread",
510 (StgClosure *)spark));
513 /* We need to wake up the other tasks if we just created some
516 if (getFreeCapabilities() - n > 1) {
517 signalCondition( &thread_ready_cond );
522 /* check for signals each time around the scheduler */
523 #ifndef mingw32_TARGET_OS
524 if (signals_pending()) {
525 startSignalHandlers();
529 /* Check whether any waiting threads need to be woken up. If the
530 * run queue is empty, and there are no other tasks running, we
531 * can wait indefinitely for something to happen.
532 * ToDo: what if another client comes along & requests another
535 if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
536 awaitEvent( EMPTY_RUN_QUEUE()
538 && allFreeCapabilities()
542 /* we can be interrupted while waiting for I/O... */
543 if (interrupted) continue;
546 * Detect deadlock: when we have no threads to run, there are no
547 * threads waiting on I/O or sleeping, and all the other tasks are
548 * waiting for work, we must have a deadlock of some description.
550 * We first try to find threads blocked on themselves (ie. black
551 * holes), and generate NonTermination exceptions where necessary.
553 * If no threads are black holed, we have a deadlock situation, so
554 * inform all the main threads.
557 if ( EMPTY_RUN_QUEUE()
558 && EMPTY_QUEUE(blocked_queue_hd)
559 && EMPTY_QUEUE(sleeping_queue)
560 #if defined(RTS_SUPPORTS_THREADS)
561 && EMPTY_QUEUE(suspended_ccalling_threads)
564 && allFreeCapabilities()
568 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
569 #if defined(THREADED_RTS)
570 /* and SMP mode ..? */
571 releaseCapability(cap);
573 GarbageCollect(GetRoots,rtsTrue);
574 if ( EMPTY_QUEUE(blocked_queue_hd)
576 && EMPTY_QUEUE(sleeping_queue) ) {
578 IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
581 /* No black holes, so probably a real deadlock. Send the
582 * current main thread the Deadlock exception (or in the SMP
583 * build, send *all* main threads the deadlock exception,
584 * since none of them can make progress).
586 if ( EMPTY_RUN_QUEUE() ) {
588 #if defined(RTS_SUPPORTS_THREADS)
589 for (m = main_threads; m != NULL; m = m->link) {
590 switch (m->tso->why_blocked) {
591 case BlockedOnBlackHole:
592 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
594 case BlockedOnException:
596 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
599 barf("deadlock: main thread blocked in a strange way");
604 switch (m->tso->why_blocked) {
605 case BlockedOnBlackHole:
606 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
608 case BlockedOnException:
610 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
613 barf("deadlock: main thread blocked in a strange way");
617 #if defined(RTS_SUPPORTS_THREADS)
618 /* ToDo: revisit conditions (and mechanism) for shutting
619 down a multi-threaded world */
620 if ( EMPTY_RUN_QUEUE() ) {
621 IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
622 shutdownHaskellAndExit(0);
625 ASSERT( !EMPTY_RUN_QUEUE() );
629 /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
633 /* If there's a GC pending, don't do anything until it has
637 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
638 waitCondition( &gc_pending_cond, &sched_mutex );
642 #if defined(RTS_SUPPORTS_THREADS)
643 /* block until we've got a thread on the run queue and a free
647 if ( EMPTY_RUN_QUEUE() ) {
648 /* Give up our capability */
649 releaseCapability(cap);
650 IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
651 waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
652 IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
654 while ( EMPTY_RUN_QUEUE() ) {
655 waitForWorkCapability(&sched_mutex, &cap);
656 IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
663 if (RtsFlags.GranFlags.Light)
664 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
666 /* adjust time based on time-stamp */
667 if (event->time > CurrentTime[CurrentProc] &&
668 event->evttype != ContinueThread)
669 CurrentTime[CurrentProc] = event->time;
671 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
672 if (!RtsFlags.GranFlags.Light)
675 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
677 /* main event dispatcher in GranSim */
678 switch (event->evttype) {
679 /* Should just be continuing execution */
681 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
682 /* ToDo: check assertion
683 ASSERT(run_queue_hd != (StgTSO*)NULL &&
684 run_queue_hd != END_TSO_QUEUE);
686 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
687 if (!RtsFlags.GranFlags.DoAsyncFetch &&
688 procStatus[CurrentProc]==Fetching) {
689 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
690 CurrentTSO->id, CurrentTSO, CurrentProc);
693 /* Ignore ContinueThreads for completed threads */
694 if (CurrentTSO->what_next == ThreadComplete) {
695 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
696 CurrentTSO->id, CurrentTSO, CurrentProc);
699 /* Ignore ContinueThreads for threads that are being migrated */
700 if (PROCS(CurrentTSO)==Nowhere) {
701 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
702 CurrentTSO->id, CurrentTSO, CurrentProc);
705 /* The thread should be at the beginning of the run queue */
706 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
707 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
708 CurrentTSO->id, CurrentTSO, CurrentProc);
709 break; // run the thread anyway
712 new_event(proc, proc, CurrentTime[proc],
714 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
716 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
717 break; // now actually run the thread; DaH Qu'vam yImuHbej
720 do_the_fetchnode(event);
721 goto next_thread; /* handle next event in event queue */
724 do_the_globalblock(event);
725 goto next_thread; /* handle next event in event queue */
728 do_the_fetchreply(event);
729 goto next_thread; /* handle next event in event queue */
731 case UnblockThread: /* Move from the blocked queue to the tail of */
732 do_the_unblock(event);
733 goto next_thread; /* handle next event in event queue */
735 case ResumeThread: /* Move from the blocked queue to the tail of */
736 /* the runnable queue ( i.e. Qu' SImqa'lu') */
737 event->tso->gran.blocktime +=
738 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
739 do_the_startthread(event);
740 goto next_thread; /* handle next event in event queue */
743 do_the_startthread(event);
744 goto next_thread; /* handle next event in event queue */
747 do_the_movethread(event);
748 goto next_thread; /* handle next event in event queue */
751 do_the_movespark(event);
752 goto next_thread; /* handle next event in event queue */
755 do_the_findwork(event);
756 goto next_thread; /* handle next event in event queue */
759 barf("Illegal event type %u\n", event->evttype);
762 /* This point was scheduler_loop in the old RTS */
764 IF_DEBUG(gran, belch("GRAN: after main switch"));
766 TimeOfLastEvent = CurrentTime[CurrentProc];
767 TimeOfNextEvent = get_time_of_next_event();
768 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
769 // CurrentTSO = ThreadQueueHd;
771 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
774 if (RtsFlags.GranFlags.Light)
775 GranSimLight_leave_system(event, &ActiveTSO);
777 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
780 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
782 /* in a GranSim setup the TSO stays on the run queue */
784 /* Take a thread from the run queue. */
785 t = POP_RUN_QUEUE(); // take_off_run_queue(t);
788 fprintf(stderr, "GRAN: About to run current thread, which is\n");
791 context_switch = 0; // turned on via GranYield, checking events and time slice
794 DumpGranEvent(GR_SCHEDULE, t));
796 procStatus[CurrentProc] = Busy;
799 if (PendingFetches != END_BF_QUEUE) {
803 /* ToDo: phps merge with spark activation above */
804 /* check whether we have local work and send requests if we have none */
805 if (EMPTY_RUN_QUEUE()) { /* no runnable threads */
806 /* :-[ no local threads => look out for local sparks */
807 /* the spark pool for the current PE */
808 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
809 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
810 pool->hd < pool->tl) {
812 * ToDo: add GC code check that we really have enough heap afterwards!!
814 * If we're here (no runnable threads) and we have pending
815 * sparks, we must have a space problem. Get enough space
816 * to turn one of those pending sparks into a
820 spark = findSpark(rtsFalse); /* get a spark */
821 if (spark != (rtsSpark) NULL) {
822 tso = activateSpark(spark); /* turn the spark into a thread */
823 IF_PAR_DEBUG(schedule,
824 belch("==== schedule: Created TSO %d (%p); %d threads active",
825 tso->id, tso, advisory_thread_count));
827 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
828 belch("==^^ failed to activate spark");
830 } /* otherwise fall through & pick-up new tso */
832 IF_PAR_DEBUG(verbose,
833 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
834 spark_queue_len(pool)));
839 /* If we still have no work we need to send a FISH to get a spark
842 if (EMPTY_RUN_QUEUE()) {
843 /* =8-[ no local sparks => look for work on other PEs */
845 * We really have absolutely no work. Send out a fish
846 * (there may be some out there already), and wait for
847 * something to arrive. We clearly can't run any threads
848 * until a SCHEDULE or RESUME arrives, and so that's what
849 * we're hoping to see. (Of course, we still have to
850 * respond to other types of messages.)
852 TIME now = msTime() /*CURRENT_TIME*/;
853 IF_PAR_DEBUG(verbose,
854 belch("-- now=%ld", now));
855 IF_PAR_DEBUG(verbose,
856 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
857 (last_fish_arrived_at!=0 &&
858 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
859 belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
860 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
861 last_fish_arrived_at,
862 RtsFlags.ParFlags.fishDelay, now);
865 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
866 (last_fish_arrived_at==0 ||
867 (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
868 /* outstandingFishes is set in sendFish, processFish;
869 avoid flooding system with fishes via delay */
871 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
874 // Global statistics: count no. of fishes
875 if (RtsFlags.ParFlags.ParStats.Global &&
876 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
877 globalParStats.tot_fish_mess++;
881 receivedFinish = processMessages();
884 } else if (PacketsWaiting()) { /* Look for incoming messages */
885 receivedFinish = processMessages();
888 /* Now we are sure that we have some work available */
889 ASSERT(run_queue_hd != END_TSO_QUEUE);
891 /* Take a thread from the run queue, if we have work */
892 t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
893 IF_DEBUG(sanity,checkTSO(t));
895 /* ToDo: write something to the log-file
896 if (RTSflags.ParFlags.granSimStats && !sameThread)
897 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
901 /* the spark pool for the current PE */
902 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
905 belch("--=^ %d threads, %d sparks on [%#x]",
906 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
909 if (0 && RtsFlags.ParFlags.ParStats.Full &&
910 t && LastTSO && t->id != LastTSO->id &&
911 LastTSO->why_blocked == NotBlocked &&
912 LastTSO->what_next != ThreadComplete) {
913 // if previously scheduled TSO not blocked we have to record the context switch
914 DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
915 GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
918 if (RtsFlags.ParFlags.ParStats.Full &&
919 (emitSchedule /* forced emit */ ||
920 (t && LastTSO && t->id != LastTSO->id))) {
922 we are running a different TSO, so write a schedule event to log file
923 NB: If we use fair scheduling we also have to write a deschedule
924 event for LastTSO; with unfair scheduling we know that the
925 previous tso has blocked whenever we switch to another tso, so
926 we don't need it in GUM for now
928 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
929 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
930 emitSchedule = rtsFalse;
934 #else /* !GRAN && !PAR */
936 /* grab a thread from the run queue */
937 ASSERT(run_queue_hd != END_TSO_QUEUE);
939 // Sanity check the thread we're about to run. This can be
940 // expensive if there is lots of thread switching going on...
941 IF_DEBUG(sanity,checkTSO(t));
944 grabCapability(&cap);
945 cap->r.rCurrentTSO = t;
947 /* context switches are now initiated by the timer signal, unless
948 * the user specified "context switch as often as possible", with
953 RtsFlags.ProfFlags.profileInterval == 0 ||
955 (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
956 && (run_queue_hd != END_TSO_QUEUE
957 || blocked_queue_hd != END_TSO_QUEUE
958 || sleeping_queue != END_TSO_QUEUE)))
963 RELEASE_LOCK(&sched_mutex);
965 IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...",
966 t->id, t, whatNext_strs[t->what_next]));
969 startHeapProfTimer();
972 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
973 /* Run the current thread
975 switch (cap->r.rCurrentTSO->what_next) {
978 /* Thread already finished, return to scheduler. */
979 ret = ThreadFinished;
982 ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
985 ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
987 case ThreadEnterInterp:
988 ret = interpretBCO(cap);
991 barf("schedule: invalid what_next field");
993 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
995 /* Costs for the scheduler are assigned to CCS_SYSTEM */
1001 ACQUIRE_LOCK(&sched_mutex);
1004 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1005 #elif !defined(GRAN) && !defined(PAR)
1006 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1008 t = cap->r.rCurrentTSO;
1011 /* HACK 675: if the last thread didn't yield, make sure to print a
1012 SCHEDULE event to the log file when StgRunning the next thread, even
1013 if it is the same one as before */
1015 TimeOfLastYield = CURRENT_TIME;
1021 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1022 globalGranStats.tot_heapover++;
1024 globalParStats.tot_heapover++;
1027 // did the task ask for a large block?
1028 if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1029 // if so, get one and push it on the front of the nursery.
1033 blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1035 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)",
1037 whatNext_strs[t->what_next], blocks));
1039 // don't do this if it would push us over the
1040 // alloc_blocks_lim limit; we'll GC first.
1041 if (alloc_blocks + blocks < alloc_blocks_lim) {
1043 alloc_blocks += blocks;
1044 bd = allocGroup( blocks );
1046 // link the new group into the list
1047 bd->link = cap->r.rCurrentNursery;
1048 bd->u.back = cap->r.rCurrentNursery->u.back;
1049 if (cap->r.rCurrentNursery->u.back != NULL) {
1050 cap->r.rCurrentNursery->u.back->link = bd;
1052 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1053 g0s0->blocks == cap->r.rNursery);
1054 cap->r.rNursery = g0s0->blocks = bd;
1056 cap->r.rCurrentNursery->u.back = bd;
1058 // initialise it as a nursery block
1062 bd->free = bd->start;
1064 // don't forget to update the block count in g0s0.
1065 g0s0->n_blocks += blocks;
1066 ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1068 // now update the nursery to point to the new block
1069 cap->r.rCurrentNursery = bd;
1071 // we might be unlucky and have another thread get on the
1072 // run queue before us and steal the large block, but in that
1073 // case the thread will just end up requesting another large
1075 PUSH_ON_RUN_QUEUE(t);
1080 /* make all the running tasks block on a condition variable,
1081 * maybe set context_switch and wait till they all pile in,
1082 * then have them wait on a GC condition variable.
1084 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow",
1085 t->id, t, whatNext_strs[t->what_next]));
1088 ASSERT(!is_on_queue(t,CurrentProc));
1090 /* Currently we emit a DESCHEDULE event before GC in GUM.
1091 ToDo: either add separate event to distinguish SYSTEM time from rest
1092 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1093 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1094 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1095 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1096 emitSchedule = rtsTrue;
1100 ready_to_gc = rtsTrue;
1101 context_switch = 1; /* stop other threads ASAP */
1102 PUSH_ON_RUN_QUEUE(t);
1103 /* actual GC is done at the end of the while loop */
1109 DumpGranEvent(GR_DESCHEDULE, t));
1110 globalGranStats.tot_stackover++;
1113 // DumpGranEvent(GR_DESCHEDULE, t);
1114 globalParStats.tot_stackover++;
1116 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow",
1117 t->id, t, whatNext_strs[t->what_next]));
1118 /* just adjust the stack for this thread, then pop it back
1124 /* enlarge the stack */
1125 StgTSO *new_t = threadStackOverflow(t);
1127 /* This TSO has moved, so update any pointers to it from the
1128 * main thread stack. It better not be on any other queues...
1129 * (it shouldn't be).
1131 for (m = main_threads; m != NULL; m = m->link) {
1136 threadPaused(new_t);
1137 PUSH_ON_RUN_QUEUE(new_t);
1141 case ThreadYielding:
1144 DumpGranEvent(GR_DESCHEDULE, t));
1145 globalGranStats.tot_yields++;
1148 // DumpGranEvent(GR_DESCHEDULE, t);
1149 globalParStats.tot_yields++;
1151 /* put the thread back on the run queue. Then, if we're ready to
1152 * GC, check whether this is the last task to stop. If so, wake
1153 * up the GC thread. getThread will block during a GC until the
1157 if (t->what_next == ThreadEnterInterp) {
1158 /* ToDo: or maybe a timer expired when we were in Hugs?
1159 * or maybe someone hit ctrl-C
1161 belch("--<< thread %ld (%p; %s) stopped to switch to Hugs",
1162 t->id, t, whatNext_strs[t->what_next]);
1164 belch("--<< thread %ld (%p; %s) stopped, yielding",
1165 t->id, t, whatNext_strs[t->what_next]);
1172 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1174 ASSERT(t->link == END_TSO_QUEUE);
1176 ASSERT(!is_on_queue(t,CurrentProc));
1179 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1180 checkThreadQsSanity(rtsTrue));
1183 if (RtsFlags.ParFlags.doFairScheduling) {
1184 /* this does round-robin scheduling; good for concurrency */
1185 APPEND_TO_RUN_QUEUE(t);
1187 /* this does unfair scheduling; good for parallelism */
1188 PUSH_ON_RUN_QUEUE(t);
1191 /* this does round-robin scheduling; good for concurrency */
1192 APPEND_TO_RUN_QUEUE(t);
1195 /* add a ContinueThread event to actually process the thread */
1196 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1198 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1200 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1209 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1210 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1211 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1213 // ??? needed; should emit block before
1215 DumpGranEvent(GR_DESCHEDULE, t));
1216 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1219 ASSERT(procStatus[CurrentProc]==Busy ||
1220 ((procStatus[CurrentProc]==Fetching) &&
1221 (t->block_info.closure!=(StgClosure*)NULL)));
1222 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1223 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1224 procStatus[CurrentProc]==Fetching))
1225 procStatus[CurrentProc] = Idle;
1229 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1230 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1233 if (t->block_info.closure!=(StgClosure*)NULL)
1234 print_bq(t->block_info.closure));
1236 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1239 /* whatever we schedule next, we must log that schedule */
1240 emitSchedule = rtsTrue;
1243 /* don't need to do anything. Either the thread is blocked on
1244 * I/O, in which case we'll have called addToBlockedQueue
1245 * previously, or it's blocked on an MVar or Blackhole, in which
1246 * case it'll be on the relevant queue already.
1249 fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1250 printThreadBlockage(t);
1251 fprintf(stderr, "\n"));
1253 /* Only for dumping event to log file
1254 ToDo: do I need this in GranSim, too?
1261 case ThreadFinished:
1262 /* Need to check whether this was a main thread, and if so, signal
1263 * the task that started it with the return value. If we have no
1264 * more main threads, we probably need to stop all the tasks until
1267 /* We also end up here if the thread kills itself with an
1268 * uncaught exception, see Exception.hc.
1270 IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1272 endThread(t, CurrentProc); // clean-up the thread
1274 /* For now all are advisory -- HWL */
1275 //if(t->priority==AdvisoryPriority) ??
1276 advisory_thread_count--;
1279 if(t->dist.priority==RevalPriority)
1283 if (RtsFlags.ParFlags.ParStats.Full &&
1284 !RtsFlags.ParFlags.ParStats.Suppressed)
1285 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1290 barf("schedule: invalid thread return code %d", (int)ret);
1293 #if defined(RTS_SUPPORTS_THREADS)
1294 /* I don't understand what this re-grab is doing -- sof */
1295 grabCapability(&cap);
1299 if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1300 GarbageCollect(GetRoots, rtsTrue);
1302 performHeapProfile = rtsFalse;
1303 ready_to_gc = rtsFalse; // we already GC'd
1309 && allFreeCapabilities()
1312 /* everybody back, start the GC.
1313 * Could do it in this thread, or signal a condition var
1314 * to do it in another thread. Either way, we need to
1315 * broadcast on gc_pending_cond afterward.
1317 #if defined(RTS_SUPPORTS_THREADS)
1318 IF_DEBUG(scheduler,sched_belch("doing GC"));
1320 GarbageCollect(GetRoots,rtsFalse);
1321 ready_to_gc = rtsFalse;
1323 broadcastCondition(&gc_pending_cond);
1326 /* add a ContinueThread event to continue execution of current thread */
1327 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1329 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1331 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1339 IF_GRAN_DEBUG(unused,
1340 print_eventq(EventHd));
1342 event = get_next_event();
1345 /* ToDo: wait for next message to arrive rather than busy wait */
1348 } /* end of while(1) */
1350 IF_PAR_DEBUG(verbose,
1351 belch("== Leaving schedule() after having received Finish"));
1354 /* ---------------------------------------------------------------------------
1355 * deleteAllThreads(): kill all the live threads.
1357 * This is used when we catch a user interrupt (^C), before performing
1358 * any necessary cleanups and running finalizers.
1359 * ------------------------------------------------------------------------- */
1361 void deleteAllThreads ( void )
1364 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1365 for (t = run_queue_hd; t != END_TSO_QUEUE; t = next) {
1369 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = next) {
1373 for (t = sleeping_queue; t != END_TSO_QUEUE; t = next) {
1377 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1378 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1379 sleeping_queue = END_TSO_QUEUE;
1382 /* startThread and insertThread are now in GranSim.c -- HWL */
1385 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1386 //@subsection Suspend and Resume
1388 /* ---------------------------------------------------------------------------
1389 * Suspending & resuming Haskell threads.
1391 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1392 * its capability before calling the C function. This allows another
1393 * task to pick up the capability and carry on running Haskell
1394 * threads. It also means that if the C call blocks, it won't lock
1397 * The Haskell thread making the C call is put to sleep for the
1398 * duration of the call, on the suspended_ccalling_threads queue. We
1399 * give out a token to the task, which it can use to resume the thread
1400 * on return from the C function.
1401 * ------------------------------------------------------------------------- */
1404 suspendThread( StgRegTable *reg,
1406 #if !defined(RTS_SUPPORTS_THREADS)
1414 /* assume that *reg is a pointer to the StgRegTable part
1417 cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1419 ACQUIRE_LOCK(&sched_mutex);
1422 sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1424 threadPaused(cap->r.rCurrentTSO);
1425 cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1426 suspended_ccalling_threads = cap->r.rCurrentTSO;
1428 #if defined(RTS_SUPPORTS_THREADS)
1429 cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1432 /* Use the thread ID as the token; it should be unique */
1433 tok = cap->r.rCurrentTSO->id;
1435 /* Hand back capability */
1436 releaseCapability(cap);
1438 #if defined(RTS_SUPPORTS_THREADS)
1439 /* Preparing to leave the RTS, so ensure there's a native thread/task
1440 waiting to take over.
1442 ToDo: optimise this and only create a new task if there's a need
1443 for one (i.e., if there's only one Concurrent Haskell thread alive,
1444 there's no need to create a new task).
1446 IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1448 startTask(taskStart);
1452 /* Other threads _might_ be available for execution; signal this */
1454 RELEASE_LOCK(&sched_mutex);
1459 resumeThread( StgInt tok,
1461 #if !defined(RTS_SUPPORTS_THREADS)
1466 StgTSO *tso, **prev;
1469 #if defined(RTS_SUPPORTS_THREADS)
1470 /* Wait for permission to re-enter the RTS with the result. */
1472 grabReturnCapability(&sched_mutex, &cap);
1474 grabCapability(&cap);
1477 grabCapability(&cap);
1480 /* Remove the thread off of the suspended list */
1481 prev = &suspended_ccalling_threads;
1482 for (tso = suspended_ccalling_threads;
1483 tso != END_TSO_QUEUE;
1484 prev = &tso->link, tso = tso->link) {
1485 if (tso->id == (StgThreadID)tok) {
1490 if (tso == END_TSO_QUEUE) {
1491 barf("resumeThread: thread not found");
1493 tso->link = END_TSO_QUEUE;
1494 /* Reset blocking status */
1495 tso->why_blocked = NotBlocked;
1497 RELEASE_LOCK(&sched_mutex);
1499 cap->r.rCurrentTSO = tso;
1504 /* ---------------------------------------------------------------------------
1506 * ------------------------------------------------------------------------ */
1507 static void unblockThread(StgTSO *tso);
1509 /* ---------------------------------------------------------------------------
1510 * Comparing Thread ids.
1512 * This is used from STG land in the implementation of the
1513 * instances of Eq/Ord for ThreadIds.
1514 * ------------------------------------------------------------------------ */
1516 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
1518 StgThreadID id1 = tso1->id;
1519 StgThreadID id2 = tso2->id;
1521 if (id1 < id2) return (-1);
1522 if (id1 > id2) return 1;
1526 /* ---------------------------------------------------------------------------
1527 * Fetching the ThreadID from an StgTSO.
1529 * This is used in the implementation of Show for ThreadIds.
1530 * ------------------------------------------------------------------------ */
1531 int rts_getThreadId(const StgTSO *tso)
1536 /* ---------------------------------------------------------------------------
1537 Create a new thread.
1539 The new thread starts with the given stack size. Before the
1540 scheduler can run, however, this thread needs to have a closure
1541 (and possibly some arguments) pushed on its stack. See
1542 pushClosure() in Schedule.h.
1544 createGenThread() and createIOThread() (in SchedAPI.h) are
1545 convenient packaged versions of this function.
1547 currently pri (priority) is only used in a GRAN setup -- HWL
1548 ------------------------------------------------------------------------ */
1549 //@cindex createThread
1551 /* currently pri (priority) is only used in a GRAN setup -- HWL */
1553 createThread(nat stack_size, StgInt pri)
1555 return createThread_(stack_size, rtsFalse, pri);
1559 createThread_(nat size, rtsBool have_lock, StgInt pri)
1563 createThread(nat stack_size)
1565 return createThread_(stack_size, rtsFalse);
1569 createThread_(nat size, rtsBool have_lock)
1576 /* First check whether we should create a thread at all */
1578 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1579 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1581 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1582 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1583 return END_TSO_QUEUE;
1589 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1592 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1594 /* catch ridiculously small stack sizes */
1595 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1596 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1599 stack_size = size - TSO_STRUCT_SIZEW;
1601 tso = (StgTSO *)allocate(size);
1602 TICK_ALLOC_TSO(stack_size, 0);
1604 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1606 SET_GRAN_HDR(tso, ThisPE);
1608 tso->what_next = ThreadEnterGHC;
1610 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1611 * protect the increment operation on next_thread_id.
1612 * In future, we could use an atomic increment instead.
1614 if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1615 tso->id = next_thread_id++;
1616 if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1618 tso->why_blocked = NotBlocked;
1619 tso->blocked_exceptions = NULL;
1621 tso->stack_size = stack_size;
1622 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
1624 tso->sp = (P_)&(tso->stack) + stack_size;
1627 tso->prof.CCCS = CCS_MAIN;
1630 /* put a stop frame on the stack */
1631 tso->sp -= sizeofW(StgStopFrame);
1632 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1633 tso->su = (StgUpdateFrame*)tso->sp;
1637 tso->link = END_TSO_QUEUE;
1638 /* uses more flexible routine in GranSim */
1639 insertThread(tso, CurrentProc);
1641 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1647 if (RtsFlags.GranFlags.GranSimStats.Full)
1648 DumpGranEvent(GR_START,tso);
1650 if (RtsFlags.ParFlags.ParStats.Full)
1651 DumpGranEvent(GR_STARTQ,tso);
1652 /* HACk to avoid SCHEDULE
1656 /* Link the new thread on the global thread list.
1658 tso->global_link = all_threads;
1662 tso->dist.priority = MandatoryPriority; //by default that is...
1666 tso->gran.pri = pri;
1668 tso->gran.magic = TSO_MAGIC; // debugging only
1670 tso->gran.sparkname = 0;
1671 tso->gran.startedat = CURRENT_TIME;
1672 tso->gran.exported = 0;
1673 tso->gran.basicblocks = 0;
1674 tso->gran.allocs = 0;
1675 tso->gran.exectime = 0;
1676 tso->gran.fetchtime = 0;
1677 tso->gran.fetchcount = 0;
1678 tso->gran.blocktime = 0;
1679 tso->gran.blockcount = 0;
1680 tso->gran.blockedat = 0;
1681 tso->gran.globalsparks = 0;
1682 tso->gran.localsparks = 0;
1683 if (RtsFlags.GranFlags.Light)
1684 tso->gran.clock = Now; /* local clock */
1686 tso->gran.clock = 0;
1688 IF_DEBUG(gran,printTSO(tso));
1691 tso->par.magic = TSO_MAGIC; // debugging only
1693 tso->par.sparkname = 0;
1694 tso->par.startedat = CURRENT_TIME;
1695 tso->par.exported = 0;
1696 tso->par.basicblocks = 0;
1697 tso->par.allocs = 0;
1698 tso->par.exectime = 0;
1699 tso->par.fetchtime = 0;
1700 tso->par.fetchcount = 0;
1701 tso->par.blocktime = 0;
1702 tso->par.blockcount = 0;
1703 tso->par.blockedat = 0;
1704 tso->par.globalsparks = 0;
1705 tso->par.localsparks = 0;
1709 globalGranStats.tot_threads_created++;
1710 globalGranStats.threads_created_on_PE[CurrentProc]++;
1711 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1712 globalGranStats.tot_sq_probes++;
1714 // collect parallel global statistics (currently done together with GC stats)
1715 if (RtsFlags.ParFlags.ParStats.Global &&
1716 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1717 //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime());
1718 globalParStats.tot_threads_created++;
1724 belch("==__ schedule: Created TSO %d (%p);",
1725 CurrentProc, tso, tso->id));
1727 IF_PAR_DEBUG(verbose,
1728 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1729 tso->id, tso, advisory_thread_count));
1731 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1732 tso->id, tso->stack_size));
1739 all parallel thread creation calls should fall through the following routine.
1742 createSparkThread(rtsSpark spark)
1744 ASSERT(spark != (rtsSpark)NULL);
1745 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
1747 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1748 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1749 return END_TSO_QUEUE;
1753 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1754 if (tso==END_TSO_QUEUE)
1755 barf("createSparkThread: Cannot create TSO");
1757 tso->priority = AdvisoryPriority;
1759 pushClosure(tso,spark);
1760 PUSH_ON_RUN_QUEUE(tso);
1761 advisory_thread_count++;
1768 Turn a spark into a thread.
1769 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1772 //@cindex activateSpark
1774 activateSpark (rtsSpark spark)
1778 tso = createSparkThread(spark);
1779 if (RtsFlags.ParFlags.ParStats.Full) {
1780 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1781 IF_PAR_DEBUG(verbose,
1782 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1783 (StgClosure *)spark, info_type((StgClosure *)spark)));
1785 // ToDo: fwd info on local/global spark to thread -- HWL
1786 // tso->gran.exported = spark->exported;
1787 // tso->gran.locked = !spark->global;
1788 // tso->gran.sparkname = spark->name;
1794 /* ---------------------------------------------------------------------------
1797 * scheduleThread puts a thread on the head of the runnable queue.
1798 * This will usually be done immediately after a thread is created.
1799 * The caller of scheduleThread must create the thread using e.g.
1800 * createThread and push an appropriate closure
1801 * on this thread's stack before the scheduler is invoked.
1802 * ------------------------------------------------------------------------ */
1804 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
1807 scheduleThread_(StgTSO *tso
1808 , rtsBool createTask
1809 #if !defined(THREADED_RTS)
1814 ACQUIRE_LOCK(&sched_mutex);
1816 /* Put the new thread on the head of the runnable queue. The caller
1817 * better push an appropriate closure on this thread's stack
1818 * beforehand. In the SMP case, the thread may start running as
1819 * soon as we release the scheduler lock below.
1821 PUSH_ON_RUN_QUEUE(tso);
1822 #if defined(THREADED_RTS)
1823 /* If main() is scheduling a thread, don't bother creating a
1827 startTask(taskStart);
1833 IF_DEBUG(scheduler,printTSO(tso));
1835 RELEASE_LOCK(&sched_mutex);
1838 void scheduleThread(StgTSO* tso)
1840 return scheduleThread_(tso, rtsFalse);
1843 void scheduleExtThread(StgTSO* tso)
1845 return scheduleThread_(tso, rtsTrue);
1848 /* ---------------------------------------------------------------------------
1851 * Initialise the scheduler. This resets all the queues - if the
1852 * queues contained any threads, they'll be garbage collected at the
1855 * ------------------------------------------------------------------------ */
1859 term_handler(int sig STG_UNUSED)
1862 ACQUIRE_LOCK(&term_mutex);
1864 RELEASE_LOCK(&term_mutex);
1875 for (i=0; i<=MAX_PROC; i++) {
1876 run_queue_hds[i] = END_TSO_QUEUE;
1877 run_queue_tls[i] = END_TSO_QUEUE;
1878 blocked_queue_hds[i] = END_TSO_QUEUE;
1879 blocked_queue_tls[i] = END_TSO_QUEUE;
1880 ccalling_threadss[i] = END_TSO_QUEUE;
1881 sleeping_queue = END_TSO_QUEUE;
1884 run_queue_hd = END_TSO_QUEUE;
1885 run_queue_tl = END_TSO_QUEUE;
1886 blocked_queue_hd = END_TSO_QUEUE;
1887 blocked_queue_tl = END_TSO_QUEUE;
1888 sleeping_queue = END_TSO_QUEUE;
1891 suspended_ccalling_threads = END_TSO_QUEUE;
1893 main_threads = NULL;
1894 all_threads = END_TSO_QUEUE;
1899 RtsFlags.ConcFlags.ctxtSwitchTicks =
1900 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1902 #if defined(RTS_SUPPORTS_THREADS)
1903 /* Initialise the mutex and condition variables used by
1905 initMutex(&sched_mutex);
1906 initMutex(&term_mutex);
1908 initCondition(&thread_ready_cond);
1912 initCondition(&gc_pending_cond);
1915 #if defined(RTS_SUPPORTS_THREADS)
1916 ACQUIRE_LOCK(&sched_mutex);
1919 /* Install the SIGHUP handler */
1922 struct sigaction action,oact;
1924 action.sa_handler = term_handler;
1925 sigemptyset(&action.sa_mask);
1926 action.sa_flags = 0;
1927 if (sigaction(SIGTERM, &action, &oact) != 0) {
1928 barf("can't install TERM handler");
1933 /* A capability holds the state a native thread needs in
1934 * order to execute STG code. At least one capability is
1935 * floating around (only SMP builds have more than one).
1939 #if defined(RTS_SUPPORTS_THREADS)
1940 /* start our haskell execution tasks */
1942 startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
1944 startTaskManager(0,taskStart);
1948 #if /* defined(SMP) ||*/ defined(PAR)
1952 #if defined(RTS_SUPPORTS_THREADS)
1953 RELEASE_LOCK(&sched_mutex);
1959 exitScheduler( void )
1961 #if defined(RTS_SUPPORTS_THREADS)
1966 /* -----------------------------------------------------------------------------
1967 Managing the per-task allocation areas.
1969 Each capability comes with an allocation area. These are
1970 fixed-length block lists into which allocation can be done.
1972 ToDo: no support for two-space collection at the moment???
1973 -------------------------------------------------------------------------- */
1975 /* -----------------------------------------------------------------------------
1976 * waitThread is the external interface for running a new computation
1977 * and waiting for the result.
1979 * In the non-SMP case, we create a new main thread, push it on the
1980 * main-thread stack, and invoke the scheduler to run it. The
1981 * scheduler will return when the top main thread on the stack has
1982 * completed or died, and fill in the necessary fields of the
1983 * main_thread structure.
1985 * In the SMP case, we create a main thread as before, but we then
1986 * create a new condition variable and sleep on it. When our new
1987 * main thread has completed, we'll be woken up and the status/result
1988 * will be in the main_thread struct.
1989 * -------------------------------------------------------------------------- */
1992 howManyThreadsAvail ( void )
1996 for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1998 for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2000 for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2006 finishAllThreads ( void )
2009 while (run_queue_hd != END_TSO_QUEUE) {
2010 waitThread ( run_queue_hd, NULL);
2012 while (blocked_queue_hd != END_TSO_QUEUE) {
2013 waitThread ( blocked_queue_hd, NULL);
2015 while (sleeping_queue != END_TSO_QUEUE) {
2016 waitThread ( blocked_queue_hd, NULL);
2019 (blocked_queue_hd != END_TSO_QUEUE ||
2020 run_queue_hd != END_TSO_QUEUE ||
2021 sleeping_queue != END_TSO_QUEUE);
2025 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2027 #if defined(THREADED_RTS)
2028 return waitThread_(tso,ret, rtsFalse);
2030 return waitThread_(tso,ret);
2035 waitThread_(StgTSO *tso,
2036 /*out*/StgClosure **ret
2037 #if defined(THREADED_RTS)
2038 , rtsBool blockWaiting
2043 SchedulerStatus stat;
2045 ACQUIRE_LOCK(&sched_mutex);
2047 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2052 #if defined(RTS_SUPPORTS_THREADS)
2053 initCondition(&m->wakeup);
2056 m->link = main_threads;
2059 IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2061 #if defined(RTS_SUPPORTS_THREADS)
2063 # if defined(THREADED_RTS)
2064 if (!blockWaiting) {
2065 /* In the threaded case, the OS thread that called main()
2066 * gets to enter the RTS directly without going via another
2069 RELEASE_LOCK(&sched_mutex);
2071 ASSERT(m->stat != NoStatus);
2075 IF_DEBUG(scheduler, sched_belch("sfoo"));
2077 waitCondition(&m->wakeup, &sched_mutex);
2078 } while (m->stat == NoStatus);
2081 /* GranSim specific init */
2082 CurrentTSO = m->tso; // the TSO to run
2083 procStatus[MainProc] = Busy; // status of main PE
2084 CurrentProc = MainProc; // PE to run it on
2088 RELEASE_LOCK(&sched_mutex);
2090 ASSERT(m->stat != NoStatus);
2095 #if defined(RTS_SUPPORTS_THREADS)
2096 closeCondition(&m->wakeup);
2099 IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n",
2103 #if defined(THREADED_RTS)
2106 RELEASE_LOCK(&sched_mutex);
2111 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2112 //@subsection Run queue code
2116 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2117 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2118 implicit global variable that has to be correct when calling these
2122 /* Put the new thread on the head of the runnable queue.
2123 * The caller of createThread better push an appropriate closure
2124 * on this thread's stack before the scheduler is invoked.
2126 static /* inline */ void
2127 add_to_run_queue(tso)
2130 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2131 tso->link = run_queue_hd;
2133 if (run_queue_tl == END_TSO_QUEUE) {
2138 /* Put the new thread at the end of the runnable queue. */
2139 static /* inline */ void
2140 push_on_run_queue(tso)
2143 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2144 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2145 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2146 if (run_queue_hd == END_TSO_QUEUE) {
2149 run_queue_tl->link = tso;
2155 Should be inlined because it's used very often in schedule. The tso
2156 argument is actually only needed in GranSim, where we want to have the
2157 possibility to schedule *any* TSO on the run queue, irrespective of the
2158 actual ordering. Therefore, if tso is not the nil TSO then we traverse
2159 the run queue and dequeue the tso, adjusting the links in the queue.
2161 //@cindex take_off_run_queue
2162 static /* inline */ StgTSO*
2163 take_off_run_queue(StgTSO *tso) {
2167 qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2169 if tso is specified, unlink that tso from the run_queue (doesn't have
2170 to be at the beginning of the queue); GranSim only
2172 if (tso!=END_TSO_QUEUE) {
2173 /* find tso in queue */
2174 for (t=run_queue_hd, prev=END_TSO_QUEUE;
2175 t!=END_TSO_QUEUE && t!=tso;
2179 /* now actually dequeue the tso */
2180 if (prev!=END_TSO_QUEUE) {
2181 ASSERT(run_queue_hd!=t);
2182 prev->link = t->link;
2184 /* t is at beginning of thread queue */
2185 ASSERT(run_queue_hd==t);
2186 run_queue_hd = t->link;
2188 /* t is at end of thread queue */
2189 if (t->link==END_TSO_QUEUE) {
2190 ASSERT(t==run_queue_tl);
2191 run_queue_tl = prev;
2193 ASSERT(run_queue_tl!=t);
2195 t->link = END_TSO_QUEUE;
2197 /* take tso from the beginning of the queue; std concurrent code */
2199 if (t != END_TSO_QUEUE) {
2200 run_queue_hd = t->link;
2201 t->link = END_TSO_QUEUE;
2202 if (run_queue_hd == END_TSO_QUEUE) {
2203 run_queue_tl = END_TSO_QUEUE;
2212 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2213 //@subsection Garbage Collextion Routines
2215 /* ---------------------------------------------------------------------------
2216 Where are the roots that we know about?
2218 - all the threads on the runnable queue
2219 - all the threads on the blocked queue
2220 - all the threads on the sleeping queue
2221 - all the thread currently executing a _ccall_GC
2222 - all the "main threads"
2224 ------------------------------------------------------------------------ */
2226 /* This has to be protected either by the scheduler monitor, or by the
2227 garbage collection monitor (probably the latter).
2232 GetRoots(evac_fn evac)
2239 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2240 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2241 evac((StgClosure **)&run_queue_hds[i]);
2242 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2243 evac((StgClosure **)&run_queue_tls[i]);
2245 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2246 evac((StgClosure **)&blocked_queue_hds[i]);
2247 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2248 evac((StgClosure **)&blocked_queue_tls[i]);
2249 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2250 evac((StgClosure **)&ccalling_threads[i]);
2257 if (run_queue_hd != END_TSO_QUEUE) {
2258 ASSERT(run_queue_tl != END_TSO_QUEUE);
2259 evac((StgClosure **)&run_queue_hd);
2260 evac((StgClosure **)&run_queue_tl);
2263 if (blocked_queue_hd != END_TSO_QUEUE) {
2264 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2265 evac((StgClosure **)&blocked_queue_hd);
2266 evac((StgClosure **)&blocked_queue_tl);
2269 if (sleeping_queue != END_TSO_QUEUE) {
2270 evac((StgClosure **)&sleeping_queue);
2274 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2275 evac((StgClosure **)&suspended_ccalling_threads);
2278 #if defined(PAR) || defined(GRAN)
2279 markSparkQueue(evac);
2283 /* -----------------------------------------------------------------------------
2286 This is the interface to the garbage collector from Haskell land.
2287 We provide this so that external C code can allocate and garbage
2288 collect when called from Haskell via _ccall_GC.
2290 It might be useful to provide an interface whereby the programmer
2291 can specify more roots (ToDo).
2293 This needs to be protected by the GC condition variable above. KH.
2294 -------------------------------------------------------------------------- */
2296 void (*extra_roots)(evac_fn);
2301 /* Obligated to hold this lock upon entry */
2302 ACQUIRE_LOCK(&sched_mutex);
2303 GarbageCollect(GetRoots,rtsFalse);
2304 RELEASE_LOCK(&sched_mutex);
2308 performMajorGC(void)
2310 ACQUIRE_LOCK(&sched_mutex);
2311 GarbageCollect(GetRoots,rtsTrue);
2312 RELEASE_LOCK(&sched_mutex);
2316 AllRoots(evac_fn evac)
2318 GetRoots(evac); // the scheduler's roots
2319 extra_roots(evac); // the user's roots
2323 performGCWithRoots(void (*get_roots)(evac_fn))
2325 ACQUIRE_LOCK(&sched_mutex);
2326 extra_roots = get_roots;
2327 GarbageCollect(AllRoots,rtsFalse);
2328 RELEASE_LOCK(&sched_mutex);
2331 /* -----------------------------------------------------------------------------
2334 If the thread has reached its maximum stack size, then raise the
2335 StackOverflow exception in the offending thread. Otherwise
2336 relocate the TSO into a larger chunk of memory and adjust its stack
2338 -------------------------------------------------------------------------- */
2341 threadStackOverflow(StgTSO *tso)
2343 nat new_stack_size, new_tso_size, diff, stack_words;
2347 IF_DEBUG(sanity,checkTSO(tso));
2348 if (tso->stack_size >= tso->max_stack_size) {
2351 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2352 tso->id, tso, tso->stack_size, tso->max_stack_size);
2353 /* If we're debugging, just print out the top of the stack */
2354 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2357 /* Send this thread the StackOverflow exception */
2358 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2362 /* Try to double the current stack size. If that takes us over the
2363 * maximum stack size for this thread, then use the maximum instead.
2364 * Finally round up so the TSO ends up as a whole number of blocks.
2366 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2367 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2368 TSO_STRUCT_SIZE)/sizeof(W_);
2369 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2370 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2372 IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2374 dest = (StgTSO *)allocate(new_tso_size);
2375 TICK_ALLOC_TSO(new_stack_size,0);
2377 /* copy the TSO block and the old stack into the new area */
2378 memcpy(dest,tso,TSO_STRUCT_SIZE);
2379 stack_words = tso->stack + tso->stack_size - tso->sp;
2380 new_sp = (P_)dest + new_tso_size - stack_words;
2381 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2383 /* relocate the stack pointers... */
2384 diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2385 dest->su = (StgUpdateFrame *) ((P_)dest->su + diff);
2387 dest->stack_size = new_stack_size;
2389 /* and relocate the update frame list */
2390 relocate_stack(dest, diff);
2392 /* Mark the old TSO as relocated. We have to check for relocated
2393 * TSOs in the garbage collector and any primops that deal with TSOs.
2395 * It's important to set the sp and su values to just beyond the end
2396 * of the stack, so we don't attempt to scavenge any part of the
2399 tso->what_next = ThreadRelocated;
2401 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2402 tso->su = (StgUpdateFrame *)tso->sp;
2403 tso->why_blocked = NotBlocked;
2404 dest->mut_link = NULL;
2406 IF_PAR_DEBUG(verbose,
2407 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2408 tso->id, tso, tso->stack_size);
2409 /* If we're debugging, just print out the top of the stack */
2410 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2413 IF_DEBUG(sanity,checkTSO(tso));
2415 IF_DEBUG(scheduler,printTSO(dest));
2421 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2422 //@subsection Blocking Queue Routines
2424 /* ---------------------------------------------------------------------------
2425 Wake up a queue that was blocked on some resource.
2426 ------------------------------------------------------------------------ */
2430 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2435 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2437 /* write RESUME events to log file and
2438 update blocked and fetch time (depending on type of the orig closure) */
2439 if (RtsFlags.ParFlags.ParStats.Full) {
2440 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2441 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2442 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2443 if (EMPTY_RUN_QUEUE())
2444 emitSchedule = rtsTrue;
2446 switch (get_itbl(node)->type) {
2448 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2453 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2460 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2467 static StgBlockingQueueElement *
2468 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2471 PEs node_loc, tso_loc;
2473 node_loc = where_is(node); // should be lifted out of loop
2474 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2475 tso_loc = where_is((StgClosure *)tso);
2476 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2477 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2478 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2479 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2480 // insertThread(tso, node_loc);
2481 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2483 tso, node, (rtsSpark*)NULL);
2484 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2487 } else { // TSO is remote (actually should be FMBQ)
2488 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2489 RtsFlags.GranFlags.Costs.gunblocktime +
2490 RtsFlags.GranFlags.Costs.latency;
2491 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2493 tso, node, (rtsSpark*)NULL);
2494 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2497 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2499 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2500 (node_loc==tso_loc ? "Local" : "Global"),
2501 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2502 tso->block_info.closure = NULL;
2503 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
2507 static StgBlockingQueueElement *
2508 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2510 StgBlockingQueueElement *next;
2512 switch (get_itbl(bqe)->type) {
2514 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2515 /* if it's a TSO just push it onto the run_queue */
2517 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2518 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
2520 unblockCount(bqe, node);
2521 /* reset blocking status after dumping event */
2522 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2526 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2528 bqe->link = (StgBlockingQueueElement *)PendingFetches;
2529 PendingFetches = (StgBlockedFetch *)bqe;
2533 /* can ignore this case in a non-debugging setup;
2534 see comments on RBHSave closures above */
2536 /* check that the closure is an RBHSave closure */
2537 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2538 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2539 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2543 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2544 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2548 IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2552 #else /* !GRAN && !PAR */
2554 unblockOneLocked(StgTSO *tso)
2558 ASSERT(get_itbl(tso)->type == TSO);
2559 ASSERT(tso->why_blocked != NotBlocked);
2560 tso->why_blocked = NotBlocked;
2562 PUSH_ON_RUN_QUEUE(tso);
2564 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2569 #if defined(GRAN) || defined(PAR)
2570 inline StgBlockingQueueElement *
2571 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2573 ACQUIRE_LOCK(&sched_mutex);
2574 bqe = unblockOneLocked(bqe, node);
2575 RELEASE_LOCK(&sched_mutex);
2580 unblockOne(StgTSO *tso)
2582 ACQUIRE_LOCK(&sched_mutex);
2583 tso = unblockOneLocked(tso);
2584 RELEASE_LOCK(&sched_mutex);
2591 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2593 StgBlockingQueueElement *bqe;
2598 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2599 node, CurrentProc, CurrentTime[CurrentProc],
2600 CurrentTSO->id, CurrentTSO));
2602 node_loc = where_is(node);
2604 ASSERT(q == END_BQ_QUEUE ||
2605 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2606 get_itbl(q)->type == CONSTR); // closure (type constructor)
2607 ASSERT(is_unique(node));
2609 /* FAKE FETCH: magically copy the node to the tso's proc;
2610 no Fetch necessary because in reality the node should not have been
2611 moved to the other PE in the first place
2613 if (CurrentProc!=node_loc) {
2615 belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2616 node, node_loc, CurrentProc, CurrentTSO->id,
2617 // CurrentTSO, where_is(CurrentTSO),
2618 node->header.gran.procs));
2619 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2621 belch("## new bitmask of node %p is %#x",
2622 node, node->header.gran.procs));
2623 if (RtsFlags.GranFlags.GranSimStats.Global) {
2624 globalGranStats.tot_fake_fetches++;
2629 // ToDo: check: ASSERT(CurrentProc==node_loc);
2630 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2633 bqe points to the current element in the queue
2634 next points to the next element in the queue
2636 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2637 //tso_loc = where_is(tso);
2639 bqe = unblockOneLocked(bqe, node);
2642 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2643 the closure to make room for the anchor of the BQ */
2644 if (bqe!=END_BQ_QUEUE) {
2645 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2647 ASSERT((info_ptr==&RBH_Save_0_info) ||
2648 (info_ptr==&RBH_Save_1_info) ||
2649 (info_ptr==&RBH_Save_2_info));
2651 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2652 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2653 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2656 belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2657 node, info_type(node)));
2660 /* statistics gathering */
2661 if (RtsFlags.GranFlags.GranSimStats.Global) {
2662 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2663 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2664 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2665 globalGranStats.tot_awbq++; // total no. of bqs awakened
2668 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2669 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2673 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2675 StgBlockingQueueElement *bqe;
2677 ACQUIRE_LOCK(&sched_mutex);
2679 IF_PAR_DEBUG(verbose,
2680 belch("##-_ AwBQ for node %p on [%x]: ",
2684 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2685 IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2690 ASSERT(q == END_BQ_QUEUE ||
2691 get_itbl(q)->type == TSO ||
2692 get_itbl(q)->type == BLOCKED_FETCH ||
2693 get_itbl(q)->type == CONSTR);
2696 while (get_itbl(bqe)->type==TSO ||
2697 get_itbl(bqe)->type==BLOCKED_FETCH) {
2698 bqe = unblockOneLocked(bqe, node);
2700 RELEASE_LOCK(&sched_mutex);
2703 #else /* !GRAN && !PAR */
2705 awakenBlockedQueue(StgTSO *tso)
2707 ACQUIRE_LOCK(&sched_mutex);
2708 while (tso != END_TSO_QUEUE) {
2709 tso = unblockOneLocked(tso);
2711 RELEASE_LOCK(&sched_mutex);
2715 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2716 //@subsection Exception Handling Routines
2718 /* ---------------------------------------------------------------------------
2720 - usually called inside a signal handler so it mustn't do anything fancy.
2721 ------------------------------------------------------------------------ */
2724 interruptStgRts(void)
2730 /* -----------------------------------------------------------------------------
2733 This is for use when we raise an exception in another thread, which
2735 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2736 -------------------------------------------------------------------------- */
2738 #if defined(GRAN) || defined(PAR)
2740 NB: only the type of the blocking queue is different in GranSim and GUM
2741 the operations on the queue-elements are the same
2742 long live polymorphism!
2745 unblockThread(StgTSO *tso)
2747 StgBlockingQueueElement *t, **last;
2749 ACQUIRE_LOCK(&sched_mutex);
2750 switch (tso->why_blocked) {
2753 return; /* not blocked */
2756 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2758 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2759 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2761 last = (StgBlockingQueueElement **)&mvar->head;
2762 for (t = (StgBlockingQueueElement *)mvar->head;
2764 last = &t->link, last_tso = t, t = t->link) {
2765 if (t == (StgBlockingQueueElement *)tso) {
2766 *last = (StgBlockingQueueElement *)tso->link;
2767 if (mvar->tail == tso) {
2768 mvar->tail = (StgTSO *)last_tso;
2773 barf("unblockThread (MVAR): TSO not found");
2776 case BlockedOnBlackHole:
2777 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2779 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2781 last = &bq->blocking_queue;
2782 for (t = bq->blocking_queue;
2784 last = &t->link, t = t->link) {
2785 if (t == (StgBlockingQueueElement *)tso) {
2786 *last = (StgBlockingQueueElement *)tso->link;
2790 barf("unblockThread (BLACKHOLE): TSO not found");
2793 case BlockedOnException:
2795 StgTSO *target = tso->block_info.tso;
2797 ASSERT(get_itbl(target)->type == TSO);
2799 if (target->what_next == ThreadRelocated) {
2800 target = target->link;
2801 ASSERT(get_itbl(target)->type == TSO);
2804 ASSERT(target->blocked_exceptions != NULL);
2806 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2807 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2809 last = &t->link, t = t->link) {
2810 ASSERT(get_itbl(t)->type == TSO);
2811 if (t == (StgBlockingQueueElement *)tso) {
2812 *last = (StgBlockingQueueElement *)tso->link;
2816 barf("unblockThread (Exception): TSO not found");
2820 case BlockedOnWrite:
2822 /* take TSO off blocked_queue */
2823 StgBlockingQueueElement *prev = NULL;
2824 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2825 prev = t, t = t->link) {
2826 if (t == (StgBlockingQueueElement *)tso) {
2828 blocked_queue_hd = (StgTSO *)t->link;
2829 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2830 blocked_queue_tl = END_TSO_QUEUE;
2833 prev->link = t->link;
2834 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2835 blocked_queue_tl = (StgTSO *)prev;
2841 barf("unblockThread (I/O): TSO not found");
2844 case BlockedOnDelay:
2846 /* take TSO off sleeping_queue */
2847 StgBlockingQueueElement *prev = NULL;
2848 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
2849 prev = t, t = t->link) {
2850 if (t == (StgBlockingQueueElement *)tso) {
2852 sleeping_queue = (StgTSO *)t->link;
2854 prev->link = t->link;
2859 barf("unblockThread (I/O): TSO not found");
2863 barf("unblockThread");
2867 tso->link = END_TSO_QUEUE;
2868 tso->why_blocked = NotBlocked;
2869 tso->block_info.closure = NULL;
2870 PUSH_ON_RUN_QUEUE(tso);
2871 RELEASE_LOCK(&sched_mutex);
2875 unblockThread(StgTSO *tso)
2879 ACQUIRE_LOCK(&sched_mutex);
2880 switch (tso->why_blocked) {
2883 return; /* not blocked */
2886 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2888 StgTSO *last_tso = END_TSO_QUEUE;
2889 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2892 for (t = mvar->head; t != END_TSO_QUEUE;
2893 last = &t->link, last_tso = t, t = t->link) {
2896 if (mvar->tail == tso) {
2897 mvar->tail = last_tso;
2902 barf("unblockThread (MVAR): TSO not found");
2905 case BlockedOnBlackHole:
2906 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2908 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2910 last = &bq->blocking_queue;
2911 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2912 last = &t->link, t = t->link) {
2918 barf("unblockThread (BLACKHOLE): TSO not found");
2921 case BlockedOnException:
2923 StgTSO *target = tso->block_info.tso;
2925 ASSERT(get_itbl(target)->type == TSO);
2927 while (target->what_next == ThreadRelocated) {
2928 target = target->link;
2929 ASSERT(get_itbl(target)->type == TSO);
2932 ASSERT(target->blocked_exceptions != NULL);
2934 last = &target->blocked_exceptions;
2935 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2936 last = &t->link, t = t->link) {
2937 ASSERT(get_itbl(t)->type == TSO);
2943 barf("unblockThread (Exception): TSO not found");
2947 case BlockedOnWrite:
2949 StgTSO *prev = NULL;
2950 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2951 prev = t, t = t->link) {
2954 blocked_queue_hd = t->link;
2955 if (blocked_queue_tl == t) {
2956 blocked_queue_tl = END_TSO_QUEUE;
2959 prev->link = t->link;
2960 if (blocked_queue_tl == t) {
2961 blocked_queue_tl = prev;
2967 barf("unblockThread (I/O): TSO not found");
2970 case BlockedOnDelay:
2972 StgTSO *prev = NULL;
2973 for (t = sleeping_queue; t != END_TSO_QUEUE;
2974 prev = t, t = t->link) {
2977 sleeping_queue = t->link;
2979 prev->link = t->link;
2984 barf("unblockThread (I/O): TSO not found");
2988 barf("unblockThread");
2992 tso->link = END_TSO_QUEUE;
2993 tso->why_blocked = NotBlocked;
2994 tso->block_info.closure = NULL;
2995 PUSH_ON_RUN_QUEUE(tso);
2996 RELEASE_LOCK(&sched_mutex);
3000 /* -----------------------------------------------------------------------------
3003 * The following function implements the magic for raising an
3004 * asynchronous exception in an existing thread.
3006 * We first remove the thread from any queue on which it might be
3007 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
3009 * We strip the stack down to the innermost CATCH_FRAME, building
3010 * thunks in the heap for all the active computations, so they can
3011 * be restarted if necessary. When we reach a CATCH_FRAME, we build
3012 * an application of the handler to the exception, and push it on
3013 * the top of the stack.
3015 * How exactly do we save all the active computations? We create an
3016 * AP_UPD for every UpdateFrame on the stack. Entering one of these
3017 * AP_UPDs pushes everything from the corresponding update frame
3018 * upwards onto the stack. (Actually, it pushes everything up to the
3019 * next update frame plus a pointer to the next AP_UPD object.
3020 * Entering the next AP_UPD object pushes more onto the stack until we
3021 * reach the last AP_UPD object - at which point the stack should look
3022 * exactly as it did when we killed the TSO and we can continue
3023 * execution by entering the closure on top of the stack.
3025 * We can also kill a thread entirely - this happens if either (a) the
3026 * exception passed to raiseAsync is NULL, or (b) there's no
3027 * CATCH_FRAME on the stack. In either case, we strip the entire
3028 * stack and replace the thread with a zombie.
3030 * -------------------------------------------------------------------------- */
3033 deleteThread(StgTSO *tso)
3035 raiseAsync(tso,NULL);
3039 raiseAsync(StgTSO *tso, StgClosure *exception)
3041 StgUpdateFrame* su = tso->su;
3042 StgPtr sp = tso->sp;
3044 /* Thread already dead? */
3045 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3049 IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3051 /* Remove it from any blocking queues */
3054 /* The stack freezing code assumes there's a closure pointer on
3055 * the top of the stack. This isn't always the case with compiled
3056 * code, so we have to push a dummy closure on the top which just
3057 * returns to the next return address on the stack.
3059 if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3060 *(--sp) = (W_)&stg_dummy_ret_closure;
3064 nat words = ((P_)su - (P_)sp) - 1;
3068 /* If we find a CATCH_FRAME, and we've got an exception to raise,
3069 * then build the THUNK raise(exception), and leave it on
3070 * top of the CATCH_FRAME ready to enter.
3072 if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3073 StgCatchFrame *cf = (StgCatchFrame *)su;
3076 /* we've got an exception to raise, so let's pass it to the
3077 * handler in this frame.
3079 raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3080 TICK_ALLOC_SE_THK(1,0);
3081 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3082 raise->payload[0] = exception;
3084 /* throw away the stack from Sp up to the CATCH_FRAME.
3088 /* Ensure that async excpetions are blocked now, so we don't get
3089 * a surprise exception before we get around to executing the
3092 if (tso->blocked_exceptions == NULL) {
3093 tso->blocked_exceptions = END_TSO_QUEUE;
3096 /* Put the newly-built THUNK on top of the stack, ready to execute
3097 * when the thread restarts.
3102 tso->what_next = ThreadEnterGHC;
3103 IF_DEBUG(sanity, checkTSO(tso));
3107 /* First build an AP_UPD consisting of the stack chunk above the
3108 * current update frame, with the top word on the stack as the
3111 ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3116 ap->fun = (StgClosure *)sp[0];
3118 for(i=0; i < (nat)words; ++i) {
3119 ap->payload[i] = (StgClosure *)*sp++;
3122 switch (get_itbl(su)->type) {
3126 SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */);
3127 TICK_ALLOC_UP_THK(words+1,0);
3130 fprintf(stderr, "scheduler: Updating ");
3131 printPtr((P_)su->updatee);
3132 fprintf(stderr, " with ");
3133 printObj((StgClosure *)ap);
3136 /* Replace the updatee with an indirection - happily
3137 * this will also wake up any threads currently
3138 * waiting on the result.
3140 * Warning: if we're in a loop, more than one update frame on
3141 * the stack may point to the same object. Be careful not to
3142 * overwrite an IND_OLDGEN in this case, because we'll screw
3143 * up the mutable lists. To be on the safe side, don't
3144 * overwrite any kind of indirection at all. See also
3145 * threadSqueezeStack in GC.c, where we have to make a similar
3148 if (!closure_IND(su->updatee)) {
3149 UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */
3152 sp += sizeofW(StgUpdateFrame) -1;
3153 sp[0] = (W_)ap; /* push onto stack */
3159 StgCatchFrame *cf = (StgCatchFrame *)su;
3162 /* We want a PAP, not an AP_UPD. Fortunately, the
3163 * layout's the same.
3165 SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3166 TICK_ALLOC_UPD_PAP(words+1,0);
3168 /* now build o = FUN(catch,ap,handler) */
3169 o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3170 TICK_ALLOC_FUN(2,0);
3171 SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3172 o->payload[0] = (StgClosure *)ap;
3173 o->payload[1] = cf->handler;
3176 fprintf(stderr, "scheduler: Built ");
3177 printObj((StgClosure *)o);
3180 /* pop the old handler and put o on the stack */
3182 sp += sizeofW(StgCatchFrame) - 1;
3189 StgSeqFrame *sf = (StgSeqFrame *)su;
3192 SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3193 TICK_ALLOC_UPD_PAP(words+1,0);
3195 /* now build o = FUN(seq,ap) */
3196 o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3197 TICK_ALLOC_SE_THK(1,0);
3198 SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3199 o->payload[0] = (StgClosure *)ap;
3202 fprintf(stderr, "scheduler: Built ");
3203 printObj((StgClosure *)o);
3206 /* pop the old handler and put o on the stack */
3208 sp += sizeofW(StgSeqFrame) - 1;
3214 /* We've stripped the entire stack, the thread is now dead. */
3215 sp += sizeofW(StgStopFrame) - 1;
3216 sp[0] = (W_)exception; /* save the exception */
3217 tso->what_next = ThreadKilled;
3218 tso->su = (StgUpdateFrame *)(sp+1);
3229 /* -----------------------------------------------------------------------------
3230 resurrectThreads is called after garbage collection on the list of
3231 threads found to be garbage. Each of these threads will be woken
3232 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3233 on an MVar, or NonTermination if the thread was blocked on a Black
3235 -------------------------------------------------------------------------- */
3238 resurrectThreads( StgTSO *threads )
3242 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3243 next = tso->global_link;
3244 tso->global_link = all_threads;
3246 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3248 switch (tso->why_blocked) {
3250 case BlockedOnException:
3251 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3253 case BlockedOnBlackHole:
3254 raiseAsync(tso,(StgClosure *)NonTermination_closure);
3257 /* This might happen if the thread was blocked on a black hole
3258 * belonging to a thread that we've just woken up (raiseAsync
3259 * can wake up threads, remember...).
3263 barf("resurrectThreads: thread blocked in a strange way");
3268 /* -----------------------------------------------------------------------------
3269 * Blackhole detection: if we reach a deadlock, test whether any
3270 * threads are blocked on themselves. Any threads which are found to
3271 * be self-blocked get sent a NonTermination exception.
3273 * This is only done in a deadlock situation in order to avoid
3274 * performance overhead in the normal case.
3275 * -------------------------------------------------------------------------- */
3278 detectBlackHoles( void )
3280 StgTSO *t = all_threads;
3281 StgUpdateFrame *frame;
3282 StgClosure *blocked_on;
3284 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3286 while (t->what_next == ThreadRelocated) {
3288 ASSERT(get_itbl(t)->type == TSO);
3291 if (t->why_blocked != BlockedOnBlackHole) {
3295 blocked_on = t->block_info.closure;
3297 for (frame = t->su; ; frame = frame->link) {
3298 switch (get_itbl(frame)->type) {
3301 if (frame->updatee == blocked_on) {
3302 /* We are blocking on one of our own computations, so
3303 * send this thread the NonTermination exception.
3306 sched_belch("thread %d is blocked on itself", t->id));
3307 raiseAsync(t, (StgClosure *)NonTermination_closure);
3328 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3329 //@subsection Debugging Routines
3331 /* -----------------------------------------------------------------------------
3332 Debugging: why is a thread blocked
3333 -------------------------------------------------------------------------- */
3338 printThreadBlockage(StgTSO *tso)
3340 switch (tso->why_blocked) {
3342 fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3344 case BlockedOnWrite:
3345 fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3347 case BlockedOnDelay:
3348 fprintf(stderr,"is blocked until %d", tso->block_info.target);
3351 fprintf(stderr,"is blocked on an MVar");
3353 case BlockedOnException:
3354 fprintf(stderr,"is blocked on delivering an exception to thread %d",
3355 tso->block_info.tso->id);
3357 case BlockedOnBlackHole:
3358 fprintf(stderr,"is blocked on a black hole");
3361 fprintf(stderr,"is not blocked");
3365 fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3366 tso->block_info.closure, info_type(tso->block_info.closure));
3368 case BlockedOnGA_NoSend:
3369 fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3370 tso->block_info.closure, info_type(tso->block_info.closure));
3373 #if defined(RTS_SUPPORTS_THREADS)
3374 case BlockedOnCCall:
3375 fprintf(stderr,"is blocked on an external call");
3379 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3380 tso->why_blocked, tso->id, tso);
3385 printThreadStatus(StgTSO *tso)
3387 switch (tso->what_next) {
3389 fprintf(stderr,"has been killed");
3391 case ThreadComplete:
3392 fprintf(stderr,"has completed");
3395 printThreadBlockage(tso);
3400 printAllThreads(void)
3405 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3406 ullong_format_string(TIME_ON_PROC(CurrentProc),
3407 time_string, rtsFalse/*no commas!*/);
3409 sched_belch("all threads at [%s]:", time_string);
3411 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3412 ullong_format_string(CURRENT_TIME,
3413 time_string, rtsFalse/*no commas!*/);
3415 sched_belch("all threads at [%s]:", time_string);
3417 sched_belch("all threads:");
3420 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3421 fprintf(stderr, "\tthread %d ", t->id);
3422 printThreadStatus(t);
3423 fprintf(stderr,"\n");
3428 Print a whole blocking queue attached to node (debugging only).
3433 print_bq (StgClosure *node)
3435 StgBlockingQueueElement *bqe;
3439 fprintf(stderr,"## BQ of closure %p (%s): ",
3440 node, info_type(node));
3442 /* should cover all closures that may have a blocking queue */
3443 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3444 get_itbl(node)->type == FETCH_ME_BQ ||
3445 get_itbl(node)->type == RBH ||
3446 get_itbl(node)->type == MVAR);
3448 ASSERT(node!=(StgClosure*)NULL); // sanity check
3450 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3454 Print a whole blocking queue starting with the element bqe.
3457 print_bqe (StgBlockingQueueElement *bqe)
3462 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3464 for (end = (bqe==END_BQ_QUEUE);
3465 !end; // iterate until bqe points to a CONSTR
3466 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
3467 bqe = end ? END_BQ_QUEUE : bqe->link) {
3468 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3469 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3470 /* types of closures that may appear in a blocking queue */
3471 ASSERT(get_itbl(bqe)->type == TSO ||
3472 get_itbl(bqe)->type == BLOCKED_FETCH ||
3473 get_itbl(bqe)->type == CONSTR);
3474 /* only BQs of an RBH end with an RBH_Save closure */
3475 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3477 switch (get_itbl(bqe)->type) {
3479 fprintf(stderr," TSO %u (%x),",
3480 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3483 fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3484 ((StgBlockedFetch *)bqe)->node,
3485 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3486 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3487 ((StgBlockedFetch *)bqe)->ga.weight);
3490 fprintf(stderr," %s (IP %p),",
3491 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3492 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3493 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3494 "RBH_Save_?"), get_itbl(bqe));
3497 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3498 info_type((StgClosure *)bqe)); // , node, info_type(node));
3502 fputc('\n', stderr);
3504 # elif defined(GRAN)
3506 print_bq (StgClosure *node)
3508 StgBlockingQueueElement *bqe;
3509 PEs node_loc, tso_loc;
3512 /* should cover all closures that may have a blocking queue */
3513 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3514 get_itbl(node)->type == FETCH_ME_BQ ||
3515 get_itbl(node)->type == RBH);
3517 ASSERT(node!=(StgClosure*)NULL); // sanity check
3518 node_loc = where_is(node);
3520 fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3521 node, info_type(node), node_loc);
3524 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3526 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3527 !end; // iterate until bqe points to a CONSTR
3528 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3529 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3530 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3531 /* types of closures that may appear in a blocking queue */
3532 ASSERT(get_itbl(bqe)->type == TSO ||
3533 get_itbl(bqe)->type == CONSTR);
3534 /* only BQs of an RBH end with an RBH_Save closure */
3535 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3537 tso_loc = where_is((StgClosure *)bqe);
3538 switch (get_itbl(bqe)->type) {
3540 fprintf(stderr," TSO %d (%p) on [PE %d],",
3541 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3544 fprintf(stderr," %s (IP %p),",
3545 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3546 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3547 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3548 "RBH_Save_?"), get_itbl(bqe));
3551 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3552 info_type((StgClosure *)bqe), node, info_type(node));
3556 fputc('\n', stderr);
3560 Nice and easy: only TSOs on the blocking queue
3563 print_bq (StgClosure *node)
3567 ASSERT(node!=(StgClosure*)NULL); // sanity check
3568 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3569 tso != END_TSO_QUEUE;
3571 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3572 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3573 fprintf(stderr," TSO %d (%p),", tso->id, tso);
3575 fputc('\n', stderr);
3586 for (i=0, tso=run_queue_hd;
3587 tso != END_TSO_QUEUE;
3596 sched_belch(char *s, ...)
3601 fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3603 fprintf(stderr, "== ");
3605 fprintf(stderr, "scheduler: ");
3607 vfprintf(stderr, s, ap);
3608 fprintf(stderr, "\n");
3614 //@node Index, , Debugging Routines, Main scheduling code
3618 //* StgMainThread:: @cindex\s-+StgMainThread
3619 //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue
3620 //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd
3621 //* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
3622 //* context_switch:: @cindex\s-+context_switch
3623 //* createThread:: @cindex\s-+createThread
3624 //* gc_pending_cond:: @cindex\s-+gc_pending_cond
3625 //* initScheduler:: @cindex\s-+initScheduler
3626 //* interrupted:: @cindex\s-+interrupted
3627 //* next_thread_id:: @cindex\s-+next_thread_id
3628 //* print_bq:: @cindex\s-+print_bq
3629 //* run_queue_hd:: @cindex\s-+run_queue_hd
3630 //* run_queue_tl:: @cindex\s-+run_queue_tl
3631 //* sched_mutex:: @cindex\s-+sched_mutex
3632 //* schedule:: @cindex\s-+schedule
3633 //* take_off_run_queue:: @cindex\s-+take_off_run_queue
3634 //* term_mutex:: @cindex\s-+term_mutex