1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.174 2003/09/21 22:20:56 wolfgang Exp $
4 * (c) The GHC Team, 1998-2000
8 * Different GHC ways use this scheduler quite differently (see comments below)
9 * Here is the global picture:
11 * WAY Name CPP flag What's it for
12 * --------------------------------------
13 * mp GUM PAR Parallel execution on a distributed memory machine
14 * s SMP SMP Parallel execution on a shared memory machine
15 * mg GranSim GRAN Simulation of parallel execution
16 * md GUM/GdH DIST Distributed execution (based on GUM)
18 * --------------------------------------------------------------------------*/
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
24 * Version with scheduler monitor support for SMPs (WAY=s):
26 This design provides a high-level API to create and schedule threads etc.
27 as documented in the SMP design document.
29 It uses a monitor design controlled by a single mutex to exercise control
30 over accesses to shared data structures, and builds on the Posix threads
33 The majority of state is shared. In order to keep essential per-task state,
34 there is a Capability structure, which contains all the information
35 needed to run a thread: its STG registers, a pointer to its TSO, a
36 nursery etc. During STG execution, a pointer to the capability is
37 kept in a register (BaseReg).
39 In a non-SMP build, there is one global capability, namely MainRegTable.
43 * Version with support for distributed memory parallelism aka GUM (WAY=mp):
45 The main scheduling loop in GUM iterates until a finish message is received.
46 In that case a global flag @receivedFinish@ is set and this instance of
47 the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48 for the handling of incoming messages, such as PP_FINISH.
49 Note that in the parallel case we have a system manager that coordinates
50 different PEs, each of which is running one instance of the RTS.
51 See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52 From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
54 * Version with support for simulating parallel execution aka GranSim (WAY=mg):
56 The main scheduling code in GranSim is quite different from that in std
57 (concurrent) Haskell: while concurrent Haskell just iterates over the
58 threads in the runnable queue, GranSim is event driven, i.e. it iterates
59 over the events in the global event queue. -- HWL
64 //* Variables and Data structures::
65 //* Main scheduling loop::
66 //* Suspend and Resume::
68 //* Garbage Collextion Routines::
69 //* Blocking Queue Routines::
70 //* Exception Handling Routines::
71 //* Debugging Routines::
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
78 #include "PosixSource.h"
85 #include "StgStartup.h"
87 #define COMPILING_SCHEDULER
89 #include "StgMiscClosures.h"
91 #include "Interpreter.h"
92 #include "Exception.h"
99 #include "ThreadLabels.h"
101 #include "Proftimer.h"
102 #include "ProfHeap.h"
104 #if defined(GRAN) || defined(PAR)
105 # include "GranSimRts.h"
106 # include "GranSim.h"
107 # include "ParallelRts.h"
108 # include "Parallel.h"
109 # include "ParallelDebug.h"
110 # include "FetchMe.h"
114 #include "Capability.h"
115 #include "OSThreads.h"
118 #ifdef HAVE_SYS_TYPES_H
119 #include <sys/types.h>
133 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
134 //@subsection Variables and Data structures
136 /* Main thread queue.
137 * Locks required: sched_mutex.
139 StgMainThread *main_threads = NULL;
142 * Locks required: sched_mutex.
146 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
147 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
150 In GranSim we have a runnable and a blocked queue for each processor.
151 In order to minimise code changes new arrays run_queue_hds/tls
152 are created. run_queue_hd is then a short cut (macro) for
153 run_queue_hds[CurrentProc] (see GranSim.h).
156 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
157 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
158 StgTSO *ccalling_threadss[MAX_PROC];
159 /* We use the same global list of threads (all_threads) in GranSim as in
160 the std RTS (i.e. we are cheating). However, we don't use this list in
161 the GranSim specific code at the moment (so we are only potentially
166 StgTSO *run_queue_hd = NULL;
167 StgTSO *run_queue_tl = NULL;
168 StgTSO *blocked_queue_hd = NULL;
169 StgTSO *blocked_queue_tl = NULL;
170 StgTSO *sleeping_queue = NULL; /* perhaps replace with a hash table? */
174 /* Linked list of all threads.
175 * Used for detecting garbage collected threads.
177 StgTSO *all_threads = NULL;
179 /* When a thread performs a safe C call (_ccall_GC, using old
180 * terminology), it gets put on the suspended_ccalling_threads
181 * list. Used by the garbage collector.
183 static StgTSO *suspended_ccalling_threads;
185 static StgTSO *threadStackOverflow(StgTSO *tso);
187 /* KH: The following two flags are shared memory locations. There is no need
188 to lock them, since they are only unset at the end of a scheduler
192 /* flag set by signal handler to precipitate a context switch */
193 //@cindex context_switch
194 nat context_switch = 0;
196 /* if this flag is set as well, give up execution */
197 //@cindex interrupted
198 rtsBool interrupted = rtsFalse;
200 /* Next thread ID to allocate.
201 * Locks required: thread_id_mutex
203 //@cindex next_thread_id
204 static StgThreadID next_thread_id = 1;
207 * Pointers to the state of the current thread.
208 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
209 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
212 /* The smallest stack size that makes any sense is:
213 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
214 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
215 * + 1 (the closure to enter)
217 * + 1 (spare slot req'd by stg_ap_v_ret)
219 * A thread with this stack will bomb immediately with a stack
220 * overflow, which will increase its stack size.
223 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
230 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
231 * exists - earlier gccs apparently didn't.
236 static rtsBool ready_to_gc;
239 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
240 * in an MT setting, needed to signal that a worker thread shouldn't hang around
241 * in the scheduler when it is out of work.
243 static rtsBool shutting_down_scheduler = rtsFalse;
245 void addToBlockedQueue ( StgTSO *tso );
247 static void schedule ( StgMainThread *mainThread, Capability *initialCapability );
248 void interruptStgRts ( void );
250 static void detectBlackHoles ( void );
253 static void sched_belch(char *s, ...);
256 #if defined(RTS_SUPPORTS_THREADS)
257 /* ToDo: carefully document the invariants that go together
258 * with these synchronisation objects.
260 Mutex sched_mutex = INIT_MUTEX_VAR;
261 Mutex term_mutex = INIT_MUTEX_VAR;
264 * A heavyweight solution to the problem of protecting
265 * the thread_id from concurrent update.
267 Mutex thread_id_mutex = INIT_MUTEX_VAR;
271 static Condition gc_pending_cond = INIT_COND_VAR;
275 #endif /* RTS_SUPPORTS_THREADS */
279 rtsTime TimeOfLastYield;
280 rtsBool emitSchedule = rtsTrue;
284 static char *whatNext_strs[] = {
294 StgTSO * createSparkThread(rtsSpark spark);
295 StgTSO * activateSpark (rtsSpark spark);
299 * The thread state for the main thread.
300 // ToDo: check whether not needed any more
304 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
305 static void taskStart(void);
313 #if defined(RTS_SUPPORTS_THREADS)
315 startSchedulerTask(void)
317 startTask(taskStart);
321 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
322 //@subsection Main scheduling loop
324 /* ---------------------------------------------------------------------------
325 Main scheduling loop.
327 We use round-robin scheduling, each thread returning to the
328 scheduler loop when one of these conditions is detected:
331 * timer expires (thread yields)
336 Locking notes: we acquire the scheduler lock once at the beginning
337 of the scheduler loop, and release it when
339 * running a thread, or
340 * waiting for work, or
341 * waiting for a GC to complete.
344 In a GranSim setup this loop iterates over the global event queue.
345 This revolves around the global event queue, which determines what
346 to do next. Therefore, it's more complicated than either the
347 concurrent or the parallel (GUM) setup.
350 GUM iterates over incoming messages.
351 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
352 and sends out a fish whenever it has nothing to do; in-between
353 doing the actual reductions (shared code below) it processes the
354 incoming messages and deals with delayed operations
355 (see PendingFetches).
356 This is not the ugliest code you could imagine, but it's bloody close.
358 ------------------------------------------------------------------------ */
361 schedule( StgMainThread *mainThread, Capability *initialCapability )
364 Capability *cap = initialCapability;
365 StgThreadReturnCode ret;
373 rtsBool receivedFinish = rtsFalse;
375 nat tp_size, sp_size; // stats only
378 rtsBool was_interrupted = rtsFalse;
379 StgTSOWhatNext prev_what_next;
381 ACQUIRE_LOCK(&sched_mutex);
383 #if defined(RTS_SUPPORTS_THREADS)
384 /* in the threaded case, the capability is either passed in via the initialCapability
385 parameter, or initialized inside the scheduler loop */
388 fprintf(stderr,"### NEW SCHEDULER LOOP in os thread %u(%p)\n",
389 osThreadId(), osThreadId()));
391 fprintf(stderr,"### main thread: %p\n",mainThread));
393 fprintf(stderr,"### initial cap: %p\n",initialCapability));
395 /* simply initialise it in the non-threaded case */
396 grabCapability(&cap);
400 /* set up first event to get things going */
401 /* ToDo: assign costs for system setup and init MainTSO ! */
402 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
404 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
407 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
408 G_TSO(CurrentTSO, 5));
410 if (RtsFlags.GranFlags.Light) {
411 /* Save current time; GranSim Light only */
412 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
415 event = get_next_event();
417 while (event!=(rtsEvent*)NULL) {
418 /* Choose the processor with the next event */
419 CurrentProc = event->proc;
420 CurrentTSO = event->tso;
424 while (!receivedFinish) { /* set by processMessages */
425 /* when receiving PP_FINISH message */
432 IF_DEBUG(scheduler, printAllThreads());
434 #if defined(RTS_SUPPORTS_THREADS)
435 /* Check to see whether there are any worker threads
436 waiting to deposit external call results. If so,
437 yield our capability... if we have a capability, that is. */
439 yieldToReturningWorker(&sched_mutex, &cap,
440 mainThread ? &mainThread->bound_thread_cond : NULL);
442 /* If we do not currently hold a capability, we wait for one */
445 waitForWorkCapability(&sched_mutex, &cap,
446 mainThread ? &mainThread->bound_thread_cond : NULL);
447 IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): got cap",
452 /* If we're interrupted (the user pressed ^C, or some other
453 * termination condition occurred), kill all the currently running
457 IF_DEBUG(scheduler, sched_belch("interrupted"));
458 interrupted = rtsFalse;
459 was_interrupted = rtsTrue;
460 #if defined(RTS_SUPPORTS_THREADS)
461 // In the threaded RTS, deadlock detection doesn't work,
462 // so just exit right away.
463 prog_belch("interrupted");
464 releaseCapability(cap);
465 startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit
466 RELEASE_LOCK(&sched_mutex);
467 shutdownHaskellAndExit(EXIT_SUCCESS);
473 /* Go through the list of main threads and wake up any
474 * clients whose computations have finished. ToDo: this
475 * should be done more efficiently without a linear scan
476 * of the main threads list, somehow...
478 #if defined(RTS_SUPPORTS_THREADS)
480 StgMainThread *m, **prev;
481 prev = &main_threads;
482 for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
483 if (m->tso->what_next == ThreadComplete
484 || m->tso->what_next == ThreadKilled)
488 if(m->tso->what_next == ThreadComplete)
492 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
493 *(m->ret) = (StgClosure *)m->tso->sp[1];
505 m->stat = Interrupted;
515 removeThreadLabel((StgWord)m->tso);
517 releaseCapability(cap);
518 RELEASE_LOCK(&sched_mutex);
523 // The current OS thread can not handle the fact that the Haskell
524 // thread "m" has ended.
525 // "m" is bound; the scheduler loop in its bound OS thread has
526 // to return, so let's pass our capability directly to that thread.
527 passCapability(&sched_mutex, cap, &m->bound_thread_cond);
534 if(!cap) // If we gave our capability away,
535 continue; // go to the top to get it back
537 #else /* not threaded */
540 /* in GUM do this only on the Main PE */
543 /* If our main thread has finished or been killed, return.
546 StgMainThread *m = main_threads;
547 if (m->tso->what_next == ThreadComplete
548 || m->tso->what_next == ThreadKilled) {
550 removeThreadLabel((StgWord)m->tso);
552 main_threads = main_threads->link;
553 if (m->tso->what_next == ThreadComplete) {
554 // We finished successfully, fill in the return value
555 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
556 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
560 if (m->ret) { *(m->ret) = NULL; };
561 if (was_interrupted) {
562 m->stat = Interrupted;
572 /* Top up the run queue from our spark pool. We try to make the
573 * number of threads in the run queue equal to the number of
576 * Disable spark support in SMP for now, non-essential & requires
577 * a little bit of work to make it compile cleanly. -- sof 1/02.
579 #if 0 /* defined(SMP) */
581 nat n = getFreeCapabilities();
582 StgTSO *tso = run_queue_hd;
584 /* Count the run queue */
585 while (n > 0 && tso != END_TSO_QUEUE) {
592 spark = findSpark(rtsFalse);
594 break; /* no more sparks in the pool */
596 /* I'd prefer this to be done in activateSpark -- HWL */
597 /* tricky - it needs to hold the scheduler lock and
598 * not try to re-acquire it -- SDM */
599 createSparkThread(spark);
601 sched_belch("==^^ turning spark of closure %p into a thread",
602 (StgClosure *)spark));
605 /* We need to wake up the other tasks if we just created some
608 if (getFreeCapabilities() - n > 1) {
609 signalCondition( &thread_ready_cond );
614 /* check for signals each time around the scheduler */
615 #if defined(RTS_USER_SIGNALS)
616 if (signals_pending()) {
617 RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
618 startSignalHandlers();
619 ACQUIRE_LOCK(&sched_mutex);
623 /* Check whether any waiting threads need to be woken up. If the
624 * run queue is empty, and there are no other tasks running, we
625 * can wait indefinitely for something to happen.
627 if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
628 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
633 awaitEvent( EMPTY_RUN_QUEUE()
635 && allFreeCapabilities()
639 /* we can be interrupted while waiting for I/O... */
640 if (interrupted) continue;
643 * Detect deadlock: when we have no threads to run, there are no
644 * threads waiting on I/O or sleeping, and all the other tasks are
645 * waiting for work, we must have a deadlock of some description.
647 * We first try to find threads blocked on themselves (ie. black
648 * holes), and generate NonTermination exceptions where necessary.
650 * If no threads are black holed, we have a deadlock situation, so
651 * inform all the main threads.
653 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
654 if ( EMPTY_THREAD_QUEUES()
655 #if defined(RTS_SUPPORTS_THREADS)
656 && EMPTY_QUEUE(suspended_ccalling_threads)
659 && allFreeCapabilities()
663 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
664 #if defined(THREADED_RTS)
665 /* and SMP mode ..? */
666 releaseCapability(cap);
668 // Garbage collection can release some new threads due to
669 // either (a) finalizers or (b) threads resurrected because
670 // they are about to be sent BlockedOnDeadMVar. Any threads
671 // thus released will be immediately runnable.
672 GarbageCollect(GetRoots,rtsTrue);
674 if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
677 sched_belch("still deadlocked, checking for black holes..."));
680 if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
682 #if defined(RTS_USER_SIGNALS)
683 /* If we have user-installed signal handlers, then wait
684 * for signals to arrive rather than bombing out with a
687 #if defined(RTS_SUPPORTS_THREADS)
688 if ( 0 ) { /* hmm..what to do? Simply stop waiting for
689 a signal with no runnable threads (or I/O
690 suspended ones) leads nowhere quick.
691 For now, simply shut down when we reach this
694 ToDo: define precisely under what conditions
695 the Scheduler should shut down in an MT setting.
698 if ( anyUserHandlers() ) {
701 sched_belch("still deadlocked, waiting for signals..."));
705 // we might be interrupted...
706 if (interrupted) { continue; }
708 if (signals_pending()) {
709 RELEASE_LOCK(&sched_mutex);
710 startSignalHandlers();
711 ACQUIRE_LOCK(&sched_mutex);
713 ASSERT(!EMPTY_RUN_QUEUE());
718 /* Probably a real deadlock. Send the current main thread the
719 * Deadlock exception (or in the SMP build, send *all* main
720 * threads the deadlock exception, since none of them can make
725 #if defined(RTS_SUPPORTS_THREADS)
726 for (m = main_threads; m != NULL; m = m->link) {
727 switch (m->tso->why_blocked) {
728 case BlockedOnBlackHole:
729 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
731 case BlockedOnException:
733 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
736 barf("deadlock: main thread blocked in a strange way");
741 switch (m->tso->why_blocked) {
742 case BlockedOnBlackHole:
743 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
745 case BlockedOnException:
747 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
750 barf("deadlock: main thread blocked in a strange way");
755 #if defined(RTS_SUPPORTS_THREADS)
756 /* ToDo: revisit conditions (and mechanism) for shutting
757 down a multi-threaded world */
758 IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
759 RELEASE_LOCK(&sched_mutex);
766 #elif defined(RTS_SUPPORTS_THREADS)
767 /* ToDo: add deadlock detection in threaded RTS */
769 /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
773 /* If there's a GC pending, don't do anything until it has
777 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
778 waitCondition( &gc_pending_cond, &sched_mutex );
782 #if defined(RTS_SUPPORTS_THREADS)
784 /* block until we've got a thread on the run queue and a free
788 if ( EMPTY_RUN_QUEUE() ) {
789 /* Give up our capability */
790 releaseCapability(cap);
792 /* If we're in the process of shutting down (& running
793 * a batch of finalisers), don't wait around.
795 if ( shutting_down_scheduler ) {
796 RELEASE_LOCK(&sched_mutex);
799 IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
800 waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
801 IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
804 if ( EMPTY_RUN_QUEUE() ) {
805 continue; // nothing to do
811 if (RtsFlags.GranFlags.Light)
812 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
814 /* adjust time based on time-stamp */
815 if (event->time > CurrentTime[CurrentProc] &&
816 event->evttype != ContinueThread)
817 CurrentTime[CurrentProc] = event->time;
819 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
820 if (!RtsFlags.GranFlags.Light)
823 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
825 /* main event dispatcher in GranSim */
826 switch (event->evttype) {
827 /* Should just be continuing execution */
829 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
830 /* ToDo: check assertion
831 ASSERT(run_queue_hd != (StgTSO*)NULL &&
832 run_queue_hd != END_TSO_QUEUE);
834 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
835 if (!RtsFlags.GranFlags.DoAsyncFetch &&
836 procStatus[CurrentProc]==Fetching) {
837 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
838 CurrentTSO->id, CurrentTSO, CurrentProc);
841 /* Ignore ContinueThreads for completed threads */
842 if (CurrentTSO->what_next == ThreadComplete) {
843 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
844 CurrentTSO->id, CurrentTSO, CurrentProc);
847 /* Ignore ContinueThreads for threads that are being migrated */
848 if (PROCS(CurrentTSO)==Nowhere) {
849 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
850 CurrentTSO->id, CurrentTSO, CurrentProc);
853 /* The thread should be at the beginning of the run queue */
854 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
855 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
856 CurrentTSO->id, CurrentTSO, CurrentProc);
857 break; // run the thread anyway
860 new_event(proc, proc, CurrentTime[proc],
862 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
864 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
865 break; // now actually run the thread; DaH Qu'vam yImuHbej
868 do_the_fetchnode(event);
869 goto next_thread; /* handle next event in event queue */
872 do_the_globalblock(event);
873 goto next_thread; /* handle next event in event queue */
876 do_the_fetchreply(event);
877 goto next_thread; /* handle next event in event queue */
879 case UnblockThread: /* Move from the blocked queue to the tail of */
880 do_the_unblock(event);
881 goto next_thread; /* handle next event in event queue */
883 case ResumeThread: /* Move from the blocked queue to the tail of */
884 /* the runnable queue ( i.e. Qu' SImqa'lu') */
885 event->tso->gran.blocktime +=
886 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
887 do_the_startthread(event);
888 goto next_thread; /* handle next event in event queue */
891 do_the_startthread(event);
892 goto next_thread; /* handle next event in event queue */
895 do_the_movethread(event);
896 goto next_thread; /* handle next event in event queue */
899 do_the_movespark(event);
900 goto next_thread; /* handle next event in event queue */
903 do_the_findwork(event);
904 goto next_thread; /* handle next event in event queue */
907 barf("Illegal event type %u\n", event->evttype);
910 /* This point was scheduler_loop in the old RTS */
912 IF_DEBUG(gran, belch("GRAN: after main switch"));
914 TimeOfLastEvent = CurrentTime[CurrentProc];
915 TimeOfNextEvent = get_time_of_next_event();
916 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
917 // CurrentTSO = ThreadQueueHd;
919 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
922 if (RtsFlags.GranFlags.Light)
923 GranSimLight_leave_system(event, &ActiveTSO);
925 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
928 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
930 /* in a GranSim setup the TSO stays on the run queue */
932 /* Take a thread from the run queue. */
933 t = POP_RUN_QUEUE(); // take_off_run_queue(t);
936 fprintf(stderr, "GRAN: About to run current thread, which is\n");
939 context_switch = 0; // turned on via GranYield, checking events and time slice
942 DumpGranEvent(GR_SCHEDULE, t));
944 procStatus[CurrentProc] = Busy;
947 if (PendingFetches != END_BF_QUEUE) {
951 /* ToDo: perhaps merge with spark activation above */
952 /* check whether we have local work and send requests if we have none */
953 if (EMPTY_RUN_QUEUE()) { /* no runnable threads */
954 /* :-[ no local threads => look out for local sparks */
955 /* the spark pool for the current PE */
956 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
957 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
958 pool->hd < pool->tl) {
960 * ToDo: add GC code check that we really have enough heap afterwards!!
962 * If we're here (no runnable threads) and we have pending
963 * sparks, we must have a space problem. Get enough space
964 * to turn one of those pending sparks into a
968 spark = findSpark(rtsFalse); /* get a spark */
969 if (spark != (rtsSpark) NULL) {
970 tso = activateSpark(spark); /* turn the spark into a thread */
971 IF_PAR_DEBUG(schedule,
972 belch("==== schedule: Created TSO %d (%p); %d threads active",
973 tso->id, tso, advisory_thread_count));
975 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
976 belch("==^^ failed to activate spark");
978 } /* otherwise fall through & pick-up new tso */
980 IF_PAR_DEBUG(verbose,
981 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
982 spark_queue_len(pool)));
987 /* If we still have no work we need to send a FISH to get a spark
990 if (EMPTY_RUN_QUEUE()) {
991 /* =8-[ no local sparks => look for work on other PEs */
993 * We really have absolutely no work. Send out a fish
994 * (there may be some out there already), and wait for
995 * something to arrive. We clearly can't run any threads
996 * until a SCHEDULE or RESUME arrives, and so that's what
997 * we're hoping to see. (Of course, we still have to
998 * respond to other types of messages.)
1000 TIME now = msTime() /*CURRENT_TIME*/;
1001 IF_PAR_DEBUG(verbose,
1002 belch("-- now=%ld", now));
1003 IF_PAR_DEBUG(verbose,
1004 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1005 (last_fish_arrived_at!=0 &&
1006 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
1007 belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
1008 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
1009 last_fish_arrived_at,
1010 RtsFlags.ParFlags.fishDelay, now);
1013 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1014 (last_fish_arrived_at==0 ||
1015 (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
1016 /* outstandingFishes is set in sendFish, processFish;
1017 avoid flooding system with fishes via delay */
1019 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
1022 // Global statistics: count no. of fishes
1023 if (RtsFlags.ParFlags.ParStats.Global &&
1024 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1025 globalParStats.tot_fish_mess++;
1029 receivedFinish = processMessages();
1032 } else if (PacketsWaiting()) { /* Look for incoming messages */
1033 receivedFinish = processMessages();
1036 /* Now we are sure that we have some work available */
1037 ASSERT(run_queue_hd != END_TSO_QUEUE);
1039 /* Take a thread from the run queue, if we have work */
1040 t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
1041 IF_DEBUG(sanity,checkTSO(t));
1043 /* ToDo: write something to the log-file
1044 if (RTSflags.ParFlags.granSimStats && !sameThread)
1045 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1049 /* the spark pool for the current PE */
1050 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
1053 belch("--=^ %d threads, %d sparks on [%#x]",
1054 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1057 if (0 && RtsFlags.ParFlags.ParStats.Full &&
1058 t && LastTSO && t->id != LastTSO->id &&
1059 LastTSO->why_blocked == NotBlocked &&
1060 LastTSO->what_next != ThreadComplete) {
1061 // if previously scheduled TSO not blocked we have to record the context switch
1062 DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
1063 GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
1066 if (RtsFlags.ParFlags.ParStats.Full &&
1067 (emitSchedule /* forced emit */ ||
1068 (t && LastTSO && t->id != LastTSO->id))) {
1070 we are running a different TSO, so write a schedule event to log file
1071 NB: If we use fair scheduling we also have to write a deschedule
1072 event for LastTSO; with unfair scheduling we know that the
1073 previous tso has blocked whenever we switch to another tso, so
1074 we don't need it in GUM for now
1076 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1077 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1078 emitSchedule = rtsFalse;
1082 #else /* !GRAN && !PAR */
1084 /* grab a thread from the run queue */
1085 ASSERT(run_queue_hd != END_TSO_QUEUE);
1086 t = POP_RUN_QUEUE();
1087 // Sanity check the thread we're about to run. This can be
1088 // expensive if there is lots of thread switching going on...
1089 IF_DEBUG(sanity,checkTSO(t));
1095 for(m = main_threads; m; m = m->link)
1106 fprintf(stderr,"### Running TSO %p in bound OS thread %u\n",
1108 // yes, the Haskell thread is bound to the current native thread
1113 fprintf(stderr,"### TSO %p bound to other OS thread than %u\n",
1115 // no, bound to a different Haskell thread: pass to that thread
1116 PUSH_ON_RUN_QUEUE(t);
1117 passCapability(&sched_mutex,cap,&m->bound_thread_cond);
1124 // The thread we want to run is not bound.
1125 if(mainThread == NULL)
1128 fprintf(stderr,"### Running TSO %p in worker OS thread %u\n",
1130 // if we are a worker thread,
1131 // we may run it here
1136 fprintf(stderr,"### TSO %p is not appropriate for main thread %p in OS thread %u\n",
1137 t, mainThread, osThreadId()));
1138 // no, the current native thread is bound to a different
1139 // Haskell thread, so pass it to any worker thread
1140 PUSH_ON_RUN_QUEUE(t);
1141 releaseCapability(cap);
1149 cap->r.rCurrentTSO = t;
1151 /* context switches are now initiated by the timer signal, unless
1152 * the user specified "context switch as often as possible", with
1155 if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1156 && (run_queue_hd != END_TSO_QUEUE
1157 || blocked_queue_hd != END_TSO_QUEUE
1158 || sleeping_queue != END_TSO_QUEUE)))
1165 RELEASE_LOCK(&sched_mutex);
1167 IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
1168 t->id, whatNext_strs[t->what_next]));
1171 startHeapProfTimer();
1174 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1175 /* Run the current thread
1177 prev_what_next = t->what_next;
1178 switch (prev_what_next) {
1180 case ThreadComplete:
1181 /* Thread already finished, return to scheduler. */
1182 ret = ThreadFinished;
1185 errno = t->saved_errno;
1186 ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1187 t->saved_errno = errno;
1189 case ThreadInterpret:
1190 ret = interpretBCO(cap);
1193 barf("schedule: invalid what_next field");
1195 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1197 /* Costs for the scheduler are assigned to CCS_SYSTEM */
1199 stopHeapProfTimer();
1203 ACQUIRE_LOCK(&sched_mutex);
1205 #ifdef RTS_SUPPORTS_THREADS
1206 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %p): ", osThreadId()););
1207 #elif !defined(GRAN) && !defined(PAR)
1208 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1210 t = cap->r.rCurrentTSO;
1213 /* HACK 675: if the last thread didn't yield, make sure to print a
1214 SCHEDULE event to the log file when StgRunning the next thread, even
1215 if it is the same one as before */
1217 TimeOfLastYield = CURRENT_TIME;
1223 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1224 globalGranStats.tot_heapover++;
1226 globalParStats.tot_heapover++;
1229 // did the task ask for a large block?
1230 if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1231 // if so, get one and push it on the front of the nursery.
1235 blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1237 IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)",
1238 t->id, whatNext_strs[t->what_next], blocks));
1240 // don't do this if it would push us over the
1241 // alloc_blocks_lim limit; we'll GC first.
1242 if (alloc_blocks + blocks < alloc_blocks_lim) {
1244 alloc_blocks += blocks;
1245 bd = allocGroup( blocks );
1247 // link the new group into the list
1248 bd->link = cap->r.rCurrentNursery;
1249 bd->u.back = cap->r.rCurrentNursery->u.back;
1250 if (cap->r.rCurrentNursery->u.back != NULL) {
1251 cap->r.rCurrentNursery->u.back->link = bd;
1253 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1254 g0s0->blocks == cap->r.rNursery);
1255 cap->r.rNursery = g0s0->blocks = bd;
1257 cap->r.rCurrentNursery->u.back = bd;
1259 // initialise it as a nursery block. We initialise the
1260 // step, gen_no, and flags field of *every* sub-block in
1261 // this large block, because this is easier than making
1262 // sure that we always find the block head of a large
1263 // block whenever we call Bdescr() (eg. evacuate() and
1264 // isAlive() in the GC would both have to do this, at
1268 for (x = bd; x < bd + blocks; x++) {
1275 // don't forget to update the block count in g0s0.
1276 g0s0->n_blocks += blocks;
1277 // This assert can be a killer if the app is doing lots
1278 // of large block allocations.
1279 ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1281 // now update the nursery to point to the new block
1282 cap->r.rCurrentNursery = bd;
1284 // we might be unlucky and have another thread get on the
1285 // run queue before us and steal the large block, but in that
1286 // case the thread will just end up requesting another large
1288 PUSH_ON_RUN_QUEUE(t);
1293 /* make all the running tasks block on a condition variable,
1294 * maybe set context_switch and wait till they all pile in,
1295 * then have them wait on a GC condition variable.
1297 IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow",
1298 t->id, whatNext_strs[t->what_next]));
1301 ASSERT(!is_on_queue(t,CurrentProc));
1303 /* Currently we emit a DESCHEDULE event before GC in GUM.
1304 ToDo: either add separate event to distinguish SYSTEM time from rest
1305 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1306 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1307 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1308 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1309 emitSchedule = rtsTrue;
1313 ready_to_gc = rtsTrue;
1314 context_switch = 1; /* stop other threads ASAP */
1315 PUSH_ON_RUN_QUEUE(t);
1316 /* actual GC is done at the end of the while loop */
1322 DumpGranEvent(GR_DESCHEDULE, t));
1323 globalGranStats.tot_stackover++;
1326 // DumpGranEvent(GR_DESCHEDULE, t);
1327 globalParStats.tot_stackover++;
1329 IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow",
1330 t->id, whatNext_strs[t->what_next]));
1331 /* just adjust the stack for this thread, then pop it back
1337 /* enlarge the stack */
1338 StgTSO *new_t = threadStackOverflow(t);
1340 /* This TSO has moved, so update any pointers to it from the
1341 * main thread stack. It better not be on any other queues...
1342 * (it shouldn't be).
1344 for (m = main_threads; m != NULL; m = m->link) {
1349 threadPaused(new_t);
1350 PUSH_ON_RUN_QUEUE(new_t);
1354 case ThreadYielding:
1357 DumpGranEvent(GR_DESCHEDULE, t));
1358 globalGranStats.tot_yields++;
1361 // DumpGranEvent(GR_DESCHEDULE, t);
1362 globalParStats.tot_yields++;
1364 /* put the thread back on the run queue. Then, if we're ready to
1365 * GC, check whether this is the last task to stop. If so, wake
1366 * up the GC thread. getThread will block during a GC until the
1370 if (t->what_next != prev_what_next) {
1371 belch("--<< thread %ld (%s) stopped to switch evaluators",
1372 t->id, whatNext_strs[t->what_next]);
1374 belch("--<< thread %ld (%s) stopped, yielding",
1375 t->id, whatNext_strs[t->what_next]);
1380 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1382 ASSERT(t->link == END_TSO_QUEUE);
1384 // Shortcut if we're just switching evaluators: don't bother
1385 // doing stack squeezing (which can be expensive), just run the
1387 if (t->what_next != prev_what_next) {
1394 ASSERT(!is_on_queue(t,CurrentProc));
1397 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1398 checkThreadQsSanity(rtsTrue));
1402 if (RtsFlags.ParFlags.doFairScheduling) {
1403 /* this does round-robin scheduling; good for concurrency */
1404 APPEND_TO_RUN_QUEUE(t);
1406 /* this does unfair scheduling; good for parallelism */
1407 PUSH_ON_RUN_QUEUE(t);
1410 // this does round-robin scheduling; good for concurrency
1411 APPEND_TO_RUN_QUEUE(t);
1415 /* add a ContinueThread event to actually process the thread */
1416 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1418 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1420 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1429 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1430 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1431 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1433 // ??? needed; should emit block before
1435 DumpGranEvent(GR_DESCHEDULE, t));
1436 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1439 ASSERT(procStatus[CurrentProc]==Busy ||
1440 ((procStatus[CurrentProc]==Fetching) &&
1441 (t->block_info.closure!=(StgClosure*)NULL)));
1442 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1443 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1444 procStatus[CurrentProc]==Fetching))
1445 procStatus[CurrentProc] = Idle;
1449 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1450 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1453 if (t->block_info.closure!=(StgClosure*)NULL)
1454 print_bq(t->block_info.closure));
1456 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1459 /* whatever we schedule next, we must log that schedule */
1460 emitSchedule = rtsTrue;
1463 /* don't need to do anything. Either the thread is blocked on
1464 * I/O, in which case we'll have called addToBlockedQueue
1465 * previously, or it's blocked on an MVar or Blackhole, in which
1466 * case it'll be on the relevant queue already.
1469 fprintf(stderr, "--<< thread %d (%s) stopped: ",
1470 t->id, whatNext_strs[t->what_next]);
1471 printThreadBlockage(t);
1472 fprintf(stderr, "\n"));
1474 /* Only for dumping event to log file
1475 ToDo: do I need this in GranSim, too?
1482 case ThreadFinished:
1483 /* Need to check whether this was a main thread, and if so, signal
1484 * the task that started it with the return value. If we have no
1485 * more main threads, we probably need to stop all the tasks until
1488 /* We also end up here if the thread kills itself with an
1489 * uncaught exception, see Exception.hc.
1491 IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished",
1492 t->id, whatNext_strs[t->what_next]));
1494 endThread(t, CurrentProc); // clean-up the thread
1496 /* For now all are advisory -- HWL */
1497 //if(t->priority==AdvisoryPriority) ??
1498 advisory_thread_count--;
1501 if(t->dist.priority==RevalPriority)
1505 if (RtsFlags.ParFlags.ParStats.Full &&
1506 !RtsFlags.ParFlags.ParStats.Suppressed)
1507 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1512 barf("schedule: invalid thread return code %d", (int)ret);
1516 // When we have +RTS -i0 and we're heap profiling, do a census at
1517 // every GC. This lets us get repeatable runs for debugging.
1518 if (performHeapProfile ||
1519 (RtsFlags.ProfFlags.profileInterval==0 &&
1520 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1521 GarbageCollect(GetRoots, rtsTrue);
1523 performHeapProfile = rtsFalse;
1524 ready_to_gc = rtsFalse; // we already GC'd
1530 && allFreeCapabilities()
1533 /* everybody back, start the GC.
1534 * Could do it in this thread, or signal a condition var
1535 * to do it in another thread. Either way, we need to
1536 * broadcast on gc_pending_cond afterward.
1538 #if defined(RTS_SUPPORTS_THREADS)
1539 IF_DEBUG(scheduler,sched_belch("doing GC"));
1541 GarbageCollect(GetRoots,rtsFalse);
1542 ready_to_gc = rtsFalse;
1544 broadcastCondition(&gc_pending_cond);
1547 /* add a ContinueThread event to continue execution of current thread */
1548 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1550 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1552 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1560 IF_GRAN_DEBUG(unused,
1561 print_eventq(EventHd));
1563 event = get_next_event();
1566 /* ToDo: wait for next message to arrive rather than busy wait */
1569 } /* end of while(1) */
1571 IF_PAR_DEBUG(verbose,
1572 belch("== Leaving schedule() after having received Finish"));
1575 /* ---------------------------------------------------------------------------
1576 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1577 * Used by Control.Concurrent for error checking.
1578 * ------------------------------------------------------------------------- */
// Takes no arguments: the answer depends only on how the RTS was built.
1581 rtsSupportsBoundThreads(void)
1590 /* ---------------------------------------------------------------------------
1591 * isThreadBound(tso): check whether tso is bound to an OS thread.
1592 * ------------------------------------------------------------------------- */
1595 isThreadBound(StgTSO* tso)
// Scans the main_threads list.  NOTE(review): the per-entry test against
// tso and the return statements are not visible here; presumably tso counts
// as bound when it is found on this list -- confirm.
1599 for(m = main_threads; m; m = m->link)
1608 /* ---------------------------------------------------------------------------
1609 * Singleton fork(). Do not copy any running threads.
1610 * ------------------------------------------------------------------------- */
1613 deleteThreadImmediately(StgTSO *tso);
// forkProcess(tso): fork the OS process.  In the parent the child's pid is
// simply returned.  In the child the run queue is emptied and the other
// Haskell threads are removed, leaving `tso` to carry on.  On mingw32 the
// primop is not implemented and barf()s instead.
1616 forkProcess(StgTSO* tso)
1618 #ifndef mingw32_TARGET_OS
1622 IF_DEBUG(scheduler,sched_belch("forking!"));
1623 ACQUIRE_LOCK(&sched_mutex);
1626 if (pid) { /* parent */
1628 /* just return the pid */
1630 } else { /* child */
1632 /* wipe all other threads */
1633 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1634 tso->link = END_TSO_QUEUE;
1636 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1639 /* Don't kill the current thread.. */
1640 if (t->id == tso->id) {
1644 if (isThreadBound(t)) {
1645 // If the thread is bound, the OS thread that the thread is bound to
1646 // no longer exists after the fork() system call.
1647 // The bound Haskell thread is therefore unable to run at all;
1648 // we must not give it a chance to survive by catching the
1649 // ThreadKilled exception. So we kill it "brutally" rather than
1650 // using deleteThread.
1651 deleteThreadImmediately(t);
1657 if (isThreadBound(tso)) {
1659 // If the current thread is not bound, then we should make it so.
1660 // The OS thread left over by fork() is special in that the process
1661 // will terminate as soon as the thread terminates;
1662 // we'd expect forkProcess to behave similarly.
1663 // FIXME - we don't do this.
// NOTE(review): a second child-side cleanup sequence follows.  Unlike the
// one above, it also spares threads that are on main_threads (see its
// comment).  The preprocessor conditional that selects between the two
// sequences is not visible in this excerpt.
1668 /* wipe all other threads */
1669 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1670 tso->link = END_TSO_QUEUE;
1672 /* When clearing out the threads, we need to ensure
1673 that a 'main thread' is left behind; if there isn't,
1674 the Scheduler will shutdown next time it is entered.
1676 ==> we don't kill a thread that's on the main_threads
1677 list (nor the current thread.)
1679 [ Attempts at implementing the more ambitious scheme of
1680 killing the main_threads also, and then adding the
1681 current thread onto the main_threads list if it wasn't
1682 there already, failed -- waitThread() (for one) wasn't
1683 up to it. If it proves to be desirable to also kill
1684 the main threads, then this scheme will have to be
1685 revisited (and fully debugged!)
1690 /* DO NOT TOUCH THE QUEUES directly because most of the code around
1691 us is picky about finding the thread still in its queue when
1692 handling the deleteThread() */
1694 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1697 /* Don't kill the current thread.. */
1698 if (t->id == tso->id) continue;
1700 /* ..or a main thread */
1701 for (m = main_threads; m != NULL; m = m->link) {
1702 if (m->tso->id == t->id) {
1713 RELEASE_LOCK(&sched_mutex);
1716 barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
1717 /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */
1719 #endif /* mingw32 */
1722 /* ---------------------------------------------------------------------------
1723 * deleteAllThreads(): kill all the live threads.
1725 * This is used when we catch a user interrupt (^C), before performing
1726 * any necessary cleanups and running finalizers.
1728 * Locks: sched_mutex held.
1729 * ------------------------------------------------------------------------- */
1732 deleteAllThreads ( void )
1735 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1736 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1737 next = t->global_link;
1740 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1741 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1742 sleeping_queue = END_TSO_QUEUE;
1745 /* startThread and insertThread are now in GranSim.c -- HWL */
1748 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1749 //@subsection Suspend and Resume
1751 /* ---------------------------------------------------------------------------
1752 * Suspending & resuming Haskell threads.
1754 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1755 * its capability before calling the C function. This allows another
1756 * task to pick up the capability and carry on running Haskell
1757 * threads. It also means that if the C call blocks, it won't lock up
 * the whole system.
1760 * The Haskell thread making the C call is put to sleep for the
1761 * duration of the call, on the suspended_ccalling_threads queue. We
1762 * give out a token to the task, which it can use to resume the thread
1763 * on return from the C function.
1764 * ------------------------------------------------------------------------- */
// suspendThread: called when a Haskell thread makes a "safe" C call.
// Parks the current TSO on suspended_ccalling_threads, hands the capability
// back, and hands out a token (`tok`, the thread id) that resumeThread()
// later uses to find the thread again.  errno is saved on entry and
// restored before leaving.
1767 suspendThread( StgRegTable *reg,
1769 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1776 int saved_errno = errno;
1778 /* assume that *reg is a pointer to the StgRegTable part
// NOTE(review): arithmetic on a void* is a GNU C extension, not ISO C;
// computing the offset via (char *)reg would give the same result portably.
1781 cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1783 ACQUIRE_LOCK(&sched_mutex);
1786 sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1788 // XXX this might not be necessary --SDM
1789 cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1791 threadPaused(cap->r.rCurrentTSO);
// Push the current TSO onto the front of suspended_ccalling_threads.
1792 cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1793 suspended_ccalling_threads = cap->r.rCurrentTSO;
1795 #if defined(RTS_SUPPORTS_THREADS)
// A thread with no blocked_exceptions queue is marked BlockedOnCCall and
// given an empty queue; the BlockedOnCCall_NoUnblockExc assignment is
// presumably the other branch (the branch structure is not fully shown).
1796 if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
1798 cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1799 cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1803 cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1807 /* Use the thread ID as the token; it should be unique */
1808 tok = cap->r.rCurrentTSO->id;
1810 /* Hand back capability */
1811 releaseCapability(cap);
1813 #if defined(RTS_SUPPORTS_THREADS)
1814 /* Preparing to leave the RTS, so ensure there's a native thread/task
1815 waiting to take over.
1817 IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId()));
1818 //if (concCall) { // implementing "safe" as opposed to "threadsafe" is more difficult
1819 startTask(taskStart);
1823 /* Other threads _might_ be available for execution; signal this */
1825 RELEASE_LOCK(&sched_mutex);
1827 errno = saved_errno;
// resumeThread: re-enter the RTS after a "safe" C call.  Obtains a
// capability, removes the TSO whose id equals `tok` from
// suspended_ccalling_threads (barf()ing if none matches), clears its
// blocking status and installs it as cap->r.rCurrentTSO.  errno is saved on
// entry and restored before leaving.
1832 resumeThread( StgInt tok,
1833 rtsBool concCall STG_UNUSED )
1835 StgTSO *tso, **prev;
1837 int saved_errno = errno;
1839 #if defined(RTS_SUPPORTS_THREADS)
1840 /* Wait for permission to re-enter the RTS with the result. */
1841 ACQUIRE_LOCK(&sched_mutex);
1842 grabReturnCapability(&sched_mutex, &cap);
1844 IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): re-entering RTS", tok, osThreadId()));
1846 grabCapability(&cap);
1849 /* Remove the thread from the suspended list */
// `prev` always points at the link field that refers to `tso`, so a match
// can be unlinked in place.
1850 prev = &suspended_ccalling_threads;
1851 for (tso = suspended_ccalling_threads;
1852 tso != END_TSO_QUEUE;
1853 prev = &tso->link, tso = tso->link) {
1854 if (tso->id == (StgThreadID)tok) {
1859 if (tso == END_TSO_QUEUE) {
1860 barf("resumeThread: thread not found");
1862 tso->link = END_TSO_QUEUE;
1864 #if defined(RTS_SUPPORTS_THREADS)
1865 if(tso->why_blocked == BlockedOnCCall)
1867 awakenBlockedQueueNoLock(tso->blocked_exceptions);
1868 tso->blocked_exceptions = NULL;
1872 /* Reset blocking status */
1873 tso->why_blocked = NotBlocked;
1875 cap->r.rCurrentTSO = tso;
1876 #if defined(RTS_SUPPORTS_THREADS)
1877 RELEASE_LOCK(&sched_mutex);
1879 errno = saved_errno;
1884 /* ---------------------------------------------------------------------------
 * Forward declaration of the file-local (static) unblockThread().
1886 * ------------------------------------------------------------------------ */
1887 static void unblockThread(StgTSO *tso);
1889 /* ---------------------------------------------------------------------------
1890 * Comparing Thread ids.
1892 * This is used from STG land in the implementation of the
1893 * instances of Eq/Ord for ThreadIds.
1894 * ------------------------------------------------------------------------ */
1897 cmp_thread(StgPtr tso1, StgPtr tso2)
1899 StgThreadID id1 = ((StgTSO *)tso1)->id;
1900 StgThreadID id2 = ((StgTSO *)tso2)->id;
1902 if (id1 < id2) return (-1);
1903 if (id1 > id2) return 1;
1907 /* ---------------------------------------------------------------------------
1908 * Fetching the ThreadID from an StgTSO.
 *
 * `tso` arrives as an untyped StgPtr; it is cast to StgTSO * and its `id`
 * field is returned.
1910 * This is used in the implementation of Show for ThreadIds.
1911 * ------------------------------------------------------------------------ */
1913 rts_getThreadId(StgPtr tso)
1915 return ((StgTSO *)tso)->id;
// labelThread: store a freshly stgMallocBytes()-allocated copy of `label`
// for `tso` via updateThreadLabel().
1920 labelThread(StgPtr tso, char *label)
1925 /* Caveat: Once set, you can only set the thread name to "" */
// len includes the terminating NUL, so the strncpy below copies the whole
// string including its terminator (equivalent to memcpy(buf,label,len)).
1926 len = strlen(label)+1;
1927 buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1928 strncpy(buf,label,len);
1929 /* Update will free the old memory for us */
1930 updateThreadLabel((StgWord)tso,buf);
1934 /* ---------------------------------------------------------------------------
1935 Create a new thread.
1937 The new thread starts with the given stack size. Before the
1938 scheduler can run, however, this thread needs to have a closure
1939 (and possibly some arguments) pushed on its stack. See
1940 pushClosure() in Schedule.h.
1942 createGenThread() and createIOThread() (in SchedAPI.h) are
1943 convenient packaged versions of this function.
1945 currently pri (priority) is only used in a GRAN setup -- HWL
1946 ------------------------------------------------------------------------ */
1947 //@cindex createThread
1949 /* currently pri (priority) is only used in a GRAN setup -- HWL */
// Two alternative signatures follow: one taking the GranSim priority `pri`,
// one without it.  The conditional that selects between them is not
// visible in this excerpt.
1951 createThread(nat size, StgInt pri)
1954 createThread(nat size)
1961 /* First check whether we should create a thread at all */
1963 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1964 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1966 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1967 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1968 return END_TSO_QUEUE;
1974 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1977 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1979 /* catch ridiculously small stack sizes */
1980 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1981 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
// `size` covers the TSO structure plus its stack; what remains after the
// TSO_STRUCT_SIZEW words of the structure is the stack proper.
1984 stack_size = size - TSO_STRUCT_SIZEW;
1986 tso = (StgTSO *)allocate(size);
1987 TICK_ALLOC_TSO(stack_size, 0);
1989 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1991 SET_GRAN_HDR(tso, ThisPE);
1994 // Always start with the compiled code evaluator
1995 tso->what_next = ThreadRunGHC;
1997 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1998 * protect the increment operation on next_thread_id.
1999 * In future, we could use an atomic increment instead.
2001 ACQUIRE_LOCK(&thread_id_mutex);
2002 tso->id = next_thread_id++;
2003 RELEASE_LOCK(&thread_id_mutex);
2005 tso->why_blocked = NotBlocked;
2006 tso->blocked_exceptions = NULL;
2008 tso->saved_errno = 0;
2010 tso->stack_size = stack_size;
2011 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
// The stack grows downwards: sp starts one past the top of the stack area.
2013 tso->sp = (P_)&(tso->stack) + stack_size;
2016 tso->prof.CCCS = CCS_MAIN;
2019 /* put a stop frame on the stack */
2020 tso->sp -= sizeofW(StgStopFrame);
2021 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2024 tso->link = END_TSO_QUEUE;
2025 /* uses more flexible routine in GranSim */
2026 insertThread(tso, CurrentProc);
2028 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2034 if (RtsFlags.GranFlags.GranSimStats.Full)
2035 DumpGranEvent(GR_START,tso);
2037 if (RtsFlags.ParFlags.ParStats.Full)
2038 DumpGranEvent(GR_STARTQ,tso);
2039 /* HACk to avoid SCHEDULE
2043 /* Link the new thread on the global thread list.
2045 tso->global_link = all_threads;
2049 tso->dist.priority = MandatoryPriority; //by default that is...
2053 tso->gran.pri = pri;
2055 tso->gran.magic = TSO_MAGIC; // debugging only
2057 tso->gran.sparkname = 0;
2058 tso->gran.startedat = CURRENT_TIME;
2059 tso->gran.exported = 0;
2060 tso->gran.basicblocks = 0;
2061 tso->gran.allocs = 0;
2062 tso->gran.exectime = 0;
2063 tso->gran.fetchtime = 0;
2064 tso->gran.fetchcount = 0;
2065 tso->gran.blocktime = 0;
2066 tso->gran.blockcount = 0;
2067 tso->gran.blockedat = 0;
2068 tso->gran.globalsparks = 0;
2069 tso->gran.localsparks = 0;
2070 if (RtsFlags.GranFlags.Light)
2071 tso->gran.clock = Now; /* local clock */
2073 tso->gran.clock = 0;
2075 IF_DEBUG(gran,printTSO(tso));
2078 tso->par.magic = TSO_MAGIC; // debugging only
2080 tso->par.sparkname = 0;
2081 tso->par.startedat = CURRENT_TIME;
2082 tso->par.exported = 0;
2083 tso->par.basicblocks = 0;
2084 tso->par.allocs = 0;
2085 tso->par.exectime = 0;
2086 tso->par.fetchtime = 0;
2087 tso->par.fetchcount = 0;
2088 tso->par.blocktime = 0;
2089 tso->par.blockcount = 0;
2090 tso->par.blockedat = 0;
2091 tso->par.globalsparks = 0;
2092 tso->par.localsparks = 0;
2096 globalGranStats.tot_threads_created++;
2097 globalGranStats.threads_created_on_PE[CurrentProc]++;
2098 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2099 globalGranStats.tot_sq_probes++;
2101 // collect parallel global statistics (currently done together with GC stats)
2102 if (RtsFlags.ParFlags.ParStats.Global &&
2103 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2104 //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime());
2105 globalParStats.tot_threads_created++;
2111 belch("==__ schedule: Created TSO %d (%p);",
2112 CurrentProc, tso, tso->id));
// NOTE(review): format/argument mismatch in the debug message above.
// "%d (%p)" consumes two arguments but three are passed, and CurrentProc
// (not tso->id) lands in the %d slot.  The PAR message below passes
// tso->id, tso -- this one presumably intended the same (or a format with
// an extra PE field).  Confirm and fix.
2114 IF_PAR_DEBUG(verbose,
2115 belch("==__ schedule: Created TSO %d (%p); %d threads active",
2116 tso->id, tso, advisory_thread_count));
2118 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
2119 tso->id, tso->stack_size));
2126 all parallel thread creation calls should fall through the following routine.
// createSparkThread: create a thread for `spark` unless advisory_thread_count
// has reached RtsFlags.ParFlags.maxThreads, push the spark closure on its
// stack, put it on the run queue and bump advisory_thread_count.
2129 createSparkThread(rtsSpark spark)
2131 ASSERT(spark != (rtsSpark)NULL);
// NOTE(review): on the same limit createThread() belch()es and returns
// END_TSO_QUEUE, whereas this path calls barf().  If barf() does not return,
// the `return END_TSO_QUEUE` that follows it is unreachable -- confirm
// which behaviour is intended.
2132 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
2134 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2135 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2136 return END_TSO_QUEUE;
2140 tso = createThread(RtsFlags.GcFlags.initialStkSize);
2141 if (tso==END_TSO_QUEUE)
2142 barf("createSparkThread: Cannot create TSO");
// NOTE(review): elsewhere in this file the priority lives in
// tso->dist.priority (createThread() sets tso->dist.priority =
// MandatoryPriority, and schedule() tests t->dist.priority).  Confirm that
// `tso->priority` is the intended field here.
2144 tso->priority = AdvisoryPriority;
2146 pushClosure(tso,spark);
2147 PUSH_ON_RUN_QUEUE(tso);
2148 advisory_thread_count++;
2155 Turn a spark into a thread.
2156 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2159 //@cindex activateSpark
2161 activateSpark (rtsSpark spark)
// NOTE(review): createSparkThread() contains a `return END_TSO_QUEUE` path;
// the lines shown here do not check `tso` against END_TSO_QUEUE.
2165 tso = createSparkThread(spark);
// Only logs (in verbose PAR debug mode) when full parallel stats are on.
2166 if (RtsFlags.ParFlags.ParStats.Full) {
2167 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2168 IF_PAR_DEBUG(verbose,
2169 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
2170 (StgClosure *)spark, info_type((StgClosure *)spark)));
2172 // ToDo: fwd info on local/global spark to thread -- HWL
2173 // tso->gran.exported = spark->exported;
2174 // tso->gran.locked = !spark->global;
2175 // tso->gran.sparkname = spark->name;
// Forward declaration; waitThread_() is defined later in this file.
2181 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
2182 Capability *initialCapability
2186 /* ---------------------------------------------------------------------------
2189 * scheduleThread puts a thread on the head of the runnable queue.
2190 * This will usually be done immediately after a thread is created.
2191 * The caller of scheduleThread must create the thread using e.g.
2192 * createThread and push an appropriate closure
2193 * on this thread's stack before the scheduler is invoked.
2194 * ------------------------------------------------------------------------ */
2196 static void scheduleThread_ (StgTSO* tso);
2199 scheduleThread_(StgTSO *tso)
2201 // Precondition: sched_mutex must be held.
2203 /* Put the new thread on the head of the runnable queue. The caller
2204 * better push an appropriate closure on this thread's stack
2205 * beforehand. In the SMP case, the thread may start running as
2206 * soon as we release the scheduler lock below.
2208 PUSH_ON_RUN_QUEUE(tso);
// The scheduler lock is not released in this function: callers such as
// scheduleThread() release sched_mutex after it returns.
2212 IF_DEBUG(scheduler,printTSO(tso));
2216 void scheduleThread(StgTSO* tso)
2218 ACQUIRE_LOCK(&sched_mutex);
2219 scheduleThread_(tso);
2220 RELEASE_LOCK(&sched_mutex);
// scheduleWaitThread: allocate a main-thread record for `tso`, link it onto
// main_threads, schedule `tso`, and then wait for it via waitThread_().
2224 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *initialCapability)
2225 { // Precondition: sched_mutex must be held
2228 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2232 #if defined(RTS_SUPPORTS_THREADS)
2233 initCondition(&m->wakeup);
2234 #if defined(THREADED_RTS)
2235 initCondition(&m->bound_thread_cond);
2239 /* Put the thread on the main-threads list prior to scheduling the TSO.
2240 Failure to do so introduces a race condition in the MT case (as
2241 identified by Wolfgang Thaller), whereby the new task/OS thread
2242 created by scheduleThread_() would complete before the thread
2243 that spawned it managed to put 'itself' on the main-threads list.
2244 The upshot of it all being that the worker thread wouldn't get to
2245 signal the completion of its work item for the main thread to
2246 see (==> it got stuck waiting.) -- sof 6/02.
2248 IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)\n", tso->id));
2250 m->link = main_threads;
2253 scheduleThread_(tso);
2255 return waitThread_(m, initialCapability);
2258 /* ---------------------------------------------------------------------------
2261 * Initialise the scheduler. This resets all the queues - if the
2262 * queues contained any threads, they'll be garbage collected at the
 * next GC.
2265 * ------------------------------------------------------------------------ */
// term_handler: SIGTERM handler (initScheduler installs it with
// sigaction(SIGTERM, ...)).  NOTE(review): the work done while term_mutex
// is held is not visible here.
2269 term_handler(int sig STG_UNUSED)
2272 ACQUIRE_LOCK(&term_mutex);
2274 RELEASE_LOCK(&term_mutex);
// Scheduler initialisation: reset every scheduler queue and thread list,
// derive the context-switch tick count, initialise the RTS_SUPPORTS_THREADS
// locks and condition variables, install the SIGTERM handler and start the
// task manager.
// NOTE(review): this loop visits i = 0 .. MAX_PROC inclusive.  Confirm the
// per-PE arrays (run_queue_hds, run_queue_tls, blocked_queue_hds,
// blocked_queue_tls, ccalling_threadss) have MAX_PROC+1 entries; otherwise
// the last iteration writes past their end.  sleeping_queue is not indexed
// by i, so resetting it inside the loop is loop-invariant.
2285 for (i=0; i<=MAX_PROC; i++) {
2286 run_queue_hds[i] = END_TSO_QUEUE;
2287 run_queue_tls[i] = END_TSO_QUEUE;
2288 blocked_queue_hds[i] = END_TSO_QUEUE;
2289 blocked_queue_tls[i] = END_TSO_QUEUE;
2290 ccalling_threadss[i] = END_TSO_QUEUE;
2291 sleeping_queue = END_TSO_QUEUE;
2294 run_queue_hd = END_TSO_QUEUE;
2295 run_queue_tl = END_TSO_QUEUE;
2296 blocked_queue_hd = END_TSO_QUEUE;
2297 blocked_queue_tl = END_TSO_QUEUE;
2298 sleeping_queue = END_TSO_QUEUE;
2301 suspended_ccalling_threads = END_TSO_QUEUE;
2303 main_threads = NULL;
2304 all_threads = END_TSO_QUEUE;
// Convert the context-switch interval into timer ticks.
2309 RtsFlags.ConcFlags.ctxtSwitchTicks =
2310 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2312 #if defined(RTS_SUPPORTS_THREADS)
2313 /* Initialise the mutex and condition variables used by
2315 initMutex(&sched_mutex);
2316 initMutex(&term_mutex);
2317 initMutex(&thread_id_mutex);
2319 initCondition(&thread_ready_cond);
2323 initCondition(&gc_pending_cond);
2326 #if defined(RTS_SUPPORTS_THREADS)
2327 ACQUIRE_LOCK(&sched_mutex);
2330 /* Install the SIGTERM handler */
2333 struct sigaction action,oact;
2335 action.sa_handler = term_handler;
2336 sigemptyset(&action.sa_mask);
2337 action.sa_flags = 0;
2338 if (sigaction(SIGTERM, &action, &oact) != 0) {
2339 barf("can't install TERM handler");
2344 /* A capability holds the state a native thread needs in
2345 * order to execute STG code. At least one capability is
2346 * floating around (only SMP builds have more than one).
2350 #if defined(RTS_SUPPORTS_THREADS)
2351 /* start our haskell execution tasks */
2353 startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2355 startTaskManager(0,taskStart);
2359 #if /* defined(SMP) ||*/ defined(PAR)
2363 #if defined(RTS_SUPPORTS_THREADS)
2364 RELEASE_LOCK(&sched_mutex);
// exitScheduler: mark the scheduler as shutting down by setting
// shutting_down_scheduler.  NOTE(review): the RTS_SUPPORTS_THREADS-only
// part of this function is not visible here.
2370 exitScheduler( void )
2372 #if defined(RTS_SUPPORTS_THREADS)
2375 shutting_down_scheduler = rtsTrue;
2378 /* -----------------------------------------------------------------------------
2379 Managing the per-task allocation areas.
2381 Each capability comes with an allocation area. These are
2382 fixed-length block lists into which allocation can be done.
2384 ToDo: no support for two-space collection at the moment???
2385 -------------------------------------------------------------------------- */
2387 /* -----------------------------------------------------------------------------
2388 * waitThread is the external interface for running a new computation
2389 * and waiting for the result.
2391 * In the non-SMP case, we create a new main thread, push it on the
2392 * main-thread stack, and invoke the scheduler to run it. The
2393 * scheduler will return when the top main thread on the stack has
2394 * completed or died, and fill in the necessary fields of the
2395 * main_thread structure.
2397 * In the SMP case, we create a main thread as before, but we then
2398 * create a new condition variable and sleep on it. When our new
2399 * main thread has completed, we'll be woken up and the status/result
2400 * will be in the main_thread struct.
2401 * -------------------------------------------------------------------------- */
// howManyThreadsAvail: walk the run, blocked and sleeping queues in turn.
// NOTE(review): the loop bodies and the return statement are not visible
// here; presumably this returns the total number of threads on the three
// queues -- confirm.
2404 howManyThreadsAvail ( void )
2408 for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2410 for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2412 for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2418 finishAllThreads ( void )
2421 while (run_queue_hd != END_TSO_QUEUE) {
2422 waitThread ( run_queue_hd, NULL, NULL );
2424 while (blocked_queue_hd != END_TSO_QUEUE) {
2425 waitThread ( blocked_queue_hd, NULL, NULL );
2427 while (sleeping_queue != END_TSO_QUEUE) {
2428 waitThread ( blocked_queue_hd, NULL, NULL );
2431 (blocked_queue_hd != END_TSO_QUEUE ||
2432 run_queue_hd != END_TSO_QUEUE ||
2433 sleeping_queue != END_TSO_QUEUE);
// waitThread: run `tso` as a new main thread and wait for it to finish.
// Allocates an StgMainThread record, links it onto main_threads while
// holding sched_mutex, and delegates to waitThread_(), keeping the
// resulting SchedulerStatus in `stat`.
2437 waitThread(StgTSO *tso, /*out*/StgClosure **ret, Capability *initialCapability)
2440 SchedulerStatus stat;
2442 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2446 #if defined(RTS_SUPPORTS_THREADS)
2447 initCondition(&m->wakeup);
2448 #if defined(THREADED_RTS)
2449 initCondition(&m->bound_thread_cond);
2453 /* see scheduleWaitThread() comment */
// i.e. get onto main_threads under the lock before the thread can run, to
// avoid the completion race described in scheduleWaitThread().
2454 ACQUIRE_LOCK(&sched_mutex);
2455 m->link = main_threads;
2458 IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id));
2460 stat = waitThread_(m,initialCapability);
2462 RELEASE_LOCK(&sched_mutex);
// waitThread_: wait for main thread `m` to complete, then close its
// condition variables.  Entered with sched_mutex held; per the postcondition
// at the end it returns with the mutex still held.
2468 waitThread_(StgMainThread* m, Capability *initialCapability)
2470 SchedulerStatus stat;
2472 // Precondition: sched_mutex must be held.
2473 IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2475 #if defined(RTS_SUPPORTS_THREADS) && !defined(THREADED_RTS)
2476 { // FIXME: does this still make sense?
2477 // It's not for the threaded rts => SMP only
2479 waitCondition(&m->wakeup, &sched_mutex);
2480 } while (m->stat == NoStatus);
2483 /* GranSim specific init */
2484 CurrentTSO = m->tso; // the TSO to run
2485 procStatus[MainProc] = Busy; // status of main PE
2486 CurrentProc = MainProc; // PE to run it on
2488 RELEASE_LOCK(&sched_mutex);
2489 schedule(m,initialCapability);
// NOTE(review): this GranSim sequence releases sched_mutex before schedule()
// but, in the lines shown, does not re-acquire it.  The sequence below does
// re-acquire it, and the postcondition says the lock is held on return.
// Confirm this is harmless (e.g. the lock macros are no-ops in that build).
2491 RELEASE_LOCK(&sched_mutex);
2492 schedule(m,initialCapability);
2493 ACQUIRE_LOCK(&sched_mutex);
2494 ASSERT(m->stat != NoStatus);
2499 #if defined(RTS_SUPPORTS_THREADS)
2500 closeCondition(&m->wakeup);
2501 #if defined(THREADED_RTS)
2502 closeCondition(&m->bound_thread_cond);
2506 IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n",
2510 // Postcondition: sched_mutex still held
2514 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2515 //@subsection Run queue code
2519 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2520 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2521 implicit global variable that has to be correct when calling these
2525 /* Put the new thread on the head of the runnable queue.
2526 * The caller of createThread better push an appropriate closure
2527 * on this thread's stack before the scheduler is invoked.
// add_to_run_queue: puts tso at the *head* of the run queue (tso->link is set
// to the old run_queue_hd). The ASSERT checks that tso is not already the
// head or the tail. When the queue was empty (run_queue_tl == END_TSO_QUEUE)
// the tail presumably becomes tso as well; that branch body and the update of
// run_queue_hd are not visible in this excerpt.
// NOTE(review): old-style (K&R) parameter list; the declaration of tso is not
// visible here.
2529 static /* inline */ void
2530 add_to_run_queue(tso)
2533 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2534 tso->link = run_queue_hd;
2536 if (run_queue_tl == END_TSO_QUEUE) {
2541 /* Put the new thread at the end of the runnable queue. */
// push_on_run_queue: appends tso at the *tail* of the run queue.
// The ASSERTs check that tso really is a TSO, that the queue pointers are
// initialised (non-NULL), and that tso is not already the head or the tail.
// If the queue is empty the head presumably becomes tso (branch body not
// visible); otherwise tso is linked after the current run_queue_tl.
2542 static /* inline */ void
2543 push_on_run_queue(tso)
2546 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2547 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2548 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2549 if (run_queue_hd == END_TSO_QUEUE) {
2552 run_queue_tl->link = tso;
2558 Should be inlined because it's used very often in schedule. The tso
2559 argument is actually only needed in GranSim, where we want to have the
2560 possibility to schedule *any* TSO on the run queue, irrespective of the
2561 actual ordering. Therefore, if tso is not the nil TSO then we traverse
2562 the run queue and dequeue the tso, adjusting the links in the queue.
2564 //@cindex take_off_run_queue
// take_off_run_queue: removes a TSO from the run queue (the return statement
// is not visible in this excerpt; presumably it returns the removed TSO).
//   tso != END_TSO_QUEUE: GranSim only -- finds tso anywhere in the queue,
//     unlinks it (fixing run_queue_hd / run_queue_tl as needed) and clears
//     its ->link.
//   tso == END_TSO_QUEUE: pops the head of the queue (the standard concurrent
//     case), clearing run_queue_tl when the queue becomes empty.
// The Klingon line in the body ("qetlaHbogh Qu' ngaSbogh ghomDaQ {tso}
// yIteq!") roughly reads: "Remove {tso} from the group holding the runnable
// tasks!"
2565 static /* inline */ StgTSO*
2566 take_off_run_queue(StgTSO *tso) {
2570 qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2572 if tso is specified, unlink that tso from the run_queue (doesn't have
2573 to be at the beginning of the queue); GranSim only
2575 if (tso!=END_TSO_QUEUE) {
2576 /* find tso in queue */
2577 for (t=run_queue_hd, prev=END_TSO_QUEUE;
2578 t!=END_TSO_QUEUE && t!=tso;
2582 /* now actually dequeue the tso */
2583 if (prev!=END_TSO_QUEUE) {
2584 ASSERT(run_queue_hd!=t);
2585 prev->link = t->link;
2587 /* t is at beginning of thread queue */
2588 ASSERT(run_queue_hd==t);
2589 run_queue_hd = t->link;
2591 /* t is at end of thread queue */
2592 if (t->link==END_TSO_QUEUE) {
2593 ASSERT(t==run_queue_tl);
2594 run_queue_tl = prev;
2596 ASSERT(run_queue_tl!=t);
2598 t->link = END_TSO_QUEUE;
2600 /* take tso from the beginning of the queue; std concurrent code */
2602 if (t != END_TSO_QUEUE) {
2603 run_queue_hd = t->link;
2604 t->link = END_TSO_QUEUE;
2605 if (run_queue_hd == END_TSO_QUEUE) {
2606 run_queue_tl = END_TSO_QUEUE;
2615 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2616 //@subsection Garbage Collection Routines
2618 /* ---------------------------------------------------------------------------
2619 Where are the roots that we know about?
2621 - all the threads on the runnable queue
2622 - all the threads on the blocked queue
2623 - all the threads on the sleeping queue
2624 - all the threads currently executing a _ccall_GC
2625 - all the "main threads"
2627 ------------------------------------------------------------------------ */
2629 /* This has to be protected either by the scheduler monitor, or by the
2630 garbage collection monitor (probably the latter).
// GetRoots: hands every scheduler-owned root to the GC callback evac:
// the run, blocked and sleeping queues, the suspended _ccall_GC threads,
// the spark queues (PAR/GRAN), the signal handlers (RTS_USER_SIGNALS), and
// the TSOs of main threads that have finished (what_next == ThreadComplete,
// and presumably ThreadKilled -- that case label is not visible here), which
// would otherwise be dropped from all_threads by the GC.
// In GranSim the per-processor queue arrays are evacuated entry by entry.
2635 GetRoots(evac_fn evac)
2640 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2641 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2642 evac((StgClosure **)&run_queue_hds[i]);
2643 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2644 evac((StgClosure **)&run_queue_tls[i]);
2646 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2647 evac((StgClosure **)&blocked_queue_hds[i]);
2648 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2649 evac((StgClosure **)&blocked_queue_tls[i]);
2650 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
// Evacuate the entry that the guard above tests. The previous code
// evacuated &ccalling_threads[i], which is a different identifier from the
// per-processor array ccalling_threadss being checked.
2651 evac((StgClosure **)&ccalling_threadss[i]);
2658 if (run_queue_hd != END_TSO_QUEUE) {
2659 ASSERT(run_queue_tl != END_TSO_QUEUE);
2660 evac((StgClosure **)&run_queue_hd);
2661 evac((StgClosure **)&run_queue_tl);
2664 if (blocked_queue_hd != END_TSO_QUEUE) {
2665 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2666 evac((StgClosure **)&blocked_queue_hd);
2667 evac((StgClosure **)&blocked_queue_tl);
2670 if (sleeping_queue != END_TSO_QUEUE) {
2671 evac((StgClosure **)&sleeping_queue);
2675 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2676 evac((StgClosure **)&suspended_ccalling_threads);
2679 #if defined(PAR) || defined(GRAN)
2680 markSparkQueue(evac);
2683 #if defined(RTS_USER_SIGNALS)
2684 // mark the signal handlers (signals should be already blocked)
2685 markSignalHandlers(evac);
2688 // main threads which have completed need to be retained until they
2689 // are dealt with in the main scheduler loop. They won't be
2690 // retained any other way: the GC will drop them from the
2691 // all_threads list, so we have to be careful to treat them as roots
2695 for (m = main_threads; m != NULL; m = m->link) {
2696 switch (m->tso->what_next) {
2697 case ThreadComplete:
2699 evac((StgClosure **)&m->tso);
2708 /* -----------------------------------------------------------------------------
2711 This is the interface to the garbage collector from Haskell land.
2712 We provide this so that external C code can allocate and garbage
2713 collect when called from Haskell via _ccall_GC.
2715 It might be useful to provide an interface whereby the programmer
2716 can specify more roots (ToDo).
2718 This needs to be protected by the GC condition variable above. KH.
2719 -------------------------------------------------------------------------- */
// extra_roots: the caller-supplied root enumerator recorded by
// performGCWithRoots; AllRoots invokes it after the scheduler's own roots.
// Being a static object it starts out NULL.
2721 static void (*extra_roots)(evac_fn);
// performGC (its signature line is not visible in this excerpt): runs a
// garbage collection over the scheduler roots (GetRoots) while holding
// sched_mutex. Unlike performMajorGC it passes rtsFalse, presumably meaning
// "do not force a major collection".
// The comment below presumably means GarbageCollect must be entered with
// sched_mutex held, which is why it is acquired here.
2726 /* Obligated to hold this lock upon entry */
2727 ACQUIRE_LOCK(&sched_mutex);
2728 GarbageCollect(GetRoots,rtsFalse);
2729 RELEASE_LOCK(&sched_mutex);
// performMajorGC: like performGC (scheduler roots, sched_mutex held for the
// duration), but passes rtsTrue to GarbageCollect -- presumably forcing a
// major collection.
2733 performMajorGC(void)
2735 ACQUIRE_LOCK(&sched_mutex);
2736 GarbageCollect(GetRoots,rtsTrue);
2737 RELEASE_LOCK(&sched_mutex);
// AllRoots: root enumerator used by performGCWithRoots -- evacuates the
// scheduler's roots (GetRoots) followed by the user's roots (extra_roots).
// NOTE(review): extra_roots is only assigned in performGCWithRoots; calling
// AllRoots before that would call through a NULL function pointer.
2741 AllRoots(evac_fn evac)
2743 GetRoots(evac); // the scheduler's roots
2744 extra_roots(evac); // the user's roots
// performGCWithRoots: with sched_mutex held, records get_roots in extra_roots
// and runs a (non-forced) collection whose roots are the scheduler's roots
// plus whatever get_roots reports (see AllRoots).
2748 performGCWithRoots(void (*get_roots)(evac_fn))
2750 ACQUIRE_LOCK(&sched_mutex);
2751 extra_roots = get_roots;
2752 GarbageCollect(AllRoots,rtsFalse);
2753 RELEASE_LOCK(&sched_mutex);
2756 /* -----------------------------------------------------------------------------
2759 If the thread has reached its maximum stack size, then raise the
2760 StackOverflow exception in the offending thread. Otherwise
2761 relocate the TSO into a larger chunk of memory and adjust its stack
2763 -------------------------------------------------------------------------- */
// threadStackOverflow: handles a stack overflow in tso.
//   - If tso is already at max_stack_size, it sends tso the StackOverflow
//     exception (raiseAsync with stackOverflow_closure).
//   - Otherwise it allocates a new TSO (dest) whose stack is double the old
//     size, capped at max_stack_size and rounded up (BLOCK_ROUND_UP, then
//     round_to_mblocks). It copies the TSO header and the live part of the
//     old stack to the top of the new stack. It then marks the old TSO as
//     ThreadRelocated, not blocked, with sp just past the end of its stack,
//     so the GC does not scavenge the stale stack.
// The return statements are not visible in this excerpt; presumably the
// relocated TSO (dest) is returned.
// NOTE(review): the IF_PAR_DEBUG message after relocation ("now at %p ...
// increased to %ld") prints tso, tso->stack_size and tso->sp, i.e. the OLD
// TSO. tso->sp has just been moved past the end of the old stack, so the
// stack dump there is empty; dest was probably intended.
// NOTE(review): check the %d/%ld conversions against the types of id and
// stack_size.
2766 threadStackOverflow(StgTSO *tso)
2768 nat new_stack_size, new_tso_size, stack_words;
2772 IF_DEBUG(sanity,checkTSO(tso));
2773 if (tso->stack_size >= tso->max_stack_size) {
2776 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2777 tso->id, tso, tso->stack_size, tso->max_stack_size);
2778 /* If we're debugging, just print out the top of the stack */
2779 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2782 /* Send this thread the StackOverflow exception */
2783 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2787 /* Try to double the current stack size. If that takes us over the
2788 * maximum stack size for this thread, then use the maximum instead.
2789 * Finally round up so the TSO ends up as a whole number of blocks.
2791 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2792 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2793 TSO_STRUCT_SIZE)/sizeof(W_);
2794 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2795 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2797 IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2799 dest = (StgTSO *)allocate(new_tso_size);
2800 TICK_ALLOC_TSO(new_stack_size,0);
2802 /* copy the TSO block and the old stack into the new area */
2803 memcpy(dest,tso,TSO_STRUCT_SIZE);
2804 stack_words = tso->stack + tso->stack_size - tso->sp;
2805 new_sp = (P_)dest + new_tso_size - stack_words;
2806 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2808 /* relocate the stack pointers... */
2810 dest->stack_size = new_stack_size;
2812 /* Mark the old TSO as relocated. We have to check for relocated
2813 * TSOs in the garbage collector and any primops that deal with TSOs.
2815 * It's important to set the sp value to just beyond the end
2816 * of the stack, so we don't attempt to scavenge any part of the
2819 tso->what_next = ThreadRelocated;
2821 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2822 tso->why_blocked = NotBlocked;
2823 dest->mut_link = NULL;
2825 IF_PAR_DEBUG(verbose,
2826 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2827 tso->id, tso, tso->stack_size);
2828 /* If we're debugging, just print out the top of the stack */
2829 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2832 IF_DEBUG(sanity,checkTSO(tso));
2834 IF_DEBUG(scheduler,printTSO(dest));
2840 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2841 //@subsection Blocking Queue Routines
2843 /* ---------------------------------------------------------------------------
2844 Wake up a queue that was blocked on some resource.
2845 ------------------------------------------------------------------------ */
// unblockCount (parallel statistics): when full statistics are enabled
// (RtsFlags.ParFlags.ParStats.Full) it logs a GR_RESUMEQ event for the TSO
// bqe and sets emitSchedule if the run queue is empty. It then adds the time
// the TSO spent blocked (CURRENT_TIME - par.blockedat) to either its
// par.fetchtime or its par.blocktime, depending on the closure type of node
// (the case labels are not visible in this excerpt). Any other closure type
// aborts via barf, whose message is tagged "{unblockOneLocked}".
// NOTE(review): the two identical signature lines presumably belong to a
// prototype and a definition under different #if branches -- confirm.
2849 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2854 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2856 /* write RESUME events to log file and
2857 update blocked and fetch time (depending on type of the orig closure) */
2858 if (RtsFlags.ParFlags.ParStats.Full) {
2859 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2860 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2861 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2862 if (EMPTY_RUN_QUEUE())
2863 emitSchedule = rtsTrue;
2865 switch (get_itbl(node)->type) {
2867 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2872 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2879 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
// unblockOneLocked (GranSim version): wakes the TSO bqe that was blocked on
// node by posting a simulation event rather than touching a run queue
// directly.
//   - TSO local to node: charges lunblocktime to CurrentProc and posts the
//     event on the TSO's own PE.
//   - TSO remote: charges pack + global-unblock + latency costs to
//     CurrentProc and posts the event from CurrentProc to the TSO's PE.
// In both cases tso->link is reset to END_TSO_QUEUE and
// tso->block_info.closure is cleared. awakenBlockedQueue uses the return
// value as the next queue element; the return statement is not visible here.
// unblockOne wraps this in ACQUIRE/RELEASE_LOCK(&sched_mutex), so callers of
// this "Locked" variant are expected to hold sched_mutex already.
2886 static StgBlockingQueueElement *
2887 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2890 PEs node_loc, tso_loc;
2892 node_loc = where_is(node); // should be lifted out of loop
2893 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2894 tso_loc = where_is((StgClosure *)tso);
2895 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2896 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2897 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2898 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2899 // insertThread(tso, node_loc);
2900 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2902 tso, node, (rtsSpark*)NULL);
2903 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2906 } else { // TSO is remote (actually should be FMBQ)
2907 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2908 RtsFlags.GranFlags.Costs.gunblocktime +
2909 RtsFlags.GranFlags.Costs.latency;
2910 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2912 tso, node, (rtsSpark*)NULL);
2913 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2916 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2918 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2919 (node_loc==tso_loc ? "Local" : "Global"),
2920 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2921 tso->block_info.closure = NULL;
2922 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
// unblockOneLocked (GUM version): dispatches on the closure type of bqe (the
// case labels are not visible in this excerpt; the comments identify them):
//   - TSO: pushes it onto the run queue, records statistics via unblockCount,
//     then resets why_blocked to NotBlocked;
//   - BLOCKED_FETCH: prepends it to the PendingFetches list;
//   - RBHSave closure (checked by ASSERT): nothing to do outside debugging;
//   - anything else: barf.
// The local `next` presumably holds the following queue element and is
// returned (not visible). unblockOne wraps this in sched_mutex, so callers of
// this "Locked" variant are expected to hold sched_mutex already.
2926 static StgBlockingQueueElement *
2927 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2929 StgBlockingQueueElement *next;
2931 switch (get_itbl(bqe)->type) {
2933 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2934 /* if it's a TSO just push it onto the run_queue */
2936 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2937 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
2939 unblockCount(bqe, node);
2940 /* reset blocking status after dumping event */
2941 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2945 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2947 bqe->link = (StgBlockingQueueElement *)PendingFetches;
2948 PendingFetches = (StgBlockedFetch *)bqe;
2952 /* can ignore this case in a non-debugging setup;
2953 see comments on RBHSave closures above */
2955 /* check that the closure is an RBHSave closure */
2956 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2957 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2958 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2962 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2963 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2967 IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2971 #else /* !GRAN && !PAR */
// unblockOneLocked: marks tso (which must be a blocked TSO) as NotBlocked and
// pushes it onto the run queue. Callers in this file iterate
// `tso = unblockOneLocked(tso)` until END_TSO_QUEUE, so it presumably returns
// the TSO that followed tso in its blocking queue (the return is not visible).
// unblockOne wraps this call in sched_mutex, so callers of this "Locked"
// variant are expected to hold sched_mutex already.
2973 unblockOneLocked(StgTSO *tso)
2977 ASSERT(get_itbl(tso)->type == TSO);
2978 ASSERT(tso->why_blocked != NotBlocked);
2979 tso->why_blocked = NotBlocked;
2981 PUSH_ON_RUN_QUEUE(tso);
2983 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2988 #if defined(GRAN) || defined(PAR)
// unblockOne (GranSim/GUM version): calls unblockOneLocked(bqe, node) with
// sched_mutex held; presumably returns the queue element it produced (the
// return is not visible in this excerpt).
2989 inline StgBlockingQueueElement *
2990 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2992 ACQUIRE_LOCK(&sched_mutex);
2993 bqe = unblockOneLocked(bqe, node);
2994 RELEASE_LOCK(&sched_mutex);
// unblockOne: calls unblockOneLocked(tso) with sched_mutex held; presumably
// returns the next TSO produced by it (the return is not visible here).
2999 unblockOne(StgTSO *tso)
3001 ACQUIRE_LOCK(&sched_mutex);
3002 tso = unblockOneLocked(tso);
3003 RELEASE_LOCK(&sched_mutex);
// awakenBlockedQueue (GranSim version): wakes every TSO on the blocking queue
// q of node.
//   - If node does not live on CurrentProc, a "fake fetch" is assumed:
//     CurrentProc is OR-ed into node's processor bitmask and, with global
//     statistics on, tot_fake_fetches is incremented.
//   - Each TSO is passed to unblockOneLocked, which returns the next element.
//   - If the queue ends in something other than END_BQ_QUEUE, node must be an
//     RBH and that element an RBHSave CONSTR. The blocking_queue and mut_link
//     fields saved in its payload are written back into node.
//   - Queue-length statistics are accumulated when global stats are on.
// NOTE(review): the initialisation of bqe, len and info_ptr is not visible in
// this excerpt.
// NOTE(review): the RBHSave assertion here compares against RBH_Save_N_info,
// while the GUM unblockOneLocked uses stg_RBH_Save_N_info -- check which
// symbol names are current.
3010 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3012 StgBlockingQueueElement *bqe;
3017 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
3018 node, CurrentProc, CurrentTime[CurrentProc],
3019 CurrentTSO->id, CurrentTSO));
3021 node_loc = where_is(node);
3023 ASSERT(q == END_BQ_QUEUE ||
3024 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
3025 get_itbl(q)->type == CONSTR); // closure (type constructor)
3026 ASSERT(is_unique(node));
3028 /* FAKE FETCH: magically copy the node to the tso's proc;
3029 no Fetch necessary because in reality the node should not have been
3030 moved to the other PE in the first place
3032 if (CurrentProc!=node_loc) {
3034 belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
3035 node, node_loc, CurrentProc, CurrentTSO->id,
3036 // CurrentTSO, where_is(CurrentTSO),
3037 node->header.gran.procs));
3038 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3040 belch("## new bitmask of node %p is %#x",
3041 node, node->header.gran.procs));
3042 if (RtsFlags.GranFlags.GranSimStats.Global) {
3043 globalGranStats.tot_fake_fetches++;
3048 // ToDo: check: ASSERT(CurrentProc==node_loc);
3049 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3052 bqe points to the current element in the queue
3053 next points to the next element in the queue
3055 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
3056 //tso_loc = where_is(tso);
3058 bqe = unblockOneLocked(bqe, node);
3061 /* if this is the BQ of an RBH, we have to put back the info ripped out of
3062 the closure to make room for the anchor of the BQ */
3063 if (bqe!=END_BQ_QUEUE) {
3064 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3066 ASSERT((info_ptr==&RBH_Save_0_info) ||
3067 (info_ptr==&RBH_Save_1_info) ||
3068 (info_ptr==&RBH_Save_2_info));
3070 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3071 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3072 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3075 belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
3076 node, info_type(node)));
3079 /* statistics gathering */
3080 if (RtsFlags.GranFlags.GranSimStats.Global) {
3081 // globalGranStats.tot_bq_processing_time += bq_processing_time;
3082 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
3083 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
3084 globalGranStats.tot_awbq++; // total no. of bqs awakened
3087 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
3088 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
// awakenBlockedQueue (GUM version): with sched_mutex held, passes every TSO
// and BLOCKED_FETCH on the blocking queue q of node to unblockOneLocked. It
// stops at the first element of any other type (END_BQ_QUEUE or an RBHSave
// CONSTR), then releases sched_mutex.
// NOTE(review): the early exit taken when q is a CONSTR or END_BQ_QUEUE (the
// "RFP (BUG?)" branch) happens after ACQUIRE_LOCK. The lines following its
// debug message are not visible here, so confirm that sched_mutex is
// released before that return.
3092 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3094 StgBlockingQueueElement *bqe;
3096 ACQUIRE_LOCK(&sched_mutex);
3098 IF_PAR_DEBUG(verbose,
3099 belch("##-_ AwBQ for node %p on [%x]: ",
3103 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3104 IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
3109 ASSERT(q == END_BQ_QUEUE ||
3110 get_itbl(q)->type == TSO ||
3111 get_itbl(q)->type == BLOCKED_FETCH ||
3112 get_itbl(q)->type == CONSTR);
3115 while (get_itbl(bqe)->type==TSO ||
3116 get_itbl(bqe)->type==BLOCKED_FETCH) {
3117 bqe = unblockOneLocked(bqe, node);
3119 RELEASE_LOCK(&sched_mutex);
3122 #else /* !GRAN && !PAR */
3124 #ifdef RTS_SUPPORTS_THREADS
// awakenBlockedQueueNoLock: wakes every TSO on the queue starting at tso, one
// at a time via unblockOneLocked, without touching sched_mutex. Judging by the
// name and by awakenBlockedQueue below, the caller presumably already holds
// sched_mutex.
3126 awakenBlockedQueueNoLock(StgTSO *tso)
3128 while (tso != END_TSO_QUEUE) {
3129 tso = unblockOneLocked(tso);
// awakenBlockedQueue: acquires sched_mutex, wakes every TSO on the queue
// starting at tso (one at a time via unblockOneLocked, until END_TSO_QUEUE),
// then releases sched_mutex.
3135 awakenBlockedQueue(StgTSO *tso)
3137 ACQUIRE_LOCK(&sched_mutex);
3138 while (tso != END_TSO_QUEUE) {
3139 tso = unblockOneLocked(tso);
3141 RELEASE_LOCK(&sched_mutex);
3145 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
3146 //@subsection Exception Handling Routines
3148 /* ---------------------------------------------------------------------------
3150 - usually called inside a signal handler so it mustn't do anything fancy.
3151 ------------------------------------------------------------------------ */
// interruptStgRts: per the comment above it, this is usually called from a
// signal handler, so it must not do anything elaborate. Its body is not
// visible in this excerpt; presumably it only sets interrupt/context-switch
// flags.
3154 interruptStgRts(void)
3160 /* -----------------------------------------------------------------------------
3163 This is for use when we raise an exception in another thread, which
3165 This has nothing to do with the UnblockThread event in GranSim. -- HWL
3166 -------------------------------------------------------------------------- */
3168 #if defined(GRAN) || defined(PAR)
3170 NB: only the type of the blocking queue is different in GranSim and GUM
3171 the operations on the queue-elements are the same
3172 long live polymorphism!
3174 Locks: sched_mutex is held upon entry and exit.
// unblockThread (GranSim/GUM version): removes tso from whatever queue it is
// blocked on and puts it back on the run queue. Depending on why_blocked
// (some case labels are not visible in this excerpt) it is unlinked from:
//   - the MVar's queue (fixing mvar->tail),
//   - the BLACKHOLE_BQ's blocking_queue,
//   - the target thread's blocked_exceptions list,
//   - blocked_queue_hd/tl (I/O, and BlockedOnDoProc on mingw32), or
//   - sleeping_queue (BlockedOnDelay).
// A TSO that is not where why_blocked says it should be is fatal (barf).
// Finally tso->link, why_blocked and block_info are reset and the TSO is
// pushed onto the run queue.
// Locks: sched_mutex is held upon entry and exit.
3178 unblockThread(StgTSO *tso)
3180 StgBlockingQueueElement *t, **last;
3182 switch (tso->why_blocked) {
3185 return; /* not blocked */
3188 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3190 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3191 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3193 last = (StgBlockingQueueElement **)&mvar->head;
3194 for (t = (StgBlockingQueueElement *)mvar->head;
3196 last = &t->link, last_tso = t, t = t->link) {
3197 if (t == (StgBlockingQueueElement *)tso) {
3198 *last = (StgBlockingQueueElement *)tso->link;
3199 if (mvar->tail == tso) {
3200 mvar->tail = (StgTSO *)last_tso;
3205 barf("unblockThread (MVAR): TSO not found");
3208 case BlockedOnBlackHole:
3209 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3211 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3213 last = &bq->blocking_queue;
3214 for (t = bq->blocking_queue;
3216 last = &t->link, t = t->link) {
3217 if (t == (StgBlockingQueueElement *)tso) {
3218 *last = (StgBlockingQueueElement *)tso->link;
3222 barf("unblockThread (BLACKHOLE): TSO not found");
3225 case BlockedOnException:
3227 StgTSO *target = tso->block_info.tso;
3229 ASSERT(get_itbl(target)->type == TSO);
// Follow the whole relocation chain. threadStackOverflow marks a TSO as
// ThreadRelocated each time it grows a stack, so a TSO can be relocated more
// than once. A single `if` could leave target pointing at a stale, relocated
// TSO. The non-GranSim unblockThread and detectBlackHoles already loop here.
3231 while (target->what_next == ThreadRelocated) {
3232 target = target->link;
3233 ASSERT(get_itbl(target)->type == TSO);
3236 ASSERT(target->blocked_exceptions != NULL);
3238 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3239 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
3241 last = &t->link, t = t->link) {
3242 ASSERT(get_itbl(t)->type == TSO);
3243 if (t == (StgBlockingQueueElement *)tso) {
3244 *last = (StgBlockingQueueElement *)tso->link;
3248 barf("unblockThread (Exception): TSO not found");
3252 case BlockedOnWrite:
3253 #if defined(mingw32_TARGET_OS)
3254 case BlockedOnDoProc:
3257 /* take TSO off blocked_queue */
3258 StgBlockingQueueElement *prev = NULL;
3259 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
3260 prev = t, t = t->link) {
3261 if (t == (StgBlockingQueueElement *)tso) {
3263 blocked_queue_hd = (StgTSO *)t->link;
3264 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3265 blocked_queue_tl = END_TSO_QUEUE;
3268 prev->link = t->link;
3269 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3270 blocked_queue_tl = (StgTSO *)prev;
3276 barf("unblockThread (I/O): TSO not found");
3279 case BlockedOnDelay:
3281 /* take TSO off sleeping_queue */
3282 StgBlockingQueueElement *prev = NULL;
3283 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
3284 prev = t, t = t->link) {
3285 if (t == (StgBlockingQueueElement *)tso) {
3287 sleeping_queue = (StgTSO *)t->link;
3289 prev->link = t->link;
3294 barf("unblockThread (delay): TSO not found");
3298 barf("unblockThread");
3302 tso->link = END_TSO_QUEUE;
3303 tso->why_blocked = NotBlocked;
3304 tso->block_info.closure = NULL;
3305 PUSH_ON_RUN_QUEUE(tso);
// unblockThread: removes tso from whatever queue it is blocked on and pushes
// it back onto the run queue with why_blocked = NotBlocked and block_info
// cleared. Depending on why_blocked it is unlinked from an MVar's queue
// (fixing mvar->tail), a BLACKHOLE_BQ, the target thread's blocked_exceptions
// list (following the whole ThreadRelocated chain), blocked_queue_hd/tl (I/O)
// or sleeping_queue (delay). It returns early when tso is not blocked ("to
// avoid locking unnecessarily"). A TSO missing from its expected queue is
// fatal (barf).
// NOTE(review): the lock acquire/release, the early return and several case
// labels are not visible in this excerpt.
3309 unblockThread(StgTSO *tso)
3313 /* To avoid locking unnecessarily. */
3314 if (tso->why_blocked == NotBlocked) {
3318 switch (tso->why_blocked) {
3321 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3323 StgTSO *last_tso = END_TSO_QUEUE;
3324 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3327 for (t = mvar->head; t != END_TSO_QUEUE;
3328 last = &t->link, last_tso = t, t = t->link) {
3331 if (mvar->tail == tso) {
3332 mvar->tail = last_tso;
3337 barf("unblockThread (MVAR): TSO not found");
3340 case BlockedOnBlackHole:
3341 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3343 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3345 last = &bq->blocking_queue;
3346 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
3347 last = &t->link, t = t->link) {
3353 barf("unblockThread (BLACKHOLE): TSO not found");
3356 case BlockedOnException:
3358 StgTSO *target = tso->block_info.tso;
3360 ASSERT(get_itbl(target)->type == TSO);
3362 while (target->what_next == ThreadRelocated) {
3363 target = target->link;
3364 ASSERT(get_itbl(target)->type == TSO);
3367 ASSERT(target->blocked_exceptions != NULL);
3369 last = &target->blocked_exceptions;
3370 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
3371 last = &t->link, t = t->link) {
3372 ASSERT(get_itbl(t)->type == TSO);
3378 barf("unblockThread (Exception): TSO not found");
3382 case BlockedOnWrite:
3383 #if defined(mingw32_TARGET_OS)
3384 case BlockedOnDoProc:
3387 StgTSO *prev = NULL;
3388 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
3389 prev = t, t = t->link) {
3392 blocked_queue_hd = t->link;
3393 if (blocked_queue_tl == t) {
3394 blocked_queue_tl = END_TSO_QUEUE;
3397 prev->link = t->link;
3398 if (blocked_queue_tl == t) {
3399 blocked_queue_tl = prev;
3405 barf("unblockThread (I/O): TSO not found");
3408 case BlockedOnDelay:
3410 StgTSO *prev = NULL;
3411 for (t = sleeping_queue; t != END_TSO_QUEUE;
3412 prev = t, t = t->link) {
3415 sleeping_queue = t->link;
3417 prev->link = t->link;
3422 barf("unblockThread (delay): TSO not found");
3426 barf("unblockThread");
3430 tso->link = END_TSO_QUEUE;
3431 tso->why_blocked = NotBlocked;
3432 tso->block_info.closure = NULL;
3433 PUSH_ON_RUN_QUEUE(tso);
3437 /* -----------------------------------------------------------------------------
3440 * The following function implements the magic for raising an
3441 * asynchronous exception in an existing thread.
3443 * We first remove the thread from any queue on which it might be
3444 * blocked: an MVAR, a BLACKHOLE_BQ, another thread's blocked_exceptions queue, the I/O blocked queue or the sleeping queue.
3446 * We strip the stack down to the innermost CATCH_FRAME, building
3447 * thunks in the heap for all the active computations, so they can
3448 * be restarted if necessary. When we reach a CATCH_FRAME, we build
3449 * an application of the handler to the exception, and push it on
3450 * the top of the stack.
3452 * How exactly do we save all the active computations? We create an
3453 * AP_STACK for every UpdateFrame on the stack. Entering one of these
3454 * AP_STACKs pushes everything from the corresponding update frame
3455 * upwards onto the stack. (Actually, it pushes everything up to the
3456 * next update frame plus a pointer to the next AP_STACK object.
3457 * Entering the next AP_STACK object pushes more onto the stack until we
3458 * reach the last AP_STACK object - at which point the stack should look
3459 * exactly as it did when we killed the TSO and we can continue
3460 * execution by entering the closure on top of the stack.
3462 * We can also kill a thread entirely - this happens if either (a) the
3463 * exception passed to raiseAsync is NULL, or (b) there's no
3464 * CATCH_FRAME on the stack. In either case, we strip the entire
3465 * stack and replace the thread with a zombie.
3467 * Locks: sched_mutex held upon entry and exit.
3469 * -------------------------------------------------------------------------- */
// deleteThread: kills tso by calling raiseAsync with a NULL exception. A
// NULL exception makes raiseAsync strip the entire stack and mark the thread
// as killed instead of running a handler.
3472 deleteThread(StgTSO *tso)
3474 raiseAsync(tso,NULL);
// deleteThreadImmediately: used only by forkProcess. It marks tso as
// ThreadKilled directly rather than going through raiseAsync, so the thread
// gets no chance to catch the KillThread exception. Threads that are already
// ThreadComplete or ThreadKilled are presumably left alone (the body of that
// `if` is not visible in this excerpt).
3478 deleteThreadImmediately(StgTSO *tso)
3479 { // for forkProcess only:
3480 // delete thread without giving it a chance to catch the KillThread exception
3482 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3486 tso->what_next = ThreadKilled;
// raiseAsyncWithLock: variant of raiseAsync for callers that do not already
// hold sched_mutex. It acquires the lock, raises exception in tso via
// raiseAsync, then releases the lock.
3490 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3492 /* When raising async exs from contexts where sched_mutex isn't held;
3493 use raiseAsyncWithLock(). */
3494 ACQUIRE_LOCK(&sched_mutex);
3495 raiseAsync(tso,exception);
3496 RELEASE_LOCK(&sched_mutex);
// raiseAsync: raises `exception` asynchronously in tso, or kills tso when
// exception is NULL. The caller must hold sched_mutex; callers without it use
// raiseAsyncWithLock. Threads already ThreadComplete or ThreadKilled are
// ignored. Otherwise the thread is removed from any blocking queue, and its
// stack is walked upwards from sp:
//   UPDATE_FRAME -> the chunk of stack above the frame is packaged into an
//                   AP_STACK. Unless the updatee is already an indirection,
//                   the updatee is overwritten with an indirection to it.
//   CATCH_FRAME  -> only when exception != NULL: a raise(exception) thunk is
//                   left on top of the frame, async exceptions are blocked,
//                   and the thread is set back to ThreadRunGHC.
//   STOP_FRAME   -> the whole stack has been stripped and the thread becomes
//                   ThreadKilled.
3500 raiseAsync(StgTSO *tso, StgClosure *exception)
3502 StgRetInfoTable *info;
3505 // Thread already dead?
3506 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3511 sched_belch("raising exception in thread %ld.", tso->id));
3513 // Remove it from any blocking queues
3518 // The stack freezing code assumes there's a closure pointer on
3519 // the top of the stack, so we have to arrange that this is the case...
3521 if (sp[0] == (W_)&stg_enter_info) {
3525 sp[0] = (W_)&stg_dummy_ret_closure;
3531 // 1. Let the top of the stack be the "current closure"
3533 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3536 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3537 // current closure applied to the chunk of stack up to (but not
3538 // including) the update frame. This closure becomes the "current
3539 // closure". Go back to step 2.
3541 // 4. If it's a CATCH_FRAME, then leave the exception handler on
3542 // top of the stack applied to the exception.
3544 // 5. If it's a STOP_FRAME, then kill the thread.
3549 info = get_ret_itbl((StgClosure *)frame);
3551 while (info->i.type != UPDATE_FRAME
3552 && (info->i.type != CATCH_FRAME || exception == NULL)
3553 && info->i.type != STOP_FRAME) {
3554 frame += stack_frame_sizeW((StgClosure *)frame);
3555 info = get_ret_itbl((StgClosure *)frame);
3558 switch (info->i.type) {
3561 // If we find a CATCH_FRAME, and we've got an exception to raise,
3562 // then build the THUNK raise(exception), and leave it on
3563 // top of the CATCH_FRAME ready to enter.
3567 StgCatchFrame *cf = (StgCatchFrame *)frame;
3571 // we've got an exception to raise, so let's pass it to the
3572 // handler in this frame.
3574 raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3575 TICK_ALLOC_SE_THK(1,0);
3576 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3577 raise->payload[0] = exception;
3579 // throw away the stack from Sp up to the CATCH_FRAME.
3583 /* Ensure that async exceptions are blocked now, so we don't get
3584 * a surprise exception before we get around to executing the
3587 if (tso->blocked_exceptions == NULL) {
3588 tso->blocked_exceptions = END_TSO_QUEUE;
3591 /* Put the newly-built THUNK on top of the stack, ready to execute
3592 * when the thread restarts.
3595 sp[-1] = (W_)&stg_enter_info;
3597 tso->what_next = ThreadRunGHC;
3598 IF_DEBUG(sanity, checkTSO(tso));
3607 // First build an AP_STACK consisting of the stack chunk above the
3608 // current update frame, with the top word on the stack as the
3611 words = frame - sp - 1;
3612 ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3615 ap->fun = (StgClosure *)sp[0];
3617 for(i=0; i < (nat)words; ++i) {
3618 ap->payload[i] = (StgClosure *)*sp++;
3621 SET_HDR(ap,&stg_AP_STACK_info,
3622 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
3623 TICK_ALLOC_UP_THK(words+1,0);
3626 fprintf(stderr, "scheduler: Updating ");
3627 printPtr((P_)((StgUpdateFrame *)frame)->updatee);
3628 fprintf(stderr, " with ");
3629 printObj((StgClosure *)ap);
3632 // Replace the updatee with an indirection - happily
3633 // this will also wake up any threads currently
3634 // waiting on the result.
3636 // Warning: if we're in a loop, more than one update frame on
3637 // the stack may point to the same object. Be careful not to
3638 // overwrite an IND_OLDGEN in this case, because we'll screw
3639 // up the mutable lists. To be on the safe side, don't
3640 // overwrite any kind of indirection at all. See also
3641 // threadSqueezeStack in GC.c, where we have to make a similar
3644 if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3645 // revert the black hole
3646 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3648 sp += sizeofW(StgUpdateFrame) - 1;
3649 sp[0] = (W_)ap; // push onto stack
3654 // We've stripped the entire stack, the thread is now dead.
3655 sp += sizeofW(StgStopFrame);
3656 tso->what_next = ThreadKilled;
3667 /* -----------------------------------------------------------------------------
3668 resurrectThreads is called after garbage collection on the list of
3669 threads found to be garbage. Each of these threads will be woken
3670 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3671 on an MVar, or NonTermination if the thread was blocked on a Black
3674 Locks: sched_mutex is held upon entry and exit (this is called from the GC, and raiseAsync requires the lock).
3675 -------------------------------------------------------------------------- */
// resurrectThreads: for each thread on the `threads` list (linked through
// global_link) it relinks the thread onto all_threads (only the global_link
// assignment is visible here). It then raises an exception in the thread
// according to why_blocked:
//   BlockedOnException (and presumably BlockedOnMVar -- that case label is
//   not visible) -> BlockedOnDeadMVar;
//   BlockedOnBlackHole -> NonTermination;
//   a thread that is no longer blocked is tolerated (see the body comment);
//   anything else -> barf.
// raiseAsync is called directly, so sched_mutex must be held; the body notes
// that the GC holds it.
3678 resurrectThreads( StgTSO *threads )
3682 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3683 next = tso->global_link;
3684 tso->global_link = all_threads;
3686 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3688 switch (tso->why_blocked) {
3690 case BlockedOnException:
3691 /* Called by GC - sched_mutex lock is currently held. */
3692 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3694 case BlockedOnBlackHole:
3695 raiseAsync(tso,(StgClosure *)NonTermination_closure);
3698 /* This might happen if the thread was blocked on a black hole
3699 * belonging to a thread that we've just woken up (raiseAsync
3700 * can wake up threads, remember...).
3704 barf("resurrectThreads: thread blocked in a strange way");
3709 /* -----------------------------------------------------------------------------
3710 * Blackhole detection: if we reach a deadlock, test whether any
3711 * threads are blocked on themselves. Any threads which are found to
3712 * be self-blocked get sent a NonTermination exception.
3714 * This is only done in a deadlock situation in order to avoid
3715 * performance overhead in the normal case.
3717 * Locks: sched_mutex is held upon entry and exit.
3718 * -------------------------------------------------------------------------- */
// detectBlackHoles: for each thread on all_threads that is
// BlockedOnBlackHole, walk its stack frames starting at tso->sp looking
// for an update frame whose updatee is the very closure the thread is
// blocked on (tso->block_info.closure). Such a thread is waiting for its
// own result, so raiseAsync sends it NonTermination.
// Update frames are stepped over by sizeof(StgUpdateFrame); other frames
// by stack_frame_sizeW().
// NOTE(review): this listing omits several short lines (the body of the
// ThreadRelocated loop, the `continue`, most case labels); confirm against
// the full file.
3721 detectBlackHoles( void )
3723 StgTSO *tso = all_threads;
// NOTE(review): this initialiser is redundant -- the for loop below
// re-initialises tso from all_threads.
3725 StgClosure *blocked_on;
3726 StgRetInfoTable *info;
3728 for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3730 while (tso->what_next == ThreadRelocated) {
// Follow the relocation to the thread's current TSO (the step itself is
// not visible here; presumably tso = tso->link -- confirm).
3732 ASSERT(get_itbl(tso)->type == TSO);
3735 if (tso->why_blocked != BlockedOnBlackHole) {
3738 blocked_on = tso->block_info.closure;
3740 frame = (StgClosure *)tso->sp;
3743 info = get_ret_itbl(frame);
3744 switch (info->i.type) {
3746 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3747 /* We are blocking on one of our own computations, so
3748 * send this thread the NonTermination exception.
3751 sched_belch("thread %d is blocked on itself", tso->id));
3752 raiseAsync(tso, (StgClosure *)NonTermination_closure);
3756 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3762 // normal stack frames; do nothing except advance the pointer
// NOTE(review): `(StgPtr)frame += ...` uses a cast expression as an
// lvalue. That is an old GCC extension (removed in GCC 4.0), not ISO C;
// a portable spelling is
//   frame = (StgClosure *)((StgPtr)frame + stack_frame_sizeW(frame));
3764 (StgPtr)frame += stack_frame_sizeW(frame);
3771 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3772 //@subsection Debugging Routines
3774 /* -----------------------------------------------------------------------------
3775 * Debugging: why is a thread blocked
3776 * [Also provides useful information when debugging threaded programs
3777 * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3778 -------------------------------------------------------------------------- */
// printThreadBlockage: write to stderr a short phrase, with no trailing
// newline, describing why tso is blocked (tso->why_blocked), e.g.
// "is blocked on write to fd 3". A reason not matched by any case ends in
// barf(). Some cases exist only under the build conditionals shown
// (mingw32_TARGET_OS, RTS_SUPPORTS_THREADS).
3782 printThreadBlockage(StgTSO *tso)
3784 switch (tso->why_blocked) {
3786 fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3788 case BlockedOnWrite:
3789 fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3791 #if defined(mingw32_TARGET_OS)
3792 case BlockedOnDoProc:
3793 fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3796 case BlockedOnDelay:
// NOTE(review): "%d" must match the declared type of block_info.target,
// which is not visible here -- confirm, or cast the argument explicitly.
3797 fprintf(stderr,"is blocked until %d", tso->block_info.target);
3800 fprintf(stderr,"is blocked on an MVar");
3802 case BlockedOnException:
3803 fprintf(stderr,"is blocked on delivering an exception to thread %d",
3804 tso->block_info.tso->id);
3806 case BlockedOnBlackHole:
3807 fprintf(stderr,"is blocked on a black hole");
3810 fprintf(stderr,"is not blocked");
3814 fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3815 tso->block_info.closure, info_type(tso->block_info.closure));
3817 case BlockedOnGA_NoSend:
3818 fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3819 tso->block_info.closure, info_type(tso->block_info.closure));
3822 #if defined(RTS_SUPPORTS_THREADS)
3823 case BlockedOnCCall:
3824 fprintf(stderr,"is blocked on an external call");
3826 case BlockedOnCCall_NoUnblockExc:
3827 fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
// NOTE(review): the final argument `tso` is a pointer but is paired with
// "%d"; assuming barf() follows printf conventions, "%p" with (void *)tso
// is the matching specifier.
3831 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3832 tso->why_blocked, tso->id, tso);
// printThreadStatus: write to stderr a phrase (no trailing newline)
// describing tso->what_next -- "has been killed" or "has completed" --
// and otherwise defer to printThreadBlockage. Only the ThreadComplete
// case label is visible in this listing; the others are presumed.
3838 printThreadStatus(StgTSO *tso)
3840 switch (tso->what_next) {
3842 fprintf(stderr,"has been killed");
3844 case ThreadComplete:
3845 fprintf(stderr,"has completed");
3848 printThreadBlockage(tso);
// printAllThreads: print a header line, then one stderr line per thread on
// all_threads: "\tthread <id> @ <address> ", the thread's label in
// ["..."] when lookupThreadLabel returns one, and its status from
// printThreadStatus. The header carries a time stamp (TIME_ON_PROC or
// CURRENT_TIME) in the build variants that have one; the #if lines that
// select a variant are not visible in this listing.
3853 printAllThreads(void)
3859 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
// NOTE(review): node_str is not used anywhere in the visible code (in this
// variant or the CURRENT_TIME one) -- candidate for removal.
3860 ullong_format_string(TIME_ON_PROC(CurrentProc),
3861 time_string, rtsFalse/*no commas!*/);
3863 fprintf(stderr, "all threads at [%s]:\n", time_string);
3865 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3866 ullong_format_string(CURRENT_TIME,
3867 time_string, rtsFalse/*no commas!*/);
3869 fprintf(stderr,"all threads at [%s]:\n", time_string);
3871 fprintf(stderr,"all threads:\n");
3874 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3875 fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3876 label = lookupThreadLabel((StgWord)t);
3877 if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3878 printThreadStatus(t);
3879 fprintf(stderr,"\n");
3886 Print a whole blocking queue attached to node (debugging only).
// print_bq (the variant preceding the GRAN one -- presumably the PAR build;
// the selecting #if is not visible): print node's address and closure type,
// check that it is a closure kind that can own a blocking queue, then hand
// its blocking_queue to print_bqe.
3891 print_bq (StgClosure *node)
3893 StgBlockingQueueElement *bqe;
// NOTE(review): bqe is not used in the visible body of this variant.
3897 fprintf(stderr,"## BQ of closure %p (%s): ",
3898 node, info_type(node));
3900 /* should cover all closures that may have a blocking queue */
3901 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3902 get_itbl(node)->type == FETCH_ME_BQ ||
3903 get_itbl(node)->type == RBH ||
3904 get_itbl(node)->type == MVAR);
// NOTE(review): node has already been passed to info_type() and get_itbl()
// above, so this NULL check comes too late to catch a NULL node; it
// belongs at the top of the function.
3906 ASSERT(node!=(StgClosure*)NULL); // sanity check
3908 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3912 Print a whole blocking queue starting with the element bqe.
// print_bqe: print, on one stderr line, each element of a blocking queue
// starting at bqe: TSOs as " TSO <id> (<addr>),", BLOCKED_FETCH elements as
// " BF (node=..., ga=(...)),", and CONSTR elements (RBH_Save closures) by
// name and info pointer. The walk stops after a CONSTR element or after an
// element whose link is END_BQ_QUEUE; if bqe is END_BQ_QUEUE on entry only
// the newline is printed. Any other closure type is fatal (barf).
3915 print_bqe (StgBlockingQueueElement *bqe)
3920 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3922 for (end = (bqe==END_BQ_QUEUE);
3923 !end; // iterate until bqe points to a CONSTR
3924 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
3925 bqe = end ? END_BQ_QUEUE : bqe->link) {
3926 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3927 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3928 /* types of closures that may appear in a blocking queue */
3929 ASSERT(get_itbl(bqe)->type == TSO ||
3930 get_itbl(bqe)->type == BLOCKED_FETCH ||
3931 get_itbl(bqe)->type == CONSTR);
3932 /* only BQs of an RBH end with an RBH_Save closure */
// node is not a parameter of print_bqe, which is presumably why the ASSERT
// on the next line is commented out.
3933 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3935 switch (get_itbl(bqe)->type) {
// NOTE(review): "%x" is paired with a pointer argument below ("%u" with the
// id); "%p" with a (void *) cast is the portable pointer specifier, as the
// GRAN print_bq uses. The "%x"/"%d" used for the ga fields in the BF case
// must likewise match those fields' declared types -- confirm.
3937 fprintf(stderr," TSO %u (%x),",
3938 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3941 fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3942 ((StgBlockedFetch *)bqe)->node,
3943 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3944 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3945 ((StgBlockedFetch *)bqe)->ga.weight);
3948 fprintf(stderr," %s (IP %p),",
3949 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3950 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3951 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3952 "RBH_Save_?"), get_itbl(bqe));
3955 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3956 info_type((StgClosure *)bqe)); // , node, info_type(node));
3960 fputc('\n', stderr);
3962 # elif defined(GRAN)
// print_bq (GRAN build): print node together with the PE that where_is()
// reports for it, then walk its blocking_queue inline. Each TSO is printed
// with its own PE, and each CONSTR (RBH_Save) element by name and info
// pointer. The walk stops after a CONSTR element or when link is
// END_BQ_QUEUE. Other closure types are fatal (barf).
3964 print_bq (StgClosure *node)
3966 StgBlockingQueueElement *bqe;
3967 PEs node_loc, tso_loc;
3970 /* should cover all closures that may have a blocking queue */
3971 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3972 get_itbl(node)->type == FETCH_ME_BQ ||
3973 get_itbl(node)->type == RBH);
// NOTE(review): get_itbl(node) has already been applied above, so this
// NULL check comes too late to catch a NULL node; move it first.
3975 ASSERT(node!=(StgClosure*)NULL); // sanity check
3976 node_loc = where_is(node);
3978 fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3979 node, info_type(node), node_loc);
3982 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3984 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3985 !end; // iterate until bqe points to a CONSTR
3986 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3987 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3988 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3989 /* types of closures that may appear in a blocking queue */
3990 ASSERT(get_itbl(bqe)->type == TSO ||
3991 get_itbl(bqe)->type == CONSTR);
3992 /* only BQs of an RBH end with an RBH_Save closure */
3993 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3995 tso_loc = where_is((StgClosure *)bqe);
3996 switch (get_itbl(bqe)->type) {
3998 fprintf(stderr," TSO %d (%p) on [PE %d],",
3999 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4002 fprintf(stderr," %s (IP %p),",
4003 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4004 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4005 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4006 "RBH_Save_?"), get_itbl(bqe));
4009 barf("Unexpected closure type %s in blocking queue of %p (%s)",
4010 info_type((StgClosure *)bqe), node, info_type(node));
4014 fputc('\n', stderr);
4018 Nice and easy: only TSOs on the blocking queue
// print_bq (build without PAR/GRAN -- presumably; the #else line is not
// visible): print every TSO on node's blocking_queue as
// " TSO <id> (<addr>),", then a newline.
4021 print_bq (StgClosure *node)
4025 ASSERT(node!=(StgClosure*)NULL); // sanity check
4026 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
4027 tso != END_TSO_QUEUE;
// (the loop's increment is not visible in this listing; presumably
// tso = tso->link -- confirm)
4029 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
4030 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
4031 fprintf(stderr," TSO %d (%p),", tso->id, tso);
4033 fputc('\n', stderr);
// Walks the run queue from run_queue_hd until END_TSO_QUEUE, starting the
// counter i at 0. NOTE(review): the enclosing function header, the loop's
// increment and its body are not visible in this listing -- confirm what i
// is used for against the full file.
4044 for (i=0, tso=run_queue_hd;
4045 tso != END_TSO_QUEUE;
// sched_belch: printf-style scheduler trace output to stderr. It writes a
// prefix, then the message formatted from s and the variadic arguments
// via vfprintf, then "\n". The prefix is "scheduler (task <id>): ", "== "
// or "scheduler: ", chosen by build configuration; the #if lines that
// select it are not visible here.
// NOTE(review): the va_list declaration and the va_start/va_end calls for
// ap are not visible in this listing; confirm they bracket vfprintf.
// NOTE(review): "%ld" requires osThreadId() to yield a long -- confirm its
// return type or cast the argument explicitly.
4054 sched_belch(char *s, ...)
4059 fprintf(stderr, "scheduler (task %ld): ", osThreadId());
4061 fprintf(stderr, "== ");
4063 fprintf(stderr, "scheduler: ");
4065 vfprintf(stderr, s, ap);
4066 fprintf(stderr, "\n");
4073 //@node Index, , Debugging Routines, Main scheduling code
4077 //* StgMainThread:: @cindex\s-+StgMainThread
4078 //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue
4079 //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd
4080 //* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
4081 //* context_switch:: @cindex\s-+context_switch
4082 //* createThread:: @cindex\s-+createThread
4083 //* gc_pending_cond:: @cindex\s-+gc_pending_cond
4084 //* initScheduler:: @cindex\s-+initScheduler
4085 //* interrupted:: @cindex\s-+interrupted
4086 //* next_thread_id:: @cindex\s-+next_thread_id
4087 //* print_bq:: @cindex\s-+print_bq
4088 //* run_queue_hd:: @cindex\s-+run_queue_hd
4089 //* run_queue_tl:: @cindex\s-+run_queue_tl
4090 //* sched_mutex:: @cindex\s-+sched_mutex
4091 //* schedule:: @cindex\s-+schedule
4092 //* take_off_run_queue:: @cindex\s-+take_off_run_queue
4093 //* term_mutex:: @cindex\s-+term_mutex