1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.175 2003/09/26 13:32:14 panne Exp $
4 * (c) The GHC Team, 1998-2000
8 * Different GHC ways use this scheduler quite differently (see comments below)
9 * Here is the global picture:
11 * WAY Name CPP flag What's it for
12 * --------------------------------------
13 * mp GUM PAR Parallel execution on a distributed memory machine
14 * s SMP SMP Parallel execution on a shared memory machine
15 * mg GranSim GRAN Simulation of parallel execution
16 * md GUM/GdH DIST Distributed execution (based on GUM)
18 * --------------------------------------------------------------------------*/
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
24 * Version with scheduler monitor support for SMPs (WAY=s):
26 This design provides a high-level API to create and schedule threads etc.
27 as documented in the SMP design document.
29 It uses a monitor design controlled by a single mutex to exercise control
30 over accesses to shared data structures, and builds on the Posix threads
33 The majority of state is shared. In order to keep essential per-task state,
34 there is a Capability structure, which contains all the information
35 needed to run a thread: its STG registers, a pointer to its TSO, a
36 nursery etc. During STG execution, a pointer to the capability is
37 kept in a register (BaseReg).
39 In a non-SMP build, there is one global capability, namely MainRegTable.
43 * Version with support for distributed memory parallelism aka GUM (WAY=mp):
45 The main scheduling loop in GUM iterates until a finish message is received.
46 In that case a global flag @receivedFinish@ is set and this instance of
47 the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48 for the handling of incoming messages, such as PP_FINISH.
49 Note that in the parallel case we have a system manager that coordinates
50 different PEs, each of which is running one instance of the RTS.
51 See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52 From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
54 * Version with support for simulating parallel execution aka GranSim (WAY=mg):
56 The main scheduling code in GranSim is quite different from that in std
57 (concurrent) Haskell: while concurrent Haskell just iterates over the
58 threads in the runnable queue, GranSim is event driven, i.e. it iterates
59 over the events in the global event queue. -- HWL
64 //* Variables and Data structures::
65 //* Main scheduling loop::
66 //* Suspend and Resume::
68 //* Garbage Collextion Routines::
69 //* Blocking Queue Routines::
70 //* Exception Handling Routines::
71 //* Debugging Routines::
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
78 #include "PosixSource.h"
85 #include "StgStartup.h"
87 #define COMPILING_SCHEDULER
89 #include "StgMiscClosures.h"
91 #include "Interpreter.h"
92 #include "Exception.h"
99 #include "ThreadLabels.h"
101 #include "Proftimer.h"
102 #include "ProfHeap.h"
104 #if defined(GRAN) || defined(PAR)
105 # include "GranSimRts.h"
106 # include "GranSim.h"
107 # include "ParallelRts.h"
108 # include "Parallel.h"
109 # include "ParallelDebug.h"
110 # include "FetchMe.h"
114 #include "Capability.h"
115 #include "OSThreads.h"
118 #ifdef HAVE_SYS_TYPES_H
119 #include <sys/types.h>
134 #define USED_IN_THREADED_RTS
136 #define USED_IN_THREADED_RTS STG_UNUSED
139 #ifdef RTS_SUPPORTS_THREADS
140 #define USED_WHEN_RTS_SUPPORTS_THREADS
142 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
145 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
146 //@subsection Variables and Data structures
148 /* Main thread queue.
149 * Locks required: sched_mutex.
151 StgMainThread *main_threads = NULL;
154 * Locks required: sched_mutex.
158 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
159 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
162 In GranSim we have a runnable and a blocked queue for each processor.
163 In order to minimise code changes new arrays run_queue_hds/tls
164 are created. run_queue_hd is then a short cut (macro) for
165 run_queue_hds[CurrentProc] (see GranSim.h).
168 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
169 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
170 StgTSO *ccalling_threadss[MAX_PROC];
171 /* We use the same global list of threads (all_threads) in GranSim as in
172 the std RTS (i.e. we are cheating). However, we don't use this list in
173 the GranSim specific code at the moment (so we are only potentially
178 StgTSO *run_queue_hd = NULL;
179 StgTSO *run_queue_tl = NULL;
180 StgTSO *blocked_queue_hd = NULL;
181 StgTSO *blocked_queue_tl = NULL;
182 StgTSO *sleeping_queue = NULL; /* perhaps replace with a hash table? */
186 /* Linked list of all threads.
187 * Used for detecting garbage collected threads.
189 StgTSO *all_threads = NULL;
191 /* When a thread performs a safe C call (_ccall_GC, using old
192 * terminology), it gets put on the suspended_ccalling_threads
193 * list. Used by the garbage collector.
195 static StgTSO *suspended_ccalling_threads;
197 static StgTSO *threadStackOverflow(StgTSO *tso);
199 /* KH: The following two flags are shared memory locations. There is no need
200 to lock them, since they are only unset at the end of a scheduler
204 /* flag set by signal handler to precipitate a context switch */
205 //@cindex context_switch
206 nat context_switch = 0;
208 /* if this flag is set as well, give up execution */
209 //@cindex interrupted
210 rtsBool interrupted = rtsFalse;
212 /* Next thread ID to allocate.
213 * Locks required: thread_id_mutex
215 //@cindex next_thread_id
216 static StgThreadID next_thread_id = 1;
219 * Pointers to the state of the current thread.
220 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
221 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
224 /* The smallest stack size that makes any sense is:
225 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
226 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
227 * + 1 (the closure to enter)
229 * + 1 (spare slot req'd by stg_ap_v_ret)
231 * A thread with this stack will bomb immediately with a stack
232 * overflow, which will increase its stack size.
235 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
242 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
243 * exists - earlier gccs apparently didn't.
248 static rtsBool ready_to_gc;
251 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
252 * in an MT setting, needed to signal that a worker thread shouldn't hang around
253 * in the scheduler when it is out of work.
255 static rtsBool shutting_down_scheduler = rtsFalse;
257 void addToBlockedQueue ( StgTSO *tso );
259 static void schedule ( StgMainThread *mainThread, Capability *initialCapability );
260 void interruptStgRts ( void );
262 static void detectBlackHoles ( void );
265 static void sched_belch(char *s, ...);
268 #if defined(RTS_SUPPORTS_THREADS)
269 /* ToDo: carefully document the invariants that go together
270 * with these synchronisation objects.
272 Mutex sched_mutex = INIT_MUTEX_VAR;
273 Mutex term_mutex = INIT_MUTEX_VAR;
276 * A heavyweight solution to the problem of protecting
277 * the thread_id from concurrent update.
279 Mutex thread_id_mutex = INIT_MUTEX_VAR;
283 static Condition gc_pending_cond = INIT_COND_VAR;
287 #endif /* RTS_SUPPORTS_THREADS */
291 rtsTime TimeOfLastYield;
292 rtsBool emitSchedule = rtsTrue;
296 static char *whatNext_strs[] = {
306 StgTSO * createSparkThread(rtsSpark spark);
307 StgTSO * activateSpark (rtsSpark spark);
311 * The thread state for the main thread.
312 // ToDo: check whether not needed any more
316 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
317 static void taskStart(void);
325 #if defined(RTS_SUPPORTS_THREADS)
327 startSchedulerTask(void)
329 startTask(taskStart);
333 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
334 //@subsection Main scheduling loop
336 /* ---------------------------------------------------------------------------
337 Main scheduling loop.
339 We use round-robin scheduling, each thread returning to the
340 scheduler loop when one of these conditions is detected:
343 * timer expires (thread yields)
348 Locking notes: we acquire the scheduler lock once at the beginning
349 of the scheduler loop, and release it when
351 * running a thread, or
352 * waiting for work, or
353 * waiting for a GC to complete.
356 In a GranSim setup this loop iterates over the global event queue.
357 This revolves around the global event queue, which determines what
358 to do next. Therefore, it's more complicated than either the
359 concurrent or the parallel (GUM) setup.
362 GUM iterates over incoming messages.
363 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
364 and sends out a fish whenever it has nothing to do; in-between
365 doing the actual reductions (shared code below) it processes the
366 incoming messages and deals with delayed operations
367 (see PendingFetches).
368 This is not the ugliest code you could imagine, but it's bloody close.
370 ------------------------------------------------------------------------ */
373 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
374 Capability *initialCapability )
377 Capability *cap = initialCapability;
378 StgThreadReturnCode ret;
386 rtsBool receivedFinish = rtsFalse;
388 nat tp_size, sp_size; // stats only
391 rtsBool was_interrupted = rtsFalse;
392 StgTSOWhatNext prev_what_next;
394 ACQUIRE_LOCK(&sched_mutex);
396 #if defined(RTS_SUPPORTS_THREADS)
397 /* in the threaded case, the capability is either passed in via the initialCapability
398 parameter, or initialized inside the scheduler loop */
401 fprintf(stderr,"### NEW SCHEDULER LOOP in os thread %u(%p)\n",
402 osThreadId(), osThreadId()));
404 fprintf(stderr,"### main thread: %p\n",mainThread));
406 fprintf(stderr,"### initial cap: %p\n",initialCapability));
408 /* simply initialise it in the non-threaded case */
409 grabCapability(&cap);
413 /* set up first event to get things going */
414 /* ToDo: assign costs for system setup and init MainTSO ! */
415 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
417 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
420 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
421 G_TSO(CurrentTSO, 5));
423 if (RtsFlags.GranFlags.Light) {
424 /* Save current time; GranSim Light only */
425 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
428 event = get_next_event();
430 while (event!=(rtsEvent*)NULL) {
431 /* Choose the processor with the next event */
432 CurrentProc = event->proc;
433 CurrentTSO = event->tso;
437 while (!receivedFinish) { /* set by processMessages */
438 /* when receiving PP_FINISH message */
445 IF_DEBUG(scheduler, printAllThreads());
447 #if defined(RTS_SUPPORTS_THREADS)
448 /* Check to see whether there are any worker threads
449 waiting to deposit external call results. If so,
450 yield our capability... if we have a capability, that is. */
452 yieldToReturningWorker(&sched_mutex, &cap,
453 mainThread ? &mainThread->bound_thread_cond : NULL);
455 /* If we do not currently hold a capability, we wait for one */
458 waitForWorkCapability(&sched_mutex, &cap,
459 mainThread ? &mainThread->bound_thread_cond : NULL);
460 IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): got cap",
465 /* If we're interrupted (the user pressed ^C, or some other
466 * termination condition occurred), kill all the currently running
470 IF_DEBUG(scheduler, sched_belch("interrupted"));
471 interrupted = rtsFalse;
472 was_interrupted = rtsTrue;
473 #if defined(RTS_SUPPORTS_THREADS)
474 // In the threaded RTS, deadlock detection doesn't work,
475 // so just exit right away.
476 prog_belch("interrupted");
477 releaseCapability(cap);
478 startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit
479 RELEASE_LOCK(&sched_mutex);
480 shutdownHaskellAndExit(EXIT_SUCCESS);
486 /* Go through the list of main threads and wake up any
487 * clients whose computations have finished. ToDo: this
488 * should be done more efficiently without a linear scan
489 * of the main threads list, somehow...
491 #if defined(RTS_SUPPORTS_THREADS)
493 StgMainThread *m, **prev;
494 prev = &main_threads;
495 for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
496 if (m->tso->what_next == ThreadComplete
497 || m->tso->what_next == ThreadKilled)
501 if(m->tso->what_next == ThreadComplete)
505 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
506 *(m->ret) = (StgClosure *)m->tso->sp[1];
518 m->stat = Interrupted;
528 removeThreadLabel((StgWord)m->tso);
530 releaseCapability(cap);
531 RELEASE_LOCK(&sched_mutex);
536 // The current OS thread can not handle the fact that the Haskell
537 // thread "m" has ended.
538 // "m" is bound; the scheduler loop in its bound OS thread has
539 // to return, so let's pass our capability directly to that thread.
540 passCapability(&sched_mutex, cap, &m->bound_thread_cond);
547 if(!cap) // If we gave our capability away,
548 continue; // go to the top to get it back
550 #else /* not threaded */
553 /* in GUM do this only on the Main PE */
556 /* If our main thread has finished or been killed, return.
559 StgMainThread *m = main_threads;
560 if (m->tso->what_next == ThreadComplete
561 || m->tso->what_next == ThreadKilled) {
563 removeThreadLabel((StgWord)m->tso);
565 main_threads = main_threads->link;
566 if (m->tso->what_next == ThreadComplete) {
567 // We finished successfully, fill in the return value
568 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
569 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
573 if (m->ret) { *(m->ret) = NULL; };
574 if (was_interrupted) {
575 m->stat = Interrupted;
585 /* Top up the run queue from our spark pool. We try to make the
586 * number of threads in the run queue equal to the number of
589 * Disable spark support in SMP for now, non-essential & requires
590 * a little bit of work to make it compile cleanly. -- sof 1/02.
592 #if 0 /* defined(SMP) */
594 nat n = getFreeCapabilities();
595 StgTSO *tso = run_queue_hd;
597 /* Count the run queue */
598 while (n > 0 && tso != END_TSO_QUEUE) {
605 spark = findSpark(rtsFalse);
607 break; /* no more sparks in the pool */
609 /* I'd prefer this to be done in activateSpark -- HWL */
610 /* tricky - it needs to hold the scheduler lock and
611 * not try to re-acquire it -- SDM */
612 createSparkThread(spark);
614 sched_belch("==^^ turning spark of closure %p into a thread",
615 (StgClosure *)spark));
618 /* We need to wake up the other tasks if we just created some
621 if (getFreeCapabilities() - n > 1) {
622 signalCondition( &thread_ready_cond );
627 /* check for signals each time around the scheduler */
628 #if defined(RTS_USER_SIGNALS)
629 if (signals_pending()) {
630 RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
631 startSignalHandlers();
632 ACQUIRE_LOCK(&sched_mutex);
636 /* Check whether any waiting threads need to be woken up. If the
637 * run queue is empty, and there are no other tasks running, we
638 * can wait indefinitely for something to happen.
640 if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
641 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
646 awaitEvent( EMPTY_RUN_QUEUE()
648 && allFreeCapabilities()
652 /* we can be interrupted while waiting for I/O... */
653 if (interrupted) continue;
656 * Detect deadlock: when we have no threads to run, there are no
657 * threads waiting on I/O or sleeping, and all the other tasks are
658 * waiting for work, we must have a deadlock of some description.
660 * We first try to find threads blocked on themselves (ie. black
661 * holes), and generate NonTermination exceptions where necessary.
663 * If no threads are black holed, we have a deadlock situation, so
664 * inform all the main threads.
666 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
667 if ( EMPTY_THREAD_QUEUES()
668 #if defined(RTS_SUPPORTS_THREADS)
669 && EMPTY_QUEUE(suspended_ccalling_threads)
672 && allFreeCapabilities()
676 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
677 #if defined(THREADED_RTS)
678 /* and SMP mode ..? */
679 releaseCapability(cap);
681 // Garbage collection can release some new threads due to
682 // either (a) finalizers or (b) threads resurrected because
683 // they are about to be sent BlockedOnDeadMVar. Any threads
684 // thus released will be immediately runnable.
685 GarbageCollect(GetRoots,rtsTrue);
687 if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
690 sched_belch("still deadlocked, checking for black holes..."));
693 if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
695 #if defined(RTS_USER_SIGNALS)
696 /* If we have user-installed signal handlers, then wait
697 * for signals to arrive rather than bombing out with a
700 #if defined(RTS_SUPPORTS_THREADS)
701 if ( 0 ) { /* hmm..what to do? Simply stop waiting for
702 a signal with no runnable threads (or I/O
703 suspended ones) leads nowhere quick.
704 For now, simply shut down when we reach this
707 ToDo: define precisely under what conditions
708 the Scheduler should shut down in an MT setting.
711 if ( anyUserHandlers() ) {
714 sched_belch("still deadlocked, waiting for signals..."));
718 // we might be interrupted...
719 if (interrupted) { continue; }
721 if (signals_pending()) {
722 RELEASE_LOCK(&sched_mutex);
723 startSignalHandlers();
724 ACQUIRE_LOCK(&sched_mutex);
726 ASSERT(!EMPTY_RUN_QUEUE());
731 /* Probably a real deadlock. Send the current main thread the
732 * Deadlock exception (or in the SMP build, send *all* main
733 * threads the deadlock exception, since none of them can make
738 #if defined(RTS_SUPPORTS_THREADS)
739 for (m = main_threads; m != NULL; m = m->link) {
740 switch (m->tso->why_blocked) {
741 case BlockedOnBlackHole:
742 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
744 case BlockedOnException:
746 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
749 barf("deadlock: main thread blocked in a strange way");
754 switch (m->tso->why_blocked) {
755 case BlockedOnBlackHole:
756 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
758 case BlockedOnException:
760 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
763 barf("deadlock: main thread blocked in a strange way");
768 #if defined(RTS_SUPPORTS_THREADS)
769 /* ToDo: revisit conditions (and mechanism) for shutting
770 down a multi-threaded world */
771 IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
772 RELEASE_LOCK(&sched_mutex);
779 #elif defined(RTS_SUPPORTS_THREADS)
780 /* ToDo: add deadlock detection in threaded RTS */
782 /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
786 /* If there's a GC pending, don't do anything until it has
790 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
791 waitCondition( &gc_pending_cond, &sched_mutex );
795 #if defined(RTS_SUPPORTS_THREADS)
797 /* block until we've got a thread on the run queue and a free
801 if ( EMPTY_RUN_QUEUE() ) {
802 /* Give up our capability */
803 releaseCapability(cap);
805 /* If we're in the process of shutting down (and running
806 * a batch of finalisers), don't wait around.
808 if ( shutting_down_scheduler ) {
809 RELEASE_LOCK(&sched_mutex);
812 IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
813 waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
814 IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
817 if ( EMPTY_RUN_QUEUE() ) {
818 continue; // nothing to do
824 if (RtsFlags.GranFlags.Light)
825 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
827 /* adjust time based on time-stamp */
828 if (event->time > CurrentTime[CurrentProc] &&
829 event->evttype != ContinueThread)
830 CurrentTime[CurrentProc] = event->time;
832 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
833 if (!RtsFlags.GranFlags.Light)
836 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
838 /* main event dispatcher in GranSim */
839 switch (event->evttype) {
840 /* Should just be continuing execution */
842 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
843 /* ToDo: check assertion
844 ASSERT(run_queue_hd != (StgTSO*)NULL &&
845 run_queue_hd != END_TSO_QUEUE);
847 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
848 if (!RtsFlags.GranFlags.DoAsyncFetch &&
849 procStatus[CurrentProc]==Fetching) {
850 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
851 CurrentTSO->id, CurrentTSO, CurrentProc);
854 /* Ignore ContinueThreads for completed threads */
855 if (CurrentTSO->what_next == ThreadComplete) {
856 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
857 CurrentTSO->id, CurrentTSO, CurrentProc);
860 /* Ignore ContinueThreads for threads that are being migrated */
861 if (PROCS(CurrentTSO)==Nowhere) {
862 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
863 CurrentTSO->id, CurrentTSO, CurrentProc);
866 /* The thread should be at the beginning of the run queue */
867 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
868 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
869 CurrentTSO->id, CurrentTSO, CurrentProc);
870 break; // run the thread anyway
873 new_event(proc, proc, CurrentTime[proc],
875 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
877 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
878 break; // now actually run the thread; DaH Qu'vam yImuHbej
881 do_the_fetchnode(event);
882 goto next_thread; /* handle next event in event queue */
885 do_the_globalblock(event);
886 goto next_thread; /* handle next event in event queue */
889 do_the_fetchreply(event);
890 goto next_thread; /* handle next event in event queue */
892 case UnblockThread: /* Move from the blocked queue to the tail of */
893 do_the_unblock(event);
894 goto next_thread; /* handle next event in event queue */
896 case ResumeThread: /* Move from the blocked queue to the tail of */
897 /* the runnable queue ( i.e. Qu' SImqa'lu') */
898 event->tso->gran.blocktime +=
899 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
900 do_the_startthread(event);
901 goto next_thread; /* handle next event in event queue */
904 do_the_startthread(event);
905 goto next_thread; /* handle next event in event queue */
908 do_the_movethread(event);
909 goto next_thread; /* handle next event in event queue */
912 do_the_movespark(event);
913 goto next_thread; /* handle next event in event queue */
916 do_the_findwork(event);
917 goto next_thread; /* handle next event in event queue */
920 barf("Illegal event type %u\n", event->evttype);
923 /* This point was scheduler_loop in the old RTS */
925 IF_DEBUG(gran, belch("GRAN: after main switch"));
927 TimeOfLastEvent = CurrentTime[CurrentProc];
928 TimeOfNextEvent = get_time_of_next_event();
929 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
930 // CurrentTSO = ThreadQueueHd;
932 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
935 if (RtsFlags.GranFlags.Light)
936 GranSimLight_leave_system(event, &ActiveTSO);
938 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
941 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
943 /* in a GranSim setup the TSO stays on the run queue */
945 /* Take a thread from the run queue. */
946 t = POP_RUN_QUEUE(); // take_off_run_queue(t);
949 fprintf(stderr, "GRAN: About to run current thread, which is\n");
952 context_switch = 0; // turned on via GranYield, checking events and time slice
955 DumpGranEvent(GR_SCHEDULE, t));
957 procStatus[CurrentProc] = Busy;
960 if (PendingFetches != END_BF_QUEUE) {
964 /* ToDo: perhaps merge with spark activation above */
965 /* check whether we have local work and send requests if we have none */
966 if (EMPTY_RUN_QUEUE()) { /* no runnable threads */
967 /* :-[ no local threads => look out for local sparks */
968 /* the spark pool for the current PE */
969 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
970 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
971 pool->hd < pool->tl) {
973 * ToDo: add GC code check that we really have enough heap afterwards!!
975 * If we're here (no runnable threads) and we have pending
976 * sparks, we must have a space problem. Get enough space
977 * to turn one of those pending sparks into a
981 spark = findSpark(rtsFalse); /* get a spark */
982 if (spark != (rtsSpark) NULL) {
983 tso = activateSpark(spark); /* turn the spark into a thread */
984 IF_PAR_DEBUG(schedule,
985 belch("==== schedule: Created TSO %d (%p); %d threads active",
986 tso->id, tso, advisory_thread_count));
988 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
989 belch("==^^ failed to activate spark");
991 } /* otherwise fall through & pick-up new tso */
993 IF_PAR_DEBUG(verbose,
994 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
995 spark_queue_len(pool)));
1000 /* If we still have no work we need to send a FISH to get a spark
1003 if (EMPTY_RUN_QUEUE()) {
1004 /* =8-[ no local sparks => look for work on other PEs */
1006 * We really have absolutely no work. Send out a fish
1007 * (there may be some out there already), and wait for
1008 * something to arrive. We clearly can't run any threads
1009 * until a SCHEDULE or RESUME arrives, and so that's what
1010 * we're hoping to see. (Of course, we still have to
1011 * respond to other types of messages.)
1013 TIME now = msTime() /*CURRENT_TIME*/;
1014 IF_PAR_DEBUG(verbose,
1015 belch("-- now=%ld", now));
1016 IF_PAR_DEBUG(verbose,
1017 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1018 (last_fish_arrived_at!=0 &&
1019 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
1020 belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
1021 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
1022 last_fish_arrived_at,
1023 RtsFlags.ParFlags.fishDelay, now);
1026 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1027 (last_fish_arrived_at==0 ||
1028 (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
1029 /* outstandingFishes is set in sendFish, processFish;
1030 avoid flooding system with fishes via delay */
1032 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
1035 // Global statistics: count no. of fishes
1036 if (RtsFlags.ParFlags.ParStats.Global &&
1037 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1038 globalParStats.tot_fish_mess++;
1042 receivedFinish = processMessages();
1045 } else if (PacketsWaiting()) { /* Look for incoming messages */
1046 receivedFinish = processMessages();
1049 /* Now we are sure that we have some work available */
1050 ASSERT(run_queue_hd != END_TSO_QUEUE);
1052 /* Take a thread from the run queue, if we have work */
1053 t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
1054 IF_DEBUG(sanity,checkTSO(t));
1056 /* ToDo: write something to the log-file
1057 if (RTSflags.ParFlags.granSimStats && !sameThread)
1058 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1062 /* the spark pool for the current PE */
1063 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
1066 belch("--=^ %d threads, %d sparks on [%#x]",
1067 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1070 if (0 && RtsFlags.ParFlags.ParStats.Full &&
1071 t && LastTSO && t->id != LastTSO->id &&
1072 LastTSO->why_blocked == NotBlocked &&
1073 LastTSO->what_next != ThreadComplete) {
1074 // if previously scheduled TSO not blocked we have to record the context switch
1075 DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
1076 GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
1079 if (RtsFlags.ParFlags.ParStats.Full &&
1080 (emitSchedule /* forced emit */ ||
1081 (t && LastTSO && t->id != LastTSO->id))) {
1083 we are running a different TSO, so write a schedule event to log file
1084 NB: If we use fair scheduling we also have to write a deschedule
1085 event for LastTSO; with unfair scheduling we know that the
1086 previous tso has blocked whenever we switch to another tso, so
1087 we don't need it in GUM for now
1089 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1090 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1091 emitSchedule = rtsFalse;
1095 #else /* !GRAN && !PAR */
1097 /* grab a thread from the run queue */
1098 ASSERT(run_queue_hd != END_TSO_QUEUE);
1099 t = POP_RUN_QUEUE();
1100 // Sanity check the thread we're about to run. This can be
1101 // expensive if there is lots of thread switching going on...
1102 IF_DEBUG(sanity,checkTSO(t));
1108 for(m = main_threads; m; m = m->link)
1119 fprintf(stderr,"### Running TSO %p in bound OS thread %u\n",
1121 // yes, the Haskell thread is bound to the current native thread
1126 fprintf(stderr,"### TSO %p bound to other OS thread than %u\n",
1128 // no, bound to a different Haskell thread: pass to that thread
1129 PUSH_ON_RUN_QUEUE(t);
1130 passCapability(&sched_mutex,cap,&m->bound_thread_cond);
1137 // The thread we want to run is not bound.
1138 if(mainThread == NULL)
1141 fprintf(stderr,"### Running TSO %p in worker OS thread %u\n",
1143 // if we are a worker thread,
1144 // we may run it here
1149 fprintf(stderr,"### TSO %p is not appropriate for main thread %p in OS thread %u\n",
1150 t, mainThread, osThreadId()));
1151 // no, the current native thread is bound to a different
1152 // Haskell thread, so pass it to any worker thread
1153 PUSH_ON_RUN_QUEUE(t);
1154 releaseCapability(cap);
1162 cap->r.rCurrentTSO = t;
1164 /* context switches are now initiated by the timer signal, unless
1165 * the user specified "context switch as often as possible", with
1168 if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1169 && (run_queue_hd != END_TSO_QUEUE
1170 || blocked_queue_hd != END_TSO_QUEUE
1171 || sleeping_queue != END_TSO_QUEUE)))
1178 RELEASE_LOCK(&sched_mutex);
1180 IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
1181 t->id, whatNext_strs[t->what_next]));
1184 startHeapProfTimer();
1187 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1188 /* Run the current thread
1190 prev_what_next = t->what_next;
1191 switch (prev_what_next) {
1193 case ThreadComplete:
1194 /* Thread already finished, return to scheduler. */
1195 ret = ThreadFinished;
1198 errno = t->saved_errno;
1199 ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1200 t->saved_errno = errno;
1202 case ThreadInterpret:
1203 ret = interpretBCO(cap);
1206 barf("schedule: invalid what_next field");
1208 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1210 /* Costs for the scheduler are assigned to CCS_SYSTEM */
1212 stopHeapProfTimer();
1216 ACQUIRE_LOCK(&sched_mutex);
1218 #ifdef RTS_SUPPORTS_THREADS
1219 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %p): ", osThreadId()););
1220 #elif !defined(GRAN) && !defined(PAR)
1221 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1223 t = cap->r.rCurrentTSO;
1226 /* HACK 675: if the last thread didn't yield, make sure to print a
1227 SCHEDULE event to the log file when StgRunning the next thread, even
1228 if it is the same one as before */
1230 TimeOfLastYield = CURRENT_TIME;
1236 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1237 globalGranStats.tot_heapover++;
1239 globalParStats.tot_heapover++;
1242 // did the task ask for a large block?
1243 if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1244 // if so, get one and push it on the front of the nursery.
1248 blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1250 IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)",
1251 t->id, whatNext_strs[t->what_next], blocks));
1253 // don't do this if it would push us over the
1254 // alloc_blocks_lim limit; we'll GC first.
1255 if (alloc_blocks + blocks < alloc_blocks_lim) {
1257 alloc_blocks += blocks;
1258 bd = allocGroup( blocks );
1260 // link the new group into the list
1261 bd->link = cap->r.rCurrentNursery;
1262 bd->u.back = cap->r.rCurrentNursery->u.back;
1263 if (cap->r.rCurrentNursery->u.back != NULL) {
1264 cap->r.rCurrentNursery->u.back->link = bd;
1266 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1267 g0s0->blocks == cap->r.rNursery);
1268 cap->r.rNursery = g0s0->blocks = bd;
1270 cap->r.rCurrentNursery->u.back = bd;
1272 // initialise it as a nursery block. We initialise the
1273 // step, gen_no, and flags field of *every* sub-block in
1274 // this large block, because this is easier than making
1275 // sure that we always find the block head of a large
1276 // block whenever we call Bdescr() (eg. evacuate() and
1277 // isAlive() in the GC would both have to do this, at
1281 for (x = bd; x < bd + blocks; x++) {
1288 // don't forget to update the block count in g0s0.
1289 g0s0->n_blocks += blocks;
1290 // This assert can be a killer if the app is doing lots
1291 // of large block allocations.
1292 ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1294 // now update the nursery to point to the new block
1295 cap->r.rCurrentNursery = bd;
1297 // we might be unlucky and have another thread get on the
1298 // run queue before us and steal the large block, but in that
1299 // case the thread will just end up requesting another large
1301 PUSH_ON_RUN_QUEUE(t);
1306 /* make all the running tasks block on a condition variable,
1307 * maybe set context_switch and wait till they all pile in,
1308 * then have them wait on a GC condition variable.
1310 IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow",
1311 t->id, whatNext_strs[t->what_next]));
1314 ASSERT(!is_on_queue(t,CurrentProc));
1316 /* Currently we emit a DESCHEDULE event before GC in GUM.
1317 ToDo: either add separate event to distinguish SYSTEM time from rest
1318 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1319 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1320 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1321 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1322 emitSchedule = rtsTrue;
1326 ready_to_gc = rtsTrue;
1327 context_switch = 1; /* stop other threads ASAP */
1328 PUSH_ON_RUN_QUEUE(t);
1329 /* actual GC is done at the end of the while loop */
1335 DumpGranEvent(GR_DESCHEDULE, t));
1336 globalGranStats.tot_stackover++;
1339 // DumpGranEvent(GR_DESCHEDULE, t);
1340 globalParStats.tot_stackover++;
1342 IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow",
1343 t->id, whatNext_strs[t->what_next]));
1344 /* just adjust the stack for this thread, then pop it back
1350 /* enlarge the stack */
1351 StgTSO *new_t = threadStackOverflow(t);
1353 /* This TSO has moved, so update any pointers to it from the
1354 * main thread stack. It better not be on any other queues...
1355 * (it shouldn't be).
1357 for (m = main_threads; m != NULL; m = m->link) {
1362 threadPaused(new_t);
1363 PUSH_ON_RUN_QUEUE(new_t);
1367 case ThreadYielding:
1370 DumpGranEvent(GR_DESCHEDULE, t));
1371 globalGranStats.tot_yields++;
1374 // DumpGranEvent(GR_DESCHEDULE, t);
1375 globalParStats.tot_yields++;
1377 /* put the thread back on the run queue. Then, if we're ready to
1378 * GC, check whether this is the last task to stop. If so, wake
1379 * up the GC thread. getThread will block during a GC until the
1383 if (t->what_next != prev_what_next) {
1384 belch("--<< thread %ld (%s) stopped to switch evaluators",
1385 t->id, whatNext_strs[t->what_next]);
1387 belch("--<< thread %ld (%s) stopped, yielding",
1388 t->id, whatNext_strs[t->what_next]);
1393 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1395 ASSERT(t->link == END_TSO_QUEUE);
1397 // Shortcut if we're just switching evaluators: don't bother
1398 // doing stack squeezing (which can be expensive), just run the
1400 if (t->what_next != prev_what_next) {
1407 ASSERT(!is_on_queue(t,CurrentProc));
1410 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1411 checkThreadQsSanity(rtsTrue));
1415 if (RtsFlags.ParFlags.doFairScheduling) {
1416 /* this does round-robin scheduling; good for concurrency */
1417 APPEND_TO_RUN_QUEUE(t);
1419 /* this does unfair scheduling; good for parallelism */
1420 PUSH_ON_RUN_QUEUE(t);
1423 // this does round-robin scheduling; good for concurrency
1424 APPEND_TO_RUN_QUEUE(t);
1428 /* add a ContinueThread event to actually process the thread */
1429 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1431 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1433 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1442 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1443 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1444 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1446 // ??? needed; should emit block before
1448 DumpGranEvent(GR_DESCHEDULE, t));
1449 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1452 ASSERT(procStatus[CurrentProc]==Busy ||
1453 ((procStatus[CurrentProc]==Fetching) &&
1454 (t->block_info.closure!=(StgClosure*)NULL)));
1455 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1456 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1457 procStatus[CurrentProc]==Fetching))
1458 procStatus[CurrentProc] = Idle;
1462 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1463 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1466 if (t->block_info.closure!=(StgClosure*)NULL)
1467 print_bq(t->block_info.closure));
1469 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1472 /* whatever we schedule next, we must log that schedule */
1473 emitSchedule = rtsTrue;
1476 /* don't need to do anything. Either the thread is blocked on
1477 * I/O, in which case we'll have called addToBlockedQueue
1478 * previously, or it's blocked on an MVar or Blackhole, in which
1479 * case it'll be on the relevant queue already.
1482 fprintf(stderr, "--<< thread %d (%s) stopped: ",
1483 t->id, whatNext_strs[t->what_next]);
1484 printThreadBlockage(t);
1485 fprintf(stderr, "\n"));
1487 /* Only for dumping event to log file
1488 ToDo: do I need this in GranSim, too?
1495 case ThreadFinished:
1496 /* Need to check whether this was a main thread, and if so, signal
1497 * the task that started it with the return value. If we have no
1498 * more main threads, we probably need to stop all the tasks until
1501 /* We also end up here if the thread kills itself with an
1502 * uncaught exception, see Exception.hc.
1504 IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished",
1505 t->id, whatNext_strs[t->what_next]));
1507 endThread(t, CurrentProc); // clean-up the thread
1509 /* For now all are advisory -- HWL */
1510 //if(t->priority==AdvisoryPriority) ??
1511 advisory_thread_count--;
1514 if(t->dist.priority==RevalPriority)
1518 if (RtsFlags.ParFlags.ParStats.Full &&
1519 !RtsFlags.ParFlags.ParStats.Suppressed)
1520 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1525 barf("schedule: invalid thread return code %d", (int)ret);
1529 // When we have +RTS -i0 and we're heap profiling, do a census at
1530 // every GC. This lets us get repeatable runs for debugging.
1531 if (performHeapProfile ||
1532 (RtsFlags.ProfFlags.profileInterval==0 &&
1533 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1534 GarbageCollect(GetRoots, rtsTrue);
1536 performHeapProfile = rtsFalse;
1537 ready_to_gc = rtsFalse; // we already GC'd
1543 && allFreeCapabilities()
1546 /* everybody back, start the GC.
1547 * Could do it in this thread, or signal a condition var
1548 * to do it in another thread. Either way, we need to
1549 * broadcast on gc_pending_cond afterward.
1551 #if defined(RTS_SUPPORTS_THREADS)
1552 IF_DEBUG(scheduler,sched_belch("doing GC"));
1554 GarbageCollect(GetRoots,rtsFalse);
1555 ready_to_gc = rtsFalse;
1557 broadcastCondition(&gc_pending_cond);
1560 /* add a ContinueThread event to continue execution of current thread */
1561 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1563 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1565 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1573 IF_GRAN_DEBUG(unused,
1574 print_eventq(EventHd));
1576 event = get_next_event();
1579 /* ToDo: wait for next message to arrive rather than busy wait */
1582 } /* end of while(1) */
1584 IF_PAR_DEBUG(verbose,
1585 belch("== Leaving schedule() after having received Finish"));
1588 /* ---------------------------------------------------------------------------
1589 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1590 * used by Control.Concurrent for error checking.
1591 * ------------------------------------------------------------------------- */
1594 rtsSupportsBoundThreads(void)
1603 /* ---------------------------------------------------------------------------
1604 * isThreadBound(tso): check whether tso is bound to an OS thread.
1605 * ------------------------------------------------------------------------- */
1608 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1612 for(m = main_threads; m; m = m->link)
1621 /* ---------------------------------------------------------------------------
1622 * Singleton fork(). Do not copy any running threads.
1623 * ------------------------------------------------------------------------- */
1627 deleteThreadImmediately(StgTSO *tso);
1631 forkProcess(StgTSO* tso)
1633 #ifndef mingw32_TARGET_OS
1637 IF_DEBUG(scheduler,sched_belch("forking!"));
1638 ACQUIRE_LOCK(&sched_mutex);
1641 if (pid) { /* parent */
1643 /* just return the pid */
1645 } else { /* child */
1647 /* wipe all other threads */
1648 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1649 tso->link = END_TSO_QUEUE;
1651 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1654 /* Don't kill the current thread.. */
1655 if (t->id == tso->id) {
1659 if (isThreadBound(t)) {
1660 // If the thread is bound, the OS thread that the thread is bound to
1661 // no longer exists after the fork() system call.
1662 // The bound Haskell thread is therefore unable to run at all;
1663 // we must not give it a chance to survive by catching the
1664 // ThreadKilled exception. So we kill it "brutally" rather than
1665 // using deleteThread.
1666 deleteThreadImmediately(t);
1672 if (isThreadBound(tso)) {
1674 // If the current is not bound, then we should make it so.
1675 // The OS thread left over by fork() is special in that the process
1676 // will terminate as soon as the thread terminates;
1677 // we'd expect forkProcess to behave similarly.
1678 // FIXME - we don't do this.
1683 /* wipe all other threads */
1684 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1685 tso->link = END_TSO_QUEUE;
1687 /* When clearing out the threads, we need to ensure
1688 that a 'main thread' is left behind; if there isn't,
1689 the Scheduler will shutdown next time it is entered.
1691 ==> we don't kill a thread that's on the main_threads
1692 list (nor the current thread.)
1694 [ Attempts at implementing the more ambitious scheme of
1695 killing the main_threads also, and then adding the
1696 current thread onto the main_threads list if it wasn't
1697 there already, failed -- waitThread() (for one) wasn't
1698 up to it. If it proves to be desirable to also kill
1699 the main threads, then this scheme will have to be
1700 revisited (and fully debugged!)
1705 /* DO NOT TOUCH THE QUEUES directly because most of the code around
1706 us is picky about finding the thread still in its queue when
1707 handling the deleteThread() */
1709 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1712 /* Don't kill the current thread.. */
1713 if (t->id == tso->id) continue;
1715 /* ..or a main thread */
1716 for (m = main_threads; m != NULL; m = m->link) {
1717 if (m->tso->id == t->id) {
1728 RELEASE_LOCK(&sched_mutex);
1731 barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
1732 /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */
1734 #endif /* mingw32 */
1737 /* ---------------------------------------------------------------------------
1738 * deleteAllThreads(): kill all the live threads.
1740 * This is used when we catch a user interrupt (^C), before performing
1741 * any necessary cleanups and running finalizers.
1743 * Locks: sched_mutex held.
1744 * ------------------------------------------------------------------------- */
1747 deleteAllThreads ( void )
1750 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1751 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1752 next = t->global_link;
1755 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1756 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1757 sleeping_queue = END_TSO_QUEUE;
1760 /* startThread and insertThread are now in GranSim.c -- HWL */
1763 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1764 //@subsection Suspend and Resume
1766 /* ---------------------------------------------------------------------------
1767 * Suspending & resuming Haskell threads.
1769 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1770 * its capability before calling the C function. This allows another
1771 * task to pick up the capability and carry on running Haskell
1772 * threads. It also means that if the C call blocks, it won't lock
1775 * The Haskell thread making the C call is put to sleep for the
1776 * duration of the call, on the suspended_ccalling_threads queue. We
1777 * give out a token to the task, which it can use to resume the thread
1778 * on return from the C function.
1779 * ------------------------------------------------------------------------- */
1782 suspendThread( StgRegTable *reg,
1784 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1791 int saved_errno = errno;
1793 /* assume that *reg is a pointer to the StgRegTable part
1796 cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1798 ACQUIRE_LOCK(&sched_mutex);
1801 sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1803 // XXX this might not be necessary --SDM
1804 cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1806 threadPaused(cap->r.rCurrentTSO);
1807 cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1808 suspended_ccalling_threads = cap->r.rCurrentTSO;
1810 #if defined(RTS_SUPPORTS_THREADS)
1811 if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
1813 cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1814 cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1818 cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1822 /* Use the thread ID as the token; it should be unique */
1823 tok = cap->r.rCurrentTSO->id;
1825 /* Hand back capability */
1826 releaseCapability(cap);
1828 #if defined(RTS_SUPPORTS_THREADS)
1829 /* Preparing to leave the RTS, so ensure there's a native thread/task
1830 waiting to take over.
1832 IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId()));
1833 //if (concCall) { // implementing "safe" as opposed to "threadsafe" is more difficult
1834 startTask(taskStart);
1838 /* Other threads _might_ be available for execution; signal this */
1840 RELEASE_LOCK(&sched_mutex);
1842 errno = saved_errno;
1847 resumeThread( StgInt tok,
1848 rtsBool concCall STG_UNUSED )
1850 StgTSO *tso, **prev;
1852 int saved_errno = errno;
1854 #if defined(RTS_SUPPORTS_THREADS)
1855 /* Wait for permission to re-enter the RTS with the result. */
1856 ACQUIRE_LOCK(&sched_mutex);
1857 grabReturnCapability(&sched_mutex, &cap);
1859 IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): re-entering RTS", tok, osThreadId()));
1861 grabCapability(&cap);
1864 /* Remove the thread off of the suspended list */
1865 prev = &suspended_ccalling_threads;
1866 for (tso = suspended_ccalling_threads;
1867 tso != END_TSO_QUEUE;
1868 prev = &tso->link, tso = tso->link) {
1869 if (tso->id == (StgThreadID)tok) {
1874 if (tso == END_TSO_QUEUE) {
1875 barf("resumeThread: thread not found");
1877 tso->link = END_TSO_QUEUE;
1879 #if defined(RTS_SUPPORTS_THREADS)
1880 if(tso->why_blocked == BlockedOnCCall)
1882 awakenBlockedQueueNoLock(tso->blocked_exceptions);
1883 tso->blocked_exceptions = NULL;
1887 /* Reset blocking status */
1888 tso->why_blocked = NotBlocked;
1890 cap->r.rCurrentTSO = tso;
1891 #if defined(RTS_SUPPORTS_THREADS)
1892 RELEASE_LOCK(&sched_mutex);
1894 errno = saved_errno;
1899 /* ---------------------------------------------------------------------------
1901 * ------------------------------------------------------------------------ */
1902 static void unblockThread(StgTSO *tso);
1904 /* ---------------------------------------------------------------------------
1905 * Comparing Thread ids.
1907 * This is used from STG land in the implementation of the
1908 * instances of Eq/Ord for ThreadIds.
1909 * ------------------------------------------------------------------------ */
1912 cmp_thread(StgPtr tso1, StgPtr tso2)
1914 StgThreadID id1 = ((StgTSO *)tso1)->id;
1915 StgThreadID id2 = ((StgTSO *)tso2)->id;
1917 if (id1 < id2) return (-1);
1918 if (id1 > id2) return 1;
1922 /* ---------------------------------------------------------------------------
1923 * Fetching the ThreadID from an StgTSO.
1925 * This is used in the implementation of Show for ThreadIds.
1926 * ------------------------------------------------------------------------ */
1928 rts_getThreadId(StgPtr tso)
1930 return ((StgTSO *)tso)->id;
1935 labelThread(StgPtr tso, char *label)
1940 /* Caveat: Once set, you can only set the thread name to "" */
1941 len = strlen(label)+1;
1942 buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1943 strncpy(buf,label,len);
1944 /* Update will free the old memory for us */
1945 updateThreadLabel((StgWord)tso,buf);
1949 /* ---------------------------------------------------------------------------
1950 Create a new thread.
1952 The new thread starts with the given stack size. Before the
1953 scheduler can run, however, this thread needs to have a closure
1954 (and possibly some arguments) pushed on its stack. See
1955 pushClosure() in Schedule.h.
1957 createGenThread() and createIOThread() (in SchedAPI.h) are
1958 convenient packaged versions of this function.
1960 currently pri (priority) is only used in a GRAN setup -- HWL
1961 ------------------------------------------------------------------------ */
1962 //@cindex createThread
1964 /* currently pri (priority) is only used in a GRAN setup -- HWL */
1966 createThread(nat size, StgInt pri)
1969 createThread(nat size)
1976 /* First check whether we should create a thread at all */
1978 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1979 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1981 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1982 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1983 return END_TSO_QUEUE;
1989 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1992 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1994 /* catch ridiculously small stack sizes */
1995 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1996 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1999 stack_size = size - TSO_STRUCT_SIZEW;
2001 tso = (StgTSO *)allocate(size);
2002 TICK_ALLOC_TSO(stack_size, 0);
2004 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2006 SET_GRAN_HDR(tso, ThisPE);
2009 // Always start with the compiled code evaluator
2010 tso->what_next = ThreadRunGHC;
2012 /* tso->id needs to be unique. For now we use a heavyweight mutex to
2013 * protect the increment operation on next_thread_id.
2014 * In future, we could use an atomic increment instead.
2016 ACQUIRE_LOCK(&thread_id_mutex);
2017 tso->id = next_thread_id++;
2018 RELEASE_LOCK(&thread_id_mutex);
2020 tso->why_blocked = NotBlocked;
2021 tso->blocked_exceptions = NULL;
2023 tso->saved_errno = 0;
2025 tso->stack_size = stack_size;
2026 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
2028 tso->sp = (P_)&(tso->stack) + stack_size;
2031 tso->prof.CCCS = CCS_MAIN;
2034 /* put a stop frame on the stack */
2035 tso->sp -= sizeofW(StgStopFrame);
2036 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2039 tso->link = END_TSO_QUEUE;
2040 /* uses more flexible routine in GranSim */
2041 insertThread(tso, CurrentProc);
2043 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2049 if (RtsFlags.GranFlags.GranSimStats.Full)
2050 DumpGranEvent(GR_START,tso);
2052 if (RtsFlags.ParFlags.ParStats.Full)
2053 DumpGranEvent(GR_STARTQ,tso);
2054 /* HACk to avoid SCHEDULE
2058 /* Link the new thread on the global thread list.
2060 tso->global_link = all_threads;
2064 tso->dist.priority = MandatoryPriority; //by default that is...
2068 tso->gran.pri = pri;
2070 tso->gran.magic = TSO_MAGIC; // debugging only
2072 tso->gran.sparkname = 0;
2073 tso->gran.startedat = CURRENT_TIME;
2074 tso->gran.exported = 0;
2075 tso->gran.basicblocks = 0;
2076 tso->gran.allocs = 0;
2077 tso->gran.exectime = 0;
2078 tso->gran.fetchtime = 0;
2079 tso->gran.fetchcount = 0;
2080 tso->gran.blocktime = 0;
2081 tso->gran.blockcount = 0;
2082 tso->gran.blockedat = 0;
2083 tso->gran.globalsparks = 0;
2084 tso->gran.localsparks = 0;
2085 if (RtsFlags.GranFlags.Light)
2086 tso->gran.clock = Now; /* local clock */
2088 tso->gran.clock = 0;
2090 IF_DEBUG(gran,printTSO(tso));
2093 tso->par.magic = TSO_MAGIC; // debugging only
2095 tso->par.sparkname = 0;
2096 tso->par.startedat = CURRENT_TIME;
2097 tso->par.exported = 0;
2098 tso->par.basicblocks = 0;
2099 tso->par.allocs = 0;
2100 tso->par.exectime = 0;
2101 tso->par.fetchtime = 0;
2102 tso->par.fetchcount = 0;
2103 tso->par.blocktime = 0;
2104 tso->par.blockcount = 0;
2105 tso->par.blockedat = 0;
2106 tso->par.globalsparks = 0;
2107 tso->par.localsparks = 0;
2111 globalGranStats.tot_threads_created++;
2112 globalGranStats.threads_created_on_PE[CurrentProc]++;
2113 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2114 globalGranStats.tot_sq_probes++;
2116 // collect parallel global statistics (currently done together with GC stats)
2117 if (RtsFlags.ParFlags.ParStats.Global &&
2118 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2119 //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime());
2120 globalParStats.tot_threads_created++;
2126 belch("==__ schedule: Created TSO %d (%p);",
2127 CurrentProc, tso, tso->id));
2129 IF_PAR_DEBUG(verbose,
2130 belch("==__ schedule: Created TSO %d (%p); %d threads active",
2131 tso->id, tso, advisory_thread_count));
2133 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
2134 tso->id, tso->stack_size));
2141 all parallel thread creation calls should fall through the following routine.
2144 createSparkThread(rtsSpark spark)
2146 ASSERT(spark != (rtsSpark)NULL);
2147 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
2149 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2150 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2151 return END_TSO_QUEUE;
2155 tso = createThread(RtsFlags.GcFlags.initialStkSize);
2156 if (tso==END_TSO_QUEUE)
2157 barf("createSparkThread: Cannot create TSO");
2159 tso->priority = AdvisoryPriority;
2161 pushClosure(tso,spark);
2162 PUSH_ON_RUN_QUEUE(tso);
2163 advisory_thread_count++;
2170 Turn a spark into a thread.
2171 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2174 //@cindex activateSpark
2176 activateSpark (rtsSpark spark)
2180 tso = createSparkThread(spark);
2181 if (RtsFlags.ParFlags.ParStats.Full) {
2182 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2183 IF_PAR_DEBUG(verbose,
2184 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
2185 (StgClosure *)spark, info_type((StgClosure *)spark)));
2187 // ToDo: fwd info on local/global spark to thread -- HWL
2188 // tso->gran.exported = spark->exported;
2189 // tso->gran.locked = !spark->global;
2190 // tso->gran.sparkname = spark->name;
2196 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
2197 Capability *initialCapability
2201 /* ---------------------------------------------------------------------------
2204 * scheduleThread puts a thread on the head of the runnable queue.
2205 * This will usually be done immediately after a thread is created.
2206 * The caller of scheduleThread must create the thread using e.g.
2207 * createThread and push an appropriate closure
2208 * on this thread's stack before the scheduler is invoked.
2209 * ------------------------------------------------------------------------ */
2211 static void scheduleThread_ (StgTSO* tso);
2214 scheduleThread_(StgTSO *tso)
2216 // Precondition: sched_mutex must be held.
2218 /* Put the new thread on the head of the runnable queue. The caller
2219 * better push an appropriate closure on this thread's stack
2220 * beforehand. In the SMP case, the thread may start running as
2221 * soon as we release the scheduler lock below.
2223 PUSH_ON_RUN_QUEUE(tso);
2227 IF_DEBUG(scheduler,printTSO(tso));
2231 void scheduleThread(StgTSO* tso)
2233 ACQUIRE_LOCK(&sched_mutex);
2234 scheduleThread_(tso);
2235 RELEASE_LOCK(&sched_mutex);
2239 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *initialCapability)
2240 { // Precondition: sched_mutex must be held
2243 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2247 #if defined(RTS_SUPPORTS_THREADS)
2248 initCondition(&m->wakeup);
2249 #if defined(THREADED_RTS)
2250 initCondition(&m->bound_thread_cond);
2254 /* Put the thread on the main-threads list prior to scheduling the TSO.
2255 Failure to do so introduces a race condition in the MT case (as
2256 identified by Wolfgang Thaller), whereby the new task/OS thread
2257 created by scheduleThread_() would complete prior to the thread
2258 that spawned it managed to put 'itself' on the main-threads list.
2259 The upshot of it all being that the worker thread wouldn't get to
2260 signal the completion of its work item for the main thread to
2261 see (==> it got stuck waiting.) -- sof 6/02.
2263 IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)\n", tso->id));
2265 m->link = main_threads;
2268 scheduleThread_(tso);
2270 return waitThread_(m, initialCapability);
2273 /* ---------------------------------------------------------------------------
2276 * Initialise the scheduler. This resets all the queues - if the
2277 * queues contained any threads, they'll be garbage collected at the
2280 * ------------------------------------------------------------------------ */
2284 term_handler(int sig STG_UNUSED)
2287 ACQUIRE_LOCK(&term_mutex);
2289 RELEASE_LOCK(&term_mutex);
2300 for (i=0; i<=MAX_PROC; i++) {
2301 run_queue_hds[i] = END_TSO_QUEUE;
2302 run_queue_tls[i] = END_TSO_QUEUE;
2303 blocked_queue_hds[i] = END_TSO_QUEUE;
2304 blocked_queue_tls[i] = END_TSO_QUEUE;
2305 ccalling_threadss[i] = END_TSO_QUEUE;
2306 sleeping_queue = END_TSO_QUEUE;
2309 run_queue_hd = END_TSO_QUEUE;
2310 run_queue_tl = END_TSO_QUEUE;
2311 blocked_queue_hd = END_TSO_QUEUE;
2312 blocked_queue_tl = END_TSO_QUEUE;
2313 sleeping_queue = END_TSO_QUEUE;
2316 suspended_ccalling_threads = END_TSO_QUEUE;
2318 main_threads = NULL;
2319 all_threads = END_TSO_QUEUE;
2324 RtsFlags.ConcFlags.ctxtSwitchTicks =
2325 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2327 #if defined(RTS_SUPPORTS_THREADS)
2328 /* Initialise the mutex and condition variables used by
2330 initMutex(&sched_mutex);
2331 initMutex(&term_mutex);
2332 initMutex(&thread_id_mutex);
2334 initCondition(&thread_ready_cond);
2338 initCondition(&gc_pending_cond);
2341 #if defined(RTS_SUPPORTS_THREADS)
2342 ACQUIRE_LOCK(&sched_mutex);
2345 /* Install the SIGHUP handler */
2348 struct sigaction action,oact;
2350 action.sa_handler = term_handler;
2351 sigemptyset(&action.sa_mask);
2352 action.sa_flags = 0;
2353 if (sigaction(SIGTERM, &action, &oact) != 0) {
2354 barf("can't install TERM handler");
2359 /* A capability holds the state a native thread needs in
2360 * order to execute STG code. At least one capability is
2361 * floating around (only SMP builds have more than one).
2365 #if defined(RTS_SUPPORTS_THREADS)
2366 /* start our haskell execution tasks */
2368 startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2370 startTaskManager(0,taskStart);
2374 #if /* defined(SMP) ||*/ defined(PAR)
2378 #if defined(RTS_SUPPORTS_THREADS)
2379 RELEASE_LOCK(&sched_mutex);
2385 exitScheduler( void )
2387 #if defined(RTS_SUPPORTS_THREADS)
2390 shutting_down_scheduler = rtsTrue;
2393 /* -----------------------------------------------------------------------------
2394 Managing the per-task allocation areas.
2396 Each capability comes with an allocation area. These are
2397 fixed-length block lists into which allocation can be done.
2399 ToDo: no support for two-space collection at the moment???
2400 -------------------------------------------------------------------------- */
2402 /* -----------------------------------------------------------------------------
2403 * waitThread is the external interface for running a new computation
2404 * and waiting for the result.
2406 * In the non-SMP case, we create a new main thread, push it on the
2407 * main-thread stack, and invoke the scheduler to run it. The
2408 * scheduler will return when the top main thread on the stack has
2409 * completed or died, and fill in the necessary fields of the
2410 * main_thread structure.
2412 * In the SMP case, we create a main thread as before, but we then
2413 * create a new condition variable and sleep on it. When our new
2414 * main thread has completed, we'll be woken up and the status/result
2415 * will be in the main_thread struct.
2416 * -------------------------------------------------------------------------- */
2419 howManyThreadsAvail ( void )
2423 for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2425 for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2427 for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2433 finishAllThreads ( void )
2436 while (run_queue_hd != END_TSO_QUEUE) {
2437 waitThread ( run_queue_hd, NULL, NULL );
2439 while (blocked_queue_hd != END_TSO_QUEUE) {
2440 waitThread ( blocked_queue_hd, NULL, NULL );
2442 while (sleeping_queue != END_TSO_QUEUE) {
2443 waitThread ( blocked_queue_hd, NULL, NULL );
2446 (blocked_queue_hd != END_TSO_QUEUE ||
2447 run_queue_hd != END_TSO_QUEUE ||
2448 sleeping_queue != END_TSO_QUEUE);
2452 waitThread(StgTSO *tso, /*out*/StgClosure **ret, Capability *initialCapability)
2455 SchedulerStatus stat;
2457 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2461 #if defined(RTS_SUPPORTS_THREADS)
2462 initCondition(&m->wakeup);
2463 #if defined(THREADED_RTS)
2464 initCondition(&m->bound_thread_cond);
2468 /* see scheduleWaitThread() comment */
2469 ACQUIRE_LOCK(&sched_mutex);
2470 m->link = main_threads;
2473 IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id));
2475 stat = waitThread_(m,initialCapability);
2477 RELEASE_LOCK(&sched_mutex);
2483 waitThread_(StgMainThread* m, Capability *initialCapability)
2485 SchedulerStatus stat;
2487 // Precondition: sched_mutex must be held.
2488 IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2490 #if defined(RTS_SUPPORTS_THREADS) && !defined(THREADED_RTS)
2491 { // FIXME: does this still make sense?
2492 // It's not for the threaded rts => SMP only
2494 waitCondition(&m->wakeup, &sched_mutex);
2495 } while (m->stat == NoStatus);
2498 /* GranSim specific init */
2499 CurrentTSO = m->tso; // the TSO to run
2500 procStatus[MainProc] = Busy; // status of main PE
2501 CurrentProc = MainProc; // PE to run it on
2503 RELEASE_LOCK(&sched_mutex);
2504 schedule(m,initialCapability);
2506 RELEASE_LOCK(&sched_mutex);
2507 schedule(m,initialCapability);
2508 ACQUIRE_LOCK(&sched_mutex);
2509 ASSERT(m->stat != NoStatus);
2514 #if defined(RTS_SUPPORTS_THREADS)
2515 closeCondition(&m->wakeup);
2516 #if defined(THREADED_RTS)
2517 closeCondition(&m->bound_thread_cond);
2521 IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n",
2525 // Postcondition: sched_mutex still held
2529 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2530 //@subsection Run queue code
2534 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2535 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2536 implicit global variable that has to be correct when calling these
2540 /* Put the new thread on the head of the runnable queue.
2541 * The caller of createThread had better push an appropriate closure
2542 * on this thread's stack before the scheduler is invoked.
2544 static /* inline */ void
2545 add_to_run_queue(tso)
2548 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2549 tso->link = run_queue_hd;
2551 if (run_queue_tl == END_TSO_QUEUE) {
2556 /* Put the new thread at the end of the runnable queue. */
2557 static /* inline */ void
2558 push_on_run_queue(tso)
2561 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2562 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2563 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2564 if (run_queue_hd == END_TSO_QUEUE) {
2567 run_queue_tl->link = tso;
2573 Should be inlined because it's used very often in schedule. The tso
2574 argument is actually only needed in GranSim, where we want to have the
2575 possibility to schedule *any* TSO on the run queue, irrespective of the
2576 actual ordering. Therefore, if tso is not the nil TSO then we traverse
2577 the run queue and dequeue the tso, adjusting the links in the queue.
2579 //@cindex take_off_run_queue
2580 static /* inline */ StgTSO*
2581 take_off_run_queue(StgTSO *tso) {
2585 qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2587 if tso is specified, unlink that tso from the run_queue (doesn't have
2588 to be at the beginning of the queue); GranSim only
2590 if (tso!=END_TSO_QUEUE) {
2591 /* find tso in queue */
2592 for (t=run_queue_hd, prev=END_TSO_QUEUE;
2593 t!=END_TSO_QUEUE && t!=tso;
2597 /* now actually dequeue the tso */
2598 if (prev!=END_TSO_QUEUE) {
2599 ASSERT(run_queue_hd!=t);
2600 prev->link = t->link;
2602 /* t is at beginning of thread queue */
2603 ASSERT(run_queue_hd==t);
2604 run_queue_hd = t->link;
2606 /* t is at end of thread queue */
2607 if (t->link==END_TSO_QUEUE) {
2608 ASSERT(t==run_queue_tl);
2609 run_queue_tl = prev;
2611 ASSERT(run_queue_tl!=t);
2613 t->link = END_TSO_QUEUE;
2615 /* take tso from the beginning of the queue; std concurrent code */
2617 if (t != END_TSO_QUEUE) {
2618 run_queue_hd = t->link;
2619 t->link = END_TSO_QUEUE;
2620 if (run_queue_hd == END_TSO_QUEUE) {
2621 run_queue_tl = END_TSO_QUEUE;
2630 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2631 //@subsection Garbage Collextion Routines
2633 /* ---------------------------------------------------------------------------
2634 Where are the roots that we know about?
2636 - all the threads on the runnable queue
2637 - all the threads on the blocked queue
2638 - all the threads on the sleeping queue
2639 - all the threads currently executing a _ccall_GC
2640 - all the "main threads"
2642 ------------------------------------------------------------------------ */
2644 /* This has to be protected either by the scheduler monitor, or by the
2645 garbage collection monitor (probably the latter).
2650 GetRoots(evac_fn evac)
2655 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2656 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2657 evac((StgClosure **)&run_queue_hds[i]);
2658 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2659 evac((StgClosure **)&run_queue_tls[i]);
2661 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2662 evac((StgClosure **)&blocked_queue_hds[i]);
2663 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2664 evac((StgClosure **)&blocked_queue_tls[i]);
2665 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2666 evac((StgClosure **)&ccalling_threadss[i]); /* evacuate the same per-PE slot that was just tested */
2673 if (run_queue_hd != END_TSO_QUEUE) {
2674 ASSERT(run_queue_tl != END_TSO_QUEUE);
2675 evac((StgClosure **)&run_queue_hd);
2676 evac((StgClosure **)&run_queue_tl);
2679 if (blocked_queue_hd != END_TSO_QUEUE) {
2680 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2681 evac((StgClosure **)&blocked_queue_hd);
2682 evac((StgClosure **)&blocked_queue_tl);
2685 if (sleeping_queue != END_TSO_QUEUE) {
2686 evac((StgClosure **)&sleeping_queue);
2690 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2691 evac((StgClosure **)&suspended_ccalling_threads);
2694 #if defined(PAR) || defined(GRAN)
2695 markSparkQueue(evac);
2698 #if defined(RTS_USER_SIGNALS)
2699 // mark the signal handlers (signals should be already blocked)
2700 markSignalHandlers(evac);
2703 // main threads which have completed need to be retained until they
2704 // are dealt with in the main scheduler loop. They won't be
2705 // retained any other way: the GC will drop them from the
2706 // all_threads list, so we have to be careful to treat them as roots
2710 for (m = main_threads; m != NULL; m = m->link) {
2711 switch (m->tso->what_next) {
2712 case ThreadComplete:
2714 evac((StgClosure **)&m->tso);
2723 /* -----------------------------------------------------------------------------
2726 This is the interface to the garbage collector from Haskell land.
2727 We provide this so that external C code can allocate and garbage
2728 collect when called from Haskell via _ccall_GC.
2730 It might be useful to provide an interface whereby the programmer
2731 can specify more roots (ToDo).
2733 This needs to be protected by the GC condition variable above. KH.
2734 -------------------------------------------------------------------------- */
2736 static void (*extra_roots)(evac_fn);
2741 /* Obligated to hold this lock upon entry */
2742 ACQUIRE_LOCK(&sched_mutex);
2743 GarbageCollect(GetRoots,rtsFalse);
2744 RELEASE_LOCK(&sched_mutex);
2748 performMajorGC(void)
2750 ACQUIRE_LOCK(&sched_mutex);
2751 GarbageCollect(GetRoots,rtsTrue);
2752 RELEASE_LOCK(&sched_mutex);
2756 AllRoots(evac_fn evac)
2758 GetRoots(evac); // the scheduler's roots
2759 extra_roots(evac); // the user's roots
2763 performGCWithRoots(void (*get_roots)(evac_fn))
2765 ACQUIRE_LOCK(&sched_mutex);
2766 extra_roots = get_roots;
2767 GarbageCollect(AllRoots,rtsFalse);
2768 RELEASE_LOCK(&sched_mutex);
2771 /* -----------------------------------------------------------------------------
2774 If the thread has reached its maximum stack size, then raise the
2775 StackOverflow exception in the offending thread. Otherwise
2776 relocate the TSO into a larger chunk of memory and adjust its stack
2778 -------------------------------------------------------------------------- */
2781 threadStackOverflow(StgTSO *tso)
2783 nat new_stack_size, new_tso_size, stack_words;
2787 IF_DEBUG(sanity,checkTSO(tso));
2788 if (tso->stack_size >= tso->max_stack_size) {
2791 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2792 tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size); /* %ld needs a long argument */
2793 /* If we're debugging, just print out the top of the stack */
2794 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2797 /* Send this thread the StackOverflow exception */
2798 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2802 /* Try to double the current stack size. If that takes us over the
2803 * maximum stack size for this thread, then use the maximum instead.
2804 * Finally round up so the TSO ends up as a whole number of blocks.
2806 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2807 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2808 TSO_STRUCT_SIZE)/sizeof(W_);
2809 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2810 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2812 IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2814 dest = (StgTSO *)allocate(new_tso_size);
2815 TICK_ALLOC_TSO(new_stack_size,0);
2817 /* copy the TSO block and the old stack into the new area */
2818 memcpy(dest,tso,TSO_STRUCT_SIZE);
2819 stack_words = tso->stack + tso->stack_size - tso->sp;
2820 new_sp = (P_)dest + new_tso_size - stack_words;
2821 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2823 /* relocate the stack pointers... */
2825 dest->stack_size = new_stack_size;
2827 /* Mark the old TSO as relocated. We have to check for relocated
2828 * TSOs in the garbage collector and any primops that deal with TSOs.
2830 * It's important to set the sp value to just beyond the end
2831 * of the stack, so we don't attempt to scavenge any part of the
2834 tso->what_next = ThreadRelocated;
2836 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2837 tso->why_blocked = NotBlocked;
2838 dest->mut_link = NULL;
2840 IF_PAR_DEBUG(verbose,
2841 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2842 dest->id, dest, (long)dest->stack_size); /* report the relocated TSO; the old one still has the old size */
2843 /* If we're debugging, just print out the top of the stack */
2844 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2847 IF_DEBUG(sanity,checkTSO(tso));
2849 IF_DEBUG(scheduler,printTSO(dest));
2855 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2856 //@subsection Blocking Queue Routines
2858 /* ---------------------------------------------------------------------------
2859 Wake up a queue that was blocked on some resource.
2860 ------------------------------------------------------------------------ */
2864 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2869 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2871 /* write RESUME events to log file and
2872 update blocked and fetch time (depending on type of the orig closure) */
2873 if (RtsFlags.ParFlags.ParStats.Full) {
2874 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2875 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2876 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2877 if (EMPTY_RUN_QUEUE())
2878 emitSchedule = rtsTrue;
2880 switch (get_itbl(node)->type) {
2882 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2887 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2894 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2901 static StgBlockingQueueElement *
2902 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2905 PEs node_loc, tso_loc;
2907 node_loc = where_is(node); // should be lifted out of loop
2908 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2909 tso_loc = where_is((StgClosure *)tso);
2910 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2911 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2912 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2913 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2914 // insertThread(tso, node_loc);
2915 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2917 tso, node, (rtsSpark*)NULL);
2918 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2921 } else { // TSO is remote (actually should be FMBQ)
2922 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2923 RtsFlags.GranFlags.Costs.gunblocktime +
2924 RtsFlags.GranFlags.Costs.latency;
2925 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2927 tso, node, (rtsSpark*)NULL);
2928 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2931 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2933 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2934 (node_loc==tso_loc ? "Local" : "Global"),
2935 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2936 tso->block_info.closure = NULL;
2937 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
2941 static StgBlockingQueueElement *
2942 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2944 StgBlockingQueueElement *next;
2946 switch (get_itbl(bqe)->type) {
2948 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2949 /* if it's a TSO just push it onto the run_queue */
2951 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2952 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
2954 unblockCount(bqe, node);
2955 /* reset blocking status after dumping event */
2956 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2960 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2962 bqe->link = (StgBlockingQueueElement *)PendingFetches;
2963 PendingFetches = (StgBlockedFetch *)bqe;
2967 /* can ignore this case in a non-debugging setup;
2968 see comments on RBHSave closures above */
2970 /* check that the closure is an RBHSave closure */
2971 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2972 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2973 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2977 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2978 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2982 IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2986 #else /* !GRAN && !PAR */
2988 unblockOneLocked(StgTSO *tso)
2992 ASSERT(get_itbl(tso)->type == TSO);
2993 ASSERT(tso->why_blocked != NotBlocked);
2994 tso->why_blocked = NotBlocked;
2996 PUSH_ON_RUN_QUEUE(tso);
2998 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
3003 #if defined(GRAN) || defined(PAR)
3004 inline StgBlockingQueueElement *
3005 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3007 ACQUIRE_LOCK(&sched_mutex);
3008 bqe = unblockOneLocked(bqe, node);
3009 RELEASE_LOCK(&sched_mutex);
3014 unblockOne(StgTSO *tso)
3016 ACQUIRE_LOCK(&sched_mutex);
3017 tso = unblockOneLocked(tso);
3018 RELEASE_LOCK(&sched_mutex);
3025 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3027 StgBlockingQueueElement *bqe;
3032 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
3033 node, CurrentProc, CurrentTime[CurrentProc],
3034 CurrentTSO->id, CurrentTSO));
3036 node_loc = where_is(node);
3038 ASSERT(q == END_BQ_QUEUE ||
3039 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
3040 get_itbl(q)->type == CONSTR); // closure (type constructor)
3041 ASSERT(is_unique(node));
3043 /* FAKE FETCH: magically copy the node to the tso's proc;
3044 no Fetch necessary because in reality the node should not have been
3045 moved to the other PE in the first place
3047 if (CurrentProc!=node_loc) {
3049 belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
3050 node, node_loc, CurrentProc, CurrentTSO->id,
3051 // CurrentTSO, where_is(CurrentTSO),
3052 node->header.gran.procs));
3053 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3055 belch("## new bitmask of node %p is %#x",
3056 node, node->header.gran.procs));
3057 if (RtsFlags.GranFlags.GranSimStats.Global) {
3058 globalGranStats.tot_fake_fetches++;
3063 // ToDo: check: ASSERT(CurrentProc==node_loc);
3064 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3067 bqe points to the current element in the queue
3068 next points to the next element in the queue
3070 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
3071 //tso_loc = where_is(tso);
3073 bqe = unblockOneLocked(bqe, node);
3076 /* if this is the BQ of an RBH, we have to put back the info ripped out of
3077 the closure to make room for the anchor of the BQ */
3078 if (bqe!=END_BQ_QUEUE) {
3079 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3081 ASSERT((info_ptr==&RBH_Save_0_info) ||
3082 (info_ptr==&RBH_Save_1_info) ||
3083 (info_ptr==&RBH_Save_2_info));
3085 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3086 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3087 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3090 belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
3091 node, info_type(node)));
3094 /* statistics gathering */
3095 if (RtsFlags.GranFlags.GranSimStats.Global) {
3096 // globalGranStats.tot_bq_processing_time += bq_processing_time;
3097 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
3098 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
3099 globalGranStats.tot_awbq++; // total no. of bqs awakened
3102 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
3103 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3107 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3109 StgBlockingQueueElement *bqe;
3111 ACQUIRE_LOCK(&sched_mutex);
3113 IF_PAR_DEBUG(verbose,
3114 belch("##-_ AwBQ for node %p on [%x]: ",
3118 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3119 IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
3124 ASSERT(q == END_BQ_QUEUE ||
3125 get_itbl(q)->type == TSO ||
3126 get_itbl(q)->type == BLOCKED_FETCH ||
3127 get_itbl(q)->type == CONSTR);
3130 while (get_itbl(bqe)->type==TSO ||
3131 get_itbl(bqe)->type==BLOCKED_FETCH) {
3132 bqe = unblockOneLocked(bqe, node);
3134 RELEASE_LOCK(&sched_mutex);
3137 #else /* !GRAN && !PAR */
3139 #ifdef RTS_SUPPORTS_THREADS
3141 awakenBlockedQueueNoLock(StgTSO *tso)
3143 while (tso != END_TSO_QUEUE) {
3144 tso = unblockOneLocked(tso);
3150 awakenBlockedQueue(StgTSO *tso)
3152 ACQUIRE_LOCK(&sched_mutex);
3153 while (tso != END_TSO_QUEUE) {
3154 tso = unblockOneLocked(tso);
3156 RELEASE_LOCK(&sched_mutex);
3160 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
3161 //@subsection Exception Handling Routines
3163 /* ---------------------------------------------------------------------------
3165 - usually called inside a signal handler so it mustn't do anything fancy.
3166 ------------------------------------------------------------------------ */
3169 interruptStgRts(void)
3175 /* -----------------------------------------------------------------------------
3178 This is for use when we raise an exception in another thread, which
3180 This has nothing to do with the UnblockThread event in GranSim. -- HWL
3181 -------------------------------------------------------------------------- */
3183 #if defined(GRAN) || defined(PAR)
3185 NB: only the type of the blocking queue is different in GranSim and GUM
3186 the operations on the queue-elements are the same
3187 long live polymorphism!
3189 Locks: sched_mutex is held upon entry and exit.
3193 unblockThread(StgTSO *tso)
3195 StgBlockingQueueElement *t, **last;
3197 switch (tso->why_blocked) {
3200 return; /* not blocked */
3203 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3205 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3206 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3208 last = (StgBlockingQueueElement **)&mvar->head;
3209 for (t = (StgBlockingQueueElement *)mvar->head;
3211 last = &t->link, last_tso = t, t = t->link) {
3212 if (t == (StgBlockingQueueElement *)tso) {
3213 *last = (StgBlockingQueueElement *)tso->link;
3214 if (mvar->tail == tso) {
3215 mvar->tail = (StgTSO *)last_tso;
3220 barf("unblockThread (MVAR): TSO not found");
3223 case BlockedOnBlackHole:
3224 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3226 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3228 last = &bq->blocking_queue;
3229 for (t = bq->blocking_queue;
3231 last = &t->link, t = t->link) {
3232 if (t == (StgBlockingQueueElement *)tso) {
3233 *last = (StgBlockingQueueElement *)tso->link;
3237 barf("unblockThread (BLACKHOLE): TSO not found");
3240 case BlockedOnException:
3242 StgTSO *target = tso->block_info.tso;
3244 ASSERT(get_itbl(target)->type == TSO);
3246 while (target->what_next == ThreadRelocated) { /* follow the whole relocation chain, as the non-GRAN version does */
3247 target = target->link;
3248 ASSERT(get_itbl(target)->type == TSO);
3251 ASSERT(target->blocked_exceptions != NULL);
3253 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3254 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
3256 last = &t->link, t = t->link) {
3257 ASSERT(get_itbl(t)->type == TSO);
3258 if (t == (StgBlockingQueueElement *)tso) {
3259 *last = (StgBlockingQueueElement *)tso->link;
3263 barf("unblockThread (Exception): TSO not found");
3267 case BlockedOnWrite:
3268 #if defined(mingw32_TARGET_OS)
3269 case BlockedOnDoProc:
3272 /* take TSO off blocked_queue */
3273 StgBlockingQueueElement *prev = NULL;
3274 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
3275 prev = t, t = t->link) {
3276 if (t == (StgBlockingQueueElement *)tso) {
3278 blocked_queue_hd = (StgTSO *)t->link;
3279 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3280 blocked_queue_tl = END_TSO_QUEUE;
3283 prev->link = t->link;
3284 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3285 blocked_queue_tl = (StgTSO *)prev;
3291 barf("unblockThread (I/O): TSO not found");
3294 case BlockedOnDelay:
3296 /* take TSO off sleeping_queue */
3297 StgBlockingQueueElement *prev = NULL;
3298 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
3299 prev = t, t = t->link) {
3300 if (t == (StgBlockingQueueElement *)tso) {
3302 sleeping_queue = (StgTSO *)t->link;
3304 prev->link = t->link;
3309 barf("unblockThread (delay): TSO not found");
3313 barf("unblockThread");
3317 tso->link = END_TSO_QUEUE;
3318 tso->why_blocked = NotBlocked;
3319 tso->block_info.closure = NULL;
3320 PUSH_ON_RUN_QUEUE(tso);
3324 unblockThread(StgTSO *tso)
3328 /* To avoid locking unnecessarily. */
3329 if (tso->why_blocked == NotBlocked) {
3333 switch (tso->why_blocked) {
3336 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3338 StgTSO *last_tso = END_TSO_QUEUE;
3339 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3342 for (t = mvar->head; t != END_TSO_QUEUE;
3343 last = &t->link, last_tso = t, t = t->link) {
3346 if (mvar->tail == tso) {
3347 mvar->tail = last_tso;
3352 barf("unblockThread (MVAR): TSO not found");
3355 case BlockedOnBlackHole:
3356 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3358 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3360 last = &bq->blocking_queue;
3361 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
3362 last = &t->link, t = t->link) {
3368 barf("unblockThread (BLACKHOLE): TSO not found");
3371 case BlockedOnException:
3373 StgTSO *target = tso->block_info.tso;
3375 ASSERT(get_itbl(target)->type == TSO);
3377 while (target->what_next == ThreadRelocated) {
3378 target = target->link;
3379 ASSERT(get_itbl(target)->type == TSO);
3382 ASSERT(target->blocked_exceptions != NULL);
3384 last = &target->blocked_exceptions;
3385 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
3386 last = &t->link, t = t->link) {
3387 ASSERT(get_itbl(t)->type == TSO);
3393 barf("unblockThread (Exception): TSO not found");
3397 case BlockedOnWrite:
3398 #if defined(mingw32_TARGET_OS)
3399 case BlockedOnDoProc:
3402 StgTSO *prev = NULL;
3403 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
3404 prev = t, t = t->link) {
3407 blocked_queue_hd = t->link;
3408 if (blocked_queue_tl == t) {
3409 blocked_queue_tl = END_TSO_QUEUE;
3412 prev->link = t->link;
3413 if (blocked_queue_tl == t) {
3414 blocked_queue_tl = prev;
3420 barf("unblockThread (I/O): TSO not found");
3423 case BlockedOnDelay:
3425 StgTSO *prev = NULL;
3426 for (t = sleeping_queue; t != END_TSO_QUEUE;
3427 prev = t, t = t->link) {
3430 sleeping_queue = t->link;
3432 prev->link = t->link;
3437 barf("unblockThread (delay): TSO not found");
3441 barf("unblockThread");
3445 tso->link = END_TSO_QUEUE;
3446 tso->why_blocked = NotBlocked;
3447 tso->block_info.closure = NULL;
3448 PUSH_ON_RUN_QUEUE(tso);
3452 /* -----------------------------------------------------------------------------
3455 * The following function implements the magic for raising an
3456 * asynchronous exception in an existing thread.
3458 * We first remove the thread from any queue on which it might be
3459 * blocked. The possible blockages are MVARs, BLACKHOLE_BQs, another thread's blocked_exceptions queue, the I/O blocked queue and the sleeping queue.
3461 * We strip the stack down to the innermost CATCH_FRAME, building
3462 * thunks in the heap for all the active computations, so they can
3463 * be restarted if necessary. When we reach a CATCH_FRAME, we build
3464 * an application of the handler to the exception, and push it on
3465 * the top of the stack.
3467 * How exactly do we save all the active computations? We create an
3468 * AP_STACK for every UpdateFrame on the stack. Entering one of these
3469 * AP_STACKs pushes everything from the corresponding update frame
3470 * upwards onto the stack. (Actually, it pushes everything up to the
3471 * next update frame plus a pointer to the next AP_STACK object.)
3472 * Entering the next AP_STACK object pushes more onto the stack until we
3473 * reach the last AP_STACK object - at which point the stack should look
3474 * exactly as it did when we killed the TSO and we can continue
3475 * execution by entering the closure on top of the stack.
3477 * We can also kill a thread entirely - this happens if either (a) the
3478 * exception passed to raiseAsync is NULL, or (b) there's no
3479 * CATCH_FRAME on the stack. In either case, we strip the entire
3480 * stack and replace the thread with a zombie.
3482 * Locks: sched_mutex is held upon entry and exit (callers that don't hold it use raiseAsyncWithLock()).
3484 * -------------------------------------------------------------------------- */
3487 deleteThread(StgTSO *tso)
3489 raiseAsync(tso,NULL);
3494 deleteThreadImmediately(StgTSO *tso)
3495 { // for forkProcess only:
3496 // delete thread without giving it a chance to catch the KillThread exception
3498 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3502 tso->what_next = ThreadKilled;
3507 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3509 /* When raising async exs from contexts where sched_mutex isn't held;
3510 use raiseAsyncWithLock(). */
3511 ACQUIRE_LOCK(&sched_mutex);
3512 raiseAsync(tso,exception);
3513 RELEASE_LOCK(&sched_mutex);
3517 raiseAsync(StgTSO *tso, StgClosure *exception)
3519 StgRetInfoTable *info;
3522 // Thread already dead?
3523 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3528 sched_belch("raising exception in thread %ld.", tso->id));
3530 // Remove it from any blocking queues
3535 // The stack freezing code assumes there's a closure pointer on
3536 // the top of the stack, so we have to arrange that this is the case...
3538 if (sp[0] == (W_)&stg_enter_info) {
3542 sp[0] = (W_)&stg_dummy_ret_closure;
3548 // 1. Let the top of the stack be the "current closure"
3550 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3553 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3554 // current closure applied to the chunk of stack up to (but not
3555 // including) the update frame. This closure becomes the "current
3556 // closure". Go back to step 2.
3558 // 4. If it's a CATCH_FRAME, then leave the exception handler on
3559 // top of the stack applied to the exception.
3561 // 5. If it's a STOP_FRAME, then kill the thread.
3566 info = get_ret_itbl((StgClosure *)frame);
3568 while (info->i.type != UPDATE_FRAME
3569 && (info->i.type != CATCH_FRAME || exception == NULL)
3570 && info->i.type != STOP_FRAME) {
3571 frame += stack_frame_sizeW((StgClosure *)frame);
3572 info = get_ret_itbl((StgClosure *)frame);
3575 switch (info->i.type) {
3578 // If we find a CATCH_FRAME, and we've got an exception to raise,
3579 // then build the THUNK raise(exception), and leave it on
3580 // top of the CATCH_FRAME ready to enter.
3584 StgCatchFrame *cf = (StgCatchFrame *)frame;
3588 // we've got an exception to raise, so let's pass it to the
3589 // handler in this frame.
3591 raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3592 TICK_ALLOC_SE_THK(1,0);
3593 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3594 raise->payload[0] = exception;
3596 // throw away the stack from Sp up to the CATCH_FRAME.
3600 /* Ensure that async excpetions are blocked now, so we don't get
3601 * a surprise exception before we get around to executing the
3604 if (tso->blocked_exceptions == NULL) {
3605 tso->blocked_exceptions = END_TSO_QUEUE;
3608 /* Put the newly-built THUNK on top of the stack, ready to execute
3609 * when the thread restarts.
3612 sp[-1] = (W_)&stg_enter_info;
3614 tso->what_next = ThreadRunGHC;
3615 IF_DEBUG(sanity, checkTSO(tso));
3624 // First build an AP_STACK consisting of the stack chunk above the
3625 // current update frame, with the top word on the stack as the
3628 words = frame - sp - 1;
3629 ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3632 ap->fun = (StgClosure *)sp[0];
3634 for(i=0; i < (nat)words; ++i) {
3635 ap->payload[i] = (StgClosure *)*sp++;
3638 SET_HDR(ap,&stg_AP_STACK_info,
3639 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
3640 TICK_ALLOC_UP_THK(words+1,0);
3643 fprintf(stderr, "scheduler: Updating ");
3644 printPtr((P_)((StgUpdateFrame *)frame)->updatee);
3645 fprintf(stderr, " with ");
3646 printObj((StgClosure *)ap);
3649 // Replace the updatee with an indirection - happily
3650 // this will also wake up any threads currently
3651 // waiting on the result.
3653 // Warning: if we're in a loop, more than one update frame on
3654 // the stack may point to the same object. Be careful not to
3655 // overwrite an IND_OLDGEN in this case, because we'll screw
3656 // up the mutable lists. To be on the safe side, don't
3657 // overwrite any kind of indirection at all. See also
3658 // threadSqueezeStack in GC.c, where we have to make a similar
3661 if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3662 // revert the black hole
3663 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3665 sp += sizeofW(StgUpdateFrame) - 1;
3666 sp[0] = (W_)ap; // push onto stack
3671 // We've stripped the entire stack, the thread is now dead.
3672 sp += sizeofW(StgStopFrame);
3673 tso->what_next = ThreadKilled;
3684 /* -----------------------------------------------------------------------------
3685 resurrectThreads is called after garbage collection on the list of
3686 threads found to be garbage. Each of these threads will be woken
3687 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3688 on an MVar, or NonTermination if the thread was blocked on a Black
3691 Locks: sched_mutex is held upon entry and exit; this is called by the GC and calls raiseAsync() directly.
3692 -------------------------------------------------------------------------- */
3695 resurrectThreads( StgTSO *threads )
3699 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3700 next = tso->global_link;
3701 tso->global_link = all_threads;
3703 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3705 switch (tso->why_blocked) {
3707 case BlockedOnException:
3708 /* Called by GC - sched_mutex lock is currently held. */
3709 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3711 case BlockedOnBlackHole:
3712 raiseAsync(tso,(StgClosure *)NonTermination_closure);
3715 /* This might happen if the thread was blocked on a black hole
3716 * belonging to a thread that we've just woken up (raiseAsync
3717 * can wake up threads, remember...).
3721 barf("resurrectThreads: thread blocked in a strange way");
3726 /* -----------------------------------------------------------------------------
3727 * Blackhole detection: if we reach a deadlock, test whether any
3728 * threads are blocked on themselves. Any threads which are found to
3729 * be self-blocked get sent a NonTermination exception.
3731 * This is only done in a deadlock situation in order to avoid
3732 * performance overhead in the normal case.
3734 * Locks: sched_mutex is held upon entry and exit.
3735 * -------------------------------------------------------------------------- */
3738 detectBlackHoles( void )
3740 StgTSO *tso = all_threads;
3742 StgClosure *blocked_on;
3743 StgRetInfoTable *info;
3745 for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3747 while (tso->what_next == ThreadRelocated) {
3749 ASSERT(get_itbl(tso)->type == TSO);
3752 if (tso->why_blocked != BlockedOnBlackHole) {
3755 blocked_on = tso->block_info.closure;
3757 frame = (StgClosure *)tso->sp;
3760 info = get_ret_itbl(frame);
3761 switch (info->i.type) {
3763 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3764 /* We are blocking on one of our own computations, so
3765 * send this thread the NonTermination exception.
3768 sched_belch("thread %d is blocked on itself", tso->id));
3769 raiseAsync(tso, (StgClosure *)NonTermination_closure);
3773 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3779 // normal stack frames; do nothing except advance the pointer
3781 (StgPtr)frame += stack_frame_sizeW(frame);
3788 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3789 //@subsection Debugging Routines
3791 /* -----------------------------------------------------------------------------
3792 * Debugging: why is a thread blocked
3793 * [Also provides useful information when debugging threaded programs
3794 * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3795 -------------------------------------------------------------------------- */
3799 printThreadBlockage(StgTSO *tso)
3801 switch (tso->why_blocked) {
3803 fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3805 case BlockedOnWrite:
3806 fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3808 #if defined(mingw32_TARGET_OS)
3809 case BlockedOnDoProc:
3810 fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3813 case BlockedOnDelay:
3814 fprintf(stderr,"is blocked until %d", tso->block_info.target);
3817 fprintf(stderr,"is blocked on an MVar");
3819 case BlockedOnException:
3820 fprintf(stderr,"is blocked on delivering an exception to thread %d",
3821 tso->block_info.tso->id);
3823 case BlockedOnBlackHole:
3824 fprintf(stderr,"is blocked on a black hole");
3827 fprintf(stderr,"is not blocked");
3831 fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3832 tso->block_info.closure, info_type(tso->block_info.closure));
3834 case BlockedOnGA_NoSend:
3835 fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3836 tso->block_info.closure, info_type(tso->block_info.closure));
3839 #if defined(RTS_SUPPORTS_THREADS)
3840 case BlockedOnCCall:
3841 fprintf(stderr,"is blocked on an external call");
3843 case BlockedOnCCall_NoUnblockExc:
3844 fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3848 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3849 tso->why_blocked, tso->id, tso);
3855 printThreadStatus(StgTSO *tso)
3857 switch (tso->what_next) {
3859 fprintf(stderr,"has been killed");
3861 case ThreadComplete:
3862 fprintf(stderr,"has completed");
3865 printThreadBlockage(tso);
3870 printAllThreads(void)
3876 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3877 ullong_format_string(TIME_ON_PROC(CurrentProc),
3878 time_string, rtsFalse/*no commas!*/);
3880 fprintf(stderr, "all threads at [%s]:\n", time_string);
3882 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3883 ullong_format_string(CURRENT_TIME,
3884 time_string, rtsFalse/*no commas!*/);
3886 fprintf(stderr,"all threads at [%s]:\n", time_string);
3888 fprintf(stderr,"all threads:\n");
3891 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3892 fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3893 label = lookupThreadLabel((StgWord)t);
3894 if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3895 printThreadStatus(t);
3896 fprintf(stderr,"\n");
3903 Print a whole blocking queue attached to node (debugging only).
3908 print_bq (StgClosure *node)
3910 StgBlockingQueueElement *bqe;
3914 fprintf(stderr,"## BQ of closure %p (%s): ",
3915 node, info_type(node));
3917 /* should cover all closures that may have a blocking queue */
3918 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3919 get_itbl(node)->type == FETCH_ME_BQ ||
3920 get_itbl(node)->type == RBH ||
3921 get_itbl(node)->type == MVAR);
3923 ASSERT(node!=(StgClosure*)NULL); // sanity check
3925 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3929 Print a whole blocking queue starting with the element bqe.
3932 print_bqe (StgBlockingQueueElement *bqe)
3937 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3939 for (end = (bqe==END_BQ_QUEUE);
3940 !end; // iterate until bqe points to a CONSTR
3941 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
3942 bqe = end ? END_BQ_QUEUE : bqe->link) {
3943 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3944 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3945 /* types of closures that may appear in a blocking queue */
3946 ASSERT(get_itbl(bqe)->type == TSO ||
3947 get_itbl(bqe)->type == BLOCKED_FETCH ||
3948 get_itbl(bqe)->type == CONSTR);
3949 /* only BQs of an RBH end with an RBH_Save closure */
3950 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3952 switch (get_itbl(bqe)->type) {
3954 fprintf(stderr," TSO %u (%x),",
3955 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3958 fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3959 ((StgBlockedFetch *)bqe)->node,
3960 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3961 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3962 ((StgBlockedFetch *)bqe)->ga.weight);
3965 fprintf(stderr," %s (IP %p),",
3966 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3967 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3968 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3969 "RBH_Save_?"), get_itbl(bqe));
3972 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3973 info_type((StgClosure *)bqe)); // , node, info_type(node));
3977 fputc('\n', stderr);
3979 # elif defined(GRAN)
3981 print_bq (StgClosure *node)
3983 StgBlockingQueueElement *bqe;
3984 PEs node_loc, tso_loc;
3987 /* should cover all closures that may have a blocking queue */
3988 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3989 get_itbl(node)->type == FETCH_ME_BQ ||
3990 get_itbl(node)->type == RBH);
3992 ASSERT(node!=(StgClosure*)NULL); // sanity check
3993 node_loc = where_is(node);
3995 fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3996 node, info_type(node), node_loc);
3999 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4001 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4002 !end; // iterate until bqe points to a CONSTR
4003 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4004 ASSERT(bqe != END_BQ_QUEUE); // sanity check
4005 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
4006 /* types of closures that may appear in a blocking queue */
4007 ASSERT(get_itbl(bqe)->type == TSO ||
4008 get_itbl(bqe)->type == CONSTR);
4009 /* only BQs of an RBH end with an RBH_Save closure */
4010 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4012 tso_loc = where_is((StgClosure *)bqe);
4013 switch (get_itbl(bqe)->type) {
4015 fprintf(stderr," TSO %d (%p) on [PE %d],",
4016 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4019 fprintf(stderr," %s (IP %p),",
4020 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4021 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4022 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4023 "RBH_Save_?"), get_itbl(bqe));
4026 barf("Unexpected closure type %s in blocking queue of %p (%s)",
4027 info_type((StgClosure *)bqe), node, info_type(node));
4031 fputc('\n', stderr);
4035 Nice and easy: only TSOs on the blocking queue
4038 print_bq (StgClosure *node)
4042 ASSERT(node!=(StgClosure*)NULL); // sanity check
4043 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
4044 tso != END_TSO_QUEUE;
4046 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
4047 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
4048 fprintf(stderr," TSO %d (%p),", tso->id, tso);
4050 fputc('\n', stderr);
4061 for (i=0, tso=run_queue_hd;
4062 tso != END_TSO_QUEUE;
4071 sched_belch(char *s, ...)
4076 fprintf(stderr, "scheduler (task %ld): ", osThreadId());
4078 fprintf(stderr, "== ");
4080 fprintf(stderr, "scheduler: ");
4082 vfprintf(stderr, s, ap);
4083 fprintf(stderr, "\n");
4090 //@node Index, , Debugging Routines, Main scheduling code
4094 //* StgMainThread:: @cindex\s-+StgMainThread
4095 //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue
4096 //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd
4097 //* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
4098 //* context_switch:: @cindex\s-+context_switch
4099 //* createThread:: @cindex\s-+createThread
4100 //* gc_pending_cond:: @cindex\s-+gc_pending_cond
4101 //* initScheduler:: @cindex\s-+initScheduler
4102 //* interrupted:: @cindex\s-+interrupted
4103 //* next_thread_id:: @cindex\s-+next_thread_id
4104 //* print_bq:: @cindex\s-+print_bq
4105 //* run_queue_hd:: @cindex\s-+run_queue_hd
4106 //* run_queue_tl:: @cindex\s-+run_queue_tl
4107 //* sched_mutex:: @cindex\s-+sched_mutex
4108 //* schedule:: @cindex\s-+schedule
4109 //* take_off_run_queue:: @cindex\s-+take_off_run_queue
4110 //* term_mutex:: @cindex\s-+term_mutex