1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.121 2002/02/12 15:38:08 sof Exp $
4 * (c) The GHC Team, 1998-2000
8 * Different GHC ways use this scheduler quite differently (see comments below)
9 * Here is the global picture:
11 * WAY Name CPP flag What's it for
12 * --------------------------------------
13 * mp GUM PAR Parallel execution on a distributed memory machine
14 * s SMP SMP Parallel execution on a shared memory machine
15 * mg GranSim GRAN Simulation of parallel execution
16 * md GUM/GdH DIST Distributed execution (based on GUM)
18 * --------------------------------------------------------------------------*/
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
24 * Version with scheduler monitor support for SMPs (WAY=s):
26 This design provides a high-level API to create and schedule threads etc.
27 as documented in the SMP design document.
29 It uses a monitor design controlled by a single mutex to exercise control
30 over accesses to shared data structures, and builds on the Posix threads
33 The majority of state is shared. In order to keep essential per-task state,
34 there is a Capability structure, which contains all the information
35 needed to run a thread: its STG registers, a pointer to its TSO, a
36 nursery etc. During STG execution, a pointer to the capability is
37 kept in a register (BaseReg).
39 In a non-SMP build, there is one global capability, namely MainRegTable.
43 * Version with support for distributed memory parallelism aka GUM (WAY=mp):
45 The main scheduling loop in GUM iterates until a finish message is received.
46 In that case a global flag @receivedFinish@ is set and this instance of
47 the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48 for the handling of incoming messages, such as PP_FINISH.
49 Note that in the parallel case we have a system manager that coordinates
50 different PEs, each of which are running one instance of the RTS.
51 See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52 From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
54 * Version with support for simulating parallel execution aka GranSim (WAY=mg):
56 The main scheduling code in GranSim is quite different from that in std
57 (concurrent) Haskell: while concurrent Haskell just iterates over the
58 threads in the runnable queue, GranSim is event driven, i.e. it iterates
59 over the events in the global event queue. -- HWL
64 //* Variables and Data structures::
65 //* Main scheduling loop::
66 //* Suspend and Resume::
68 //* Garbage Collection Routines::
69 //* Blocking Queue Routines::
70 //* Exception Handling Routines::
71 //* Debugging Routines::
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
78 #include "PosixSource.h"
85 #include "StgStartup.h"
88 #include "StgMiscClosures.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
119 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
120 //@subsection Variables and Data structures
124 * These are the threads which clients have requested that we run.
126 * In a 'threaded' build, we might have several concurrent clients all
127 * waiting for results, and each one will wait on a condition variable
128 * until the result is available.
130 * In non-SMP, clients are strictly nested: the first client calls
131 * into the RTS, which might call out again to C with a _ccall_GC, and
132 * eventually re-enter the RTS.
134 * Main threads information is kept in a linked list:
136 //@cindex StgMainThread
137 typedef struct StgMainThread_ {
139 SchedulerStatus stat;
141 #if defined(RTS_SUPPORTS_THREADS)
144 struct StgMainThread_ *link;
147 /* Main thread queue.
148 * Locks required: sched_mutex.
150 static StgMainThread *main_threads;
153 * Locks required: sched_mutex.
157 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
158 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
161 In GranSim we have a runnable and a blocked queue for each processor.
162 In order to minimise code changes new arrays run_queue_hds/tls
163 are created. run_queue_hd is then a short cut (macro) for
164 run_queue_hds[CurrentProc] (see GranSim.h).
167 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
168 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
169 StgTSO *ccalling_threadss[MAX_PROC];
170 /* We use the same global list of threads (all_threads) in GranSim as in
171 the std RTS (i.e. we are cheating). However, we don't use this list in
172 the GranSim specific code at the moment (so we are only potentially
177 StgTSO *run_queue_hd, *run_queue_tl;
178 StgTSO *blocked_queue_hd, *blocked_queue_tl;
179 StgTSO *sleeping_queue; /* perhaps replace with a hash table? */
183 /* Linked list of all threads.
184 * Used for detecting garbage collected threads.
188 /* When a thread performs a safe C call (_ccall_GC, using old
189 * terminology), it gets put on the suspended_ccalling_threads
190 * list. Used by the garbage collector.
192 static StgTSO *suspended_ccalling_threads;
194 static StgTSO *threadStackOverflow(StgTSO *tso);
196 /* KH: The following two flags are shared memory locations. There is no need
197 to lock them, since they are only unset at the end of a scheduler
201 /* flag set by signal handler to precipitate a context switch */
202 //@cindex context_switch
205 /* if this flag is set as well, give up execution */
206 //@cindex interrupted
209 /* Next thread ID to allocate.
210 * Locks required: sched_mutex
212 //@cindex next_thread_id
213 StgThreadID next_thread_id = 1;
216 * Pointers to the state of the current thread.
217 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
218 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
221 /* The smallest stack size that makes any sense is:
222 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
223 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
224 * + 1 (the realworld token for an IO thread)
225 * + 1 (the closure to enter)
227 * A thread with this stack will bomb immediately with a stack
228 * overflow, which will increase its stack size.
231 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
238 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
239 * exists - earlier gccs apparently didn't.
246 void addToBlockedQueue ( StgTSO *tso );
248 static void schedule ( void );
249 void interruptStgRts ( void );
251 static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri );
253 static StgTSO * createThread_ ( nat size, rtsBool have_lock );
256 static void detectBlackHoles ( void );
259 static void sched_belch(char *s, ...);
262 #if defined(RTS_SUPPORTS_THREADS)
263 /* ToDo: carefully document the invariants that go together
264 * with these synchronisation objects.
266 Mutex sched_mutex = INIT_MUTEX_VAR;
267 Mutex term_mutex = INIT_MUTEX_VAR;
268 #if defined(THREADED_RTS)
270 * The rts_mutex is the 'big lock' that the active native
271 * thread within the RTS holds while executing code.
272 * It is given up when the thread makes a transition out of
273 * the RTS (e.g., to perform an external C call), hopefully
274 * for another thread to take over its chores and enter
278 Mutex rts_mutex = INIT_MUTEX_VAR;
280 * When a native thread has completed executing an external
281 * call, it needs to communicate the result back to the
282 * (Haskell) thread that made the call. Do this as follows:
284 * - in resumeThread(), the thread increments the counter
285 * threads_waiting, and then blocks on the 'big' RTS lock.
286 * - upon entry to the scheduler, the thread that's currently
287 * holding the RTS lock checks threads_waiting. If there
288 * are native threads waiting, it gives up its RTS lock
289 * and tries to re-grab the RTS lock [perhaps after having
290 * waited for a bit..?]
291 * - care must be taken to deal with the case where more than
292 * one external thread are waiting on the lock. [ToDo: more]
296 static nat threads_waiting = 0;
300 /* thread_ready_cond: when signalled, a thread has become runnable for a
303 * In the non-SMP case, it also implies that the thread that is woken up has
304 * exclusive access to the RTS and all its DS (that are not under sched_mutex's
307 * thread_ready_cond is signalled whenever COND_NO_THREADS_READY doesn't hold.
310 Condition thread_ready_cond = INIT_COND_VAR;
312 /* For documentation purposes only */
313 #define COND_NO_THREADS_READY() (noCapabilities() || EMPTY_RUN_QUEUE())
317 Condition gc_pending_cond = INIT_COND_VAR;
325 rtsTime TimeOfLastYield;
326 rtsBool emitSchedule = rtsTrue;
330 char *whatNext_strs[] = {
338 char *threadReturnCode_strs[] = {
339 "HeapOverflow", /* might also be StackOverflow */
348 StgTSO * createSparkThread(rtsSpark spark);
349 StgTSO * activateSpark (rtsSpark spark);
353 * The thread state for the main thread.
354 // ToDo: check whether not needed any more
358 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
359 static void taskStart(void);
363 /* threads start up using 'taskStart', so make
364 them grab the RTS lock. */
365 #if defined(THREADED_RTS)
366 ACQUIRE_LOCK(&rts_mutex);
376 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
377 //@subsection Main scheduling loop
379 /* ---------------------------------------------------------------------------
380 Main scheduling loop.
382 We use round-robin scheduling, each thread returning to the
383 scheduler loop when one of these conditions is detected:
386 * timer expires (thread yields)
391 Locking notes: we acquire the scheduler lock once at the beginning
392 of the scheduler loop, and release it when
394 * running a thread, or
395 * waiting for work, or
396 * waiting for a GC to complete.
399 In a GranSim setup this loop iterates over the global event queue.
400 This revolves around the global event queue, which determines what
401 to do next. Therefore, it's more complicated than either the
402 concurrent or the parallel (GUM) setup.
405 GUM iterates over incoming messages.
406 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
407 and sends out a fish whenever it has nothing to do; in-between
408 doing the actual reductions (shared code below) it processes the
409 incoming messages and deals with delayed operations
410 (see PendingFetches).
411 This is not the ugliest code you could imagine, but it's bloody close.
413 ------------------------------------------------------------------------ */
420 StgThreadReturnCode ret;
428 rtsBool receivedFinish = rtsFalse;
430 nat tp_size, sp_size; // stats only
433 rtsBool was_interrupted = rtsFalse;
435 ACQUIRE_LOCK(&sched_mutex);
437 #if defined(THREADED_RTS)
438 /* ToDo: consider SMP support */
439 if (threads_waiting > 0) {
440 /* (At least) one native thread is waiting to
441 * deposit the result of an external call. So,
442 * give up our RTS executing privileges and let
443 * one of them continue.
447 RELEASE_LOCK(&sched_mutex);
448 IF_DEBUG(scheduler, sched_belch("worker thread (%d): giving up RTS token (threads_waiting=%d)\n", osThreadId(), threads_waiting));
449 RELEASE_LOCK(&rts_mutex);
450 /* ToDo: come up with mechanism that guarantees that
451 * the main thread doesn't loop here.
454 /* ToDo: longjmp() */
461 /* set up first event to get things going */
462 /* ToDo: assign costs for system setup and init MainTSO ! */
463 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
465 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
468 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
469 G_TSO(CurrentTSO, 5));
471 if (RtsFlags.GranFlags.Light) {
472 /* Save current time; GranSim Light only */
473 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
476 event = get_next_event();
478 while (event!=(rtsEvent*)NULL) {
479 /* Choose the processor with the next event */
480 CurrentProc = event->proc;
481 CurrentTSO = event->tso;
485 while (!receivedFinish) { /* set by processMessages */
486 /* when receiving PP_FINISH message */
493 IF_DEBUG(scheduler, printAllThreads());
495 /* If we're interrupted (the user pressed ^C, or some other
496 * termination condition occurred), kill all the currently running
500 IF_DEBUG(scheduler, sched_belch("interrupted"));
502 interrupted = rtsFalse;
503 was_interrupted = rtsTrue;
506 /* Go through the list of main threads and wake up any
507 * clients whose computations have finished. ToDo: this
508 * should be done more efficiently without a linear scan
509 * of the main threads list, somehow...
511 #if defined(RTS_SUPPORTS_THREADS)
513 StgMainThread *m, **prev;
514 prev = &main_threads;
515 for (m = main_threads; m != NULL; m = m->link) {
516 switch (m->tso->what_next) {
519 *(m->ret) = (StgClosure *)m->tso->sp[0];
523 broadcastCondition(&m->wakeup);
526 if (m->ret) *(m->ret) = NULL;
528 if (was_interrupted) {
529 m->stat = Interrupted;
533 broadcastCondition(&m->wakeup);
541 #else /* not threaded */
544 /* in GUM do this only on the Main PE */
547 /* If our main thread has finished or been killed, return.
550 StgMainThread *m = main_threads;
551 if (m->tso->what_next == ThreadComplete
552 || m->tso->what_next == ThreadKilled) {
553 main_threads = main_threads->link;
554 if (m->tso->what_next == ThreadComplete) {
555 /* we finished successfully, fill in the return value */
556 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
560 if (m->ret) { *(m->ret) = NULL; };
561 if (was_interrupted) {
562 m->stat = Interrupted;
572 /* Top up the run queue from our spark pool. We try to make the
573 * number of threads in the run queue equal to the number of
576 * Disable spark support in SMP for now, non-essential & requires
577 * a little bit of work to make it compile cleanly. -- sof 1/02.
579 #if 0 /* defined(SMP) */
581 nat n = getFreeCapabilities();
582 StgTSO *tso = run_queue_hd;
584 /* Count the run queue */
585 while (n > 0 && tso != END_TSO_QUEUE) {
592 spark = findSpark(rtsFalse);
594 break; /* no more sparks in the pool */
596 /* I'd prefer this to be done in activateSpark -- HWL */
597 /* tricky - it needs to hold the scheduler lock and
598 * not try to re-acquire it -- SDM */
599 createSparkThread(spark);
601 sched_belch("==^^ turning spark of closure %p into a thread",
602 (StgClosure *)spark));
605 /* We need to wake up the other tasks if we just created some
608 if (getFreeCapabilities() - n > 1) {
609 signalCondition( &thread_ready_cond );
614 /* check for signals each time around the scheduler */
615 #ifndef mingw32_TARGET_OS
616 if (signals_pending()) {
617 startSignalHandlers();
621 /* Check whether any waiting threads need to be woken up. If the
622 * run queue is empty, and there are no other tasks running, we
623 * can wait indefinitely for something to happen.
624 * ToDo: what if another client comes along & requests another
627 if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
628 awaitEvent( EMPTY_RUN_QUEUE()
630 && allFreeCapabilities()
634 /* we can be interrupted while waiting for I/O... */
635 if (interrupted) continue;
638 * Detect deadlock: when we have no threads to run, there are no
639 * threads waiting on I/O or sleeping, and all the other tasks are
640 * waiting for work, we must have a deadlock of some description.
642 * We first try to find threads blocked on themselves (ie. black
643 * holes), and generate NonTermination exceptions where necessary.
645 * If no threads are black holed, we have a deadlock situation, so
646 * inform all the main threads.
649 if ( EMPTY_QUEUE(blocked_queue_hd)
651 && EMPTY_QUEUE(sleeping_queue)
653 && allFreeCapabilities()
654 #elif defined(THREADED_RTS)
655 && EMPTY_QUEUE(suspended_ccalling_threads)
659 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
660 RELEASE_LOCK(&sched_mutex);
661 GarbageCollect(GetRoots,rtsTrue);
662 ACQUIRE_LOCK(&sched_mutex);
663 IF_DEBUG(scheduler, sched_belch("GC done."));
664 if ( EMPTY_QUEUE(blocked_queue_hd)
666 && EMPTY_QUEUE(sleeping_queue) ) {
668 IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
671 /* No black holes, so probably a real deadlock. Send the
672 * current main thread the Deadlock exception (or in the SMP
673 * build, send *all* main threads the deadlock exception,
674 * since none of them can make progress).
676 if ( EMPTY_RUN_QUEUE() ) {
678 #if defined(RTS_SUPPORTS_THREADS)
679 for (m = main_threads; m != NULL; m = m->link) {
680 switch (m->tso->why_blocked) {
681 case BlockedOnBlackHole:
682 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
684 case BlockedOnException:
686 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
689 barf("deadlock: main thread blocked in a strange way");
694 switch (m->tso->why_blocked) {
695 case BlockedOnBlackHole:
696 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
698 case BlockedOnException:
700 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
703 barf("deadlock: main thread blocked in a strange way");
707 #if defined(RTS_SUPPORTS_THREADS)
708 if ( EMPTY_RUN_QUEUE() ) {
709 IF_DEBUG(scheduler, sched_belch("all done, it seems...shut down."));
710 shutdownHaskellAndExit(0);
714 ASSERT( !EMPTY_RUN_QUEUE() );
718 /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
722 /* If there's a GC pending, don't do anything until it has
726 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
727 waitCondition( &gc_pending_cond, &sched_mutex );
732 /* block until we've got a thread on the run queue and a free
735 while ( noCapabilities() || EMPTY_RUN_QUEUE() ) {
736 IF_DEBUG(scheduler, sched_belch("waiting for work"));
737 waitCondition( &thread_ready_cond, &sched_mutex );
738 IF_DEBUG(scheduler, sched_belch("work now available"));
740 #elif defined(THREADED_RTS)
741 if ( EMPTY_RUN_QUEUE() ) {
742 /* no work available, wait for external calls to complete. */
743 IF_DEBUG(scheduler, sched_belch("worker thread (%d): waiting for external thread to complete..", osThreadId()));
745 RELEASE_LOCK(&rts_mutex);
747 while ( EMPTY_RUN_QUEUE() ) {
748 waitCondition(&thread_ready_cond, &sched_mutex);
750 RELEASE_LOCK(&sched_mutex);
752 IF_DEBUG(scheduler, sched_belch("worker thread (%d): re-awakened from no-work slumber..\n", osThreadId()));
753 /* ToDo: longjmp() */
760 if (RtsFlags.GranFlags.Light)
761 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
763 /* adjust time based on time-stamp */
764 if (event->time > CurrentTime[CurrentProc] &&
765 event->evttype != ContinueThread)
766 CurrentTime[CurrentProc] = event->time;
768 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
769 if (!RtsFlags.GranFlags.Light)
772 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
774 /* main event dispatcher in GranSim */
775 switch (event->evttype) {
776 /* Should just be continuing execution */
778 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
779 /* ToDo: check assertion
780 ASSERT(run_queue_hd != (StgTSO*)NULL &&
781 run_queue_hd != END_TSO_QUEUE);
783 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
784 if (!RtsFlags.GranFlags.DoAsyncFetch &&
785 procStatus[CurrentProc]==Fetching) {
786 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
787 CurrentTSO->id, CurrentTSO, CurrentProc);
790 /* Ignore ContinueThreads for completed threads */
791 if (CurrentTSO->what_next == ThreadComplete) {
792 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
793 CurrentTSO->id, CurrentTSO, CurrentProc);
796 /* Ignore ContinueThreads for threads that are being migrated */
797 if (PROCS(CurrentTSO)==Nowhere) {
798 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
799 CurrentTSO->id, CurrentTSO, CurrentProc);
802 /* The thread should be at the beginning of the run queue */
803 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
804 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
805 CurrentTSO->id, CurrentTSO, CurrentProc);
806 break; // run the thread anyway
809 new_event(proc, proc, CurrentTime[proc],
811 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
813 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
814 break; // now actually run the thread; DaH Qu'vam yImuHbej
817 do_the_fetchnode(event);
818 goto next_thread; /* handle next event in event queue */
821 do_the_globalblock(event);
822 goto next_thread; /* handle next event in event queue */
825 do_the_fetchreply(event);
826 goto next_thread; /* handle next event in event queue */
828 case UnblockThread: /* Move from the blocked queue to the tail of */
829 do_the_unblock(event);
830 goto next_thread; /* handle next event in event queue */
832 case ResumeThread: /* Move from the blocked queue to the tail of */
833 /* the runnable queue ( i.e. Qu' SImqa'lu') */
834 event->tso->gran.blocktime +=
835 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
836 do_the_startthread(event);
837 goto next_thread; /* handle next event in event queue */
840 do_the_startthread(event);
841 goto next_thread; /* handle next event in event queue */
844 do_the_movethread(event);
845 goto next_thread; /* handle next event in event queue */
848 do_the_movespark(event);
849 goto next_thread; /* handle next event in event queue */
852 do_the_findwork(event);
853 goto next_thread; /* handle next event in event queue */
856 barf("Illegal event type %u\n", event->evttype);
859 /* This point was scheduler_loop in the old RTS */
861 IF_DEBUG(gran, belch("GRAN: after main switch"));
863 TimeOfLastEvent = CurrentTime[CurrentProc];
864 TimeOfNextEvent = get_time_of_next_event();
865 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
866 // CurrentTSO = ThreadQueueHd;
868 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
871 if (RtsFlags.GranFlags.Light)
872 GranSimLight_leave_system(event, &ActiveTSO);
874 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
877 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
879 /* in a GranSim setup the TSO stays on the run queue */
881 /* Take a thread from the run queue. */
882 t = POP_RUN_QUEUE(); // take_off_run_queue(t);
885 fprintf(stderr, "GRAN: About to run current thread, which is\n");
888 context_switch = 0; // turned on via GranYield, checking events and time slice
891 DumpGranEvent(GR_SCHEDULE, t));
893 procStatus[CurrentProc] = Busy;
896 if (PendingFetches != END_BF_QUEUE) {
900 /* ToDo: perhaps merge with spark activation above */
901 /* check whether we have local work and send requests if we have none */
902 if (EMPTY_RUN_QUEUE()) { /* no runnable threads */
903 /* :-[ no local threads => look out for local sparks */
904 /* the spark pool for the current PE */
905 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
906 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
907 pool->hd < pool->tl) {
909 * ToDo: add GC code check that we really have enough heap afterwards!!
911 * If we're here (no runnable threads) and we have pending
912 * sparks, we must have a space problem. Get enough space
913 * to turn one of those pending sparks into a
917 spark = findSpark(rtsFalse); /* get a spark */
918 if (spark != (rtsSpark) NULL) {
919 tso = activateSpark(spark); /* turn the spark into a thread */
920 IF_PAR_DEBUG(schedule,
921 belch("==== schedule: Created TSO %d (%p); %d threads active",
922 tso->id, tso, advisory_thread_count));
924 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
925 belch("==^^ failed to activate spark");
927 } /* otherwise fall through & pick-up new tso */
929 IF_PAR_DEBUG(verbose,
930 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
931 spark_queue_len(pool)));
936 /* If we still have no work we need to send a FISH to get a spark
939 if (EMPTY_RUN_QUEUE()) {
940 /* =8-[ no local sparks => look for work on other PEs */
942 * We really have absolutely no work. Send out a fish
943 * (there may be some out there already), and wait for
944 * something to arrive. We clearly can't run any threads
945 * until a SCHEDULE or RESUME arrives, and so that's what
946 * we're hoping to see. (Of course, we still have to
947 * respond to other types of messages.)
949 TIME now = msTime() /*CURRENT_TIME*/;
950 IF_PAR_DEBUG(verbose,
951 belch("-- now=%ld", now));
952 IF_PAR_DEBUG(verbose,
953 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
954 (last_fish_arrived_at!=0 &&
955 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
956 belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
957 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
958 last_fish_arrived_at,
959 RtsFlags.ParFlags.fishDelay, now);
962 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
963 (last_fish_arrived_at==0 ||
964 (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
965 /* outstandingFishes is set in sendFish, processFish;
966 avoid flooding system with fishes via delay */
968 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
971 // Global statistics: count no. of fishes
972 if (RtsFlags.ParFlags.ParStats.Global &&
973 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
974 globalParStats.tot_fish_mess++;
978 receivedFinish = processMessages();
981 } else if (PacketsWaiting()) { /* Look for incoming messages */
982 receivedFinish = processMessages();
985 /* Now we are sure that we have some work available */
986 ASSERT(run_queue_hd != END_TSO_QUEUE);
988 /* Take a thread from the run queue, if we have work */
989 t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
990 IF_DEBUG(sanity,checkTSO(t));
992 /* ToDo: write something to the log-file
993 if (RTSflags.ParFlags.granSimStats && !sameThread)
994 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
998 /* the spark pool for the current PE */
999 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
1002 belch("--=^ %d threads, %d sparks on [%#x]",
1003 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1006 if (0 && RtsFlags.ParFlags.ParStats.Full &&
1007 t && LastTSO && t->id != LastTSO->id &&
1008 LastTSO->why_blocked == NotBlocked &&
1009 LastTSO->what_next != ThreadComplete) {
1010 // if previously scheduled TSO not blocked we have to record the context switch
1011 DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
1012 GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
1015 if (RtsFlags.ParFlags.ParStats.Full &&
1016 (emitSchedule /* forced emit */ ||
1017 (t && LastTSO && t->id != LastTSO->id))) {
1019 we are running a different TSO, so write a schedule event to log file
1020 NB: If we use fair scheduling we also have to write a deschedule
1021 event for LastTSO; with unfair scheduling we know that the
1022 previous tso has blocked whenever we switch to another tso, so
1023 we don't need it in GUM for now
1025 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1026 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1027 emitSchedule = rtsFalse;
1031 #else /* !GRAN && !PAR */
1033 /* grab a thread from the run queue
1035 ASSERT(run_queue_hd != END_TSO_QUEUE);
1036 t = POP_RUN_QUEUE();
1037 // Sanity check the thread we're about to run. This can be
1038 // expensive if there is lots of thread switching going on...
1039 IF_DEBUG(sanity,checkTSO(t));
1042 grabCapability(&cap);
1043 cap->r.rCurrentTSO = t;
1045 /* context switches are now initiated by the timer signal, unless
1046 * the user specified "context switch as often as possible", with
1051 RtsFlags.ProfFlags.profileInterval == 0 ||
1053 (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1054 && (run_queue_hd != END_TSO_QUEUE
1055 || blocked_queue_hd != END_TSO_QUEUE
1056 || sleeping_queue != END_TSO_QUEUE)))
1061 RELEASE_LOCK(&sched_mutex);
1063 IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...",
1064 t->id, t, whatNext_strs[t->what_next]));
1067 startHeapProfTimer();
1070 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1071 /* Run the current thread
1073 switch (cap->r.rCurrentTSO->what_next) {
1075 case ThreadComplete:
1076 /* Thread already finished, return to scheduler. */
1077 ret = ThreadFinished;
1079 case ThreadEnterGHC:
1080 ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1083 ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1085 case ThreadEnterInterp:
1086 ret = interpretBCO(cap);
1089 barf("schedule: invalid what_next field");
1091 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1093 /* Costs for the scheduler are assigned to CCS_SYSTEM */
1095 stopHeapProfTimer();
1099 ACQUIRE_LOCK(&sched_mutex);
1102 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1103 #elif !defined(GRAN) && !defined(PAR)
1104 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1106 t = cap->r.rCurrentTSO;
1109 /* HACK 675: if the last thread didn't yield, make sure to print a
1110 SCHEDULE event to the log file when StgRunning the next thread, even
1111 if it is the same one as before */
1113 TimeOfLastYield = CURRENT_TIME;
1119 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1120 globalGranStats.tot_heapover++;
1122 globalParStats.tot_heapover++;
1125 // did the task ask for a large block?
1126 if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1127 // if so, get one and push it on the front of the nursery.
1131 blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1133 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)",
1135 whatNext_strs[t->what_next], blocks));
1137 // don't do this if it would push us over the
1138 // alloc_blocks_lim limit; we'll GC first.
1139 if (alloc_blocks + blocks < alloc_blocks_lim) {
1141 alloc_blocks += blocks;
1142 bd = allocGroup( blocks );
1144 // link the new group into the list
1145 bd->link = cap->r.rCurrentNursery;
1146 bd->u.back = cap->r.rCurrentNursery->u.back;
1147 if (cap->r.rCurrentNursery->u.back != NULL) {
1148 cap->r.rCurrentNursery->u.back->link = bd;
1150 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1151 g0s0->blocks == cap->r.rNursery);
1152 cap->r.rNursery = g0s0->blocks = bd;
1154 cap->r.rCurrentNursery->u.back = bd;
1156 // initialise it as a nursery block
1160 bd->free = bd->start;
1162 // don't forget to update the block count in g0s0.
1163 g0s0->n_blocks += blocks;
1164 ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1166 // now update the nursery to point to the new block
1167 cap->r.rCurrentNursery = bd;
1169 // we might be unlucky and have another thread get on the
1170 // run queue before us and steal the large block, but in that
1171 // case the thread will just end up requesting another large
1173 PUSH_ON_RUN_QUEUE(t);
1178 /* make all the running tasks block on a condition variable,
1179 * maybe set context_switch and wait till they all pile in,
1180 * then have them wait on a GC condition variable.
1182 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow",
1183 t->id, t, whatNext_strs[t->what_next]));
1186 ASSERT(!is_on_queue(t,CurrentProc));
1188 /* Currently we emit a DESCHEDULE event before GC in GUM.
1189 ToDo: either add separate event to distinguish SYSTEM time from rest
1190 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1191 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1192 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1193 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1194 emitSchedule = rtsTrue;
1198 ready_to_gc = rtsTrue;
1199 context_switch = 1; /* stop other threads ASAP */
1200 PUSH_ON_RUN_QUEUE(t);
1201 /* actual GC is done at the end of the while loop */
1207 DumpGranEvent(GR_DESCHEDULE, t));
1208 globalGranStats.tot_stackover++;
1211 // DumpGranEvent(GR_DESCHEDULE, t);
1212 globalParStats.tot_stackover++;
1214 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow",
1215 t->id, t, whatNext_strs[t->what_next]));
1216 /* just adjust the stack for this thread, then pop it back
1222 /* enlarge the stack */
1223 StgTSO *new_t = threadStackOverflow(t);
1225 /* This TSO has moved, so update any pointers to it from the
1226 * main thread stack. It better not be on any other queues...
1227 * (it shouldn't be).
1229 for (m = main_threads; m != NULL; m = m->link) {
1234 threadPaused(new_t);
1235 PUSH_ON_RUN_QUEUE(new_t);
1239 case ThreadYielding:
1242 DumpGranEvent(GR_DESCHEDULE, t));
1243 globalGranStats.tot_yields++;
1246 // DumpGranEvent(GR_DESCHEDULE, t);
1247 globalParStats.tot_yields++;
1249 /* put the thread back on the run queue. Then, if we're ready to
1250 * GC, check whether this is the last task to stop. If so, wake
1251 * up the GC thread. getThread will block during a GC until the
1255 if (t->what_next == ThreadEnterInterp) {
1256 /* ToDo: or maybe a timer expired when we were in Hugs?
1257 * or maybe someone hit ctrl-C
1259 belch("--<< thread %ld (%p; %s) stopped to switch to Hugs",
1260 t->id, t, whatNext_strs[t->what_next]);
1262 belch("--<< thread %ld (%p; %s) stopped, yielding",
1263 t->id, t, whatNext_strs[t->what_next]);
1270 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1272 ASSERT(t->link == END_TSO_QUEUE);
1274 ASSERT(!is_on_queue(t,CurrentProc));
1277 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1278 checkThreadQsSanity(rtsTrue));
1281 if (RtsFlags.ParFlags.doFairScheduling) {
1282 /* this does round-robin scheduling; good for concurrency */
1283 APPEND_TO_RUN_QUEUE(t);
1285 /* this does unfair scheduling; good for parallelism */
1286 PUSH_ON_RUN_QUEUE(t);
1289 /* this does round-robin scheduling; good for concurrency */
1290 APPEND_TO_RUN_QUEUE(t);
1293 /* add a ContinueThread event to actually process the thread */
1294 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1296 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1298 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1307 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1308 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1309 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1311 // ??? needed; should emit block before
1313 DumpGranEvent(GR_DESCHEDULE, t));
1314 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1317 ASSERT(procStatus[CurrentProc]==Busy ||
1318 ((procStatus[CurrentProc]==Fetching) &&
1319 (t->block_info.closure!=(StgClosure*)NULL)));
1320 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1321 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1322 procStatus[CurrentProc]==Fetching))
1323 procStatus[CurrentProc] = Idle;
1327 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1328 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1331 if (t->block_info.closure!=(StgClosure*)NULL)
1332 print_bq(t->block_info.closure));
1334 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1337 /* whatever we schedule next, we must log that schedule */
1338 emitSchedule = rtsTrue;
1341 /* don't need to do anything. Either the thread is blocked on
1342 * I/O, in which case we'll have called addToBlockedQueue
1343 * previously, or it's blocked on an MVar or Blackhole, in which
1344 * case it'll be on the relevant queue already.
1347 fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1348 printThreadBlockage(t);
1349 fprintf(stderr, "\n"));
1351 /* Only for dumping event to log file
1352 ToDo: do I need this in GranSim, too?
1359 case ThreadFinished:
1360 /* Need to check whether this was a main thread, and if so, signal
1361 * the task that started it with the return value. If we have no
1362 * more main threads, we probably need to stop all the tasks until
1365 /* We also end up here if the thread kills itself with an
1366 * uncaught exception, see Exception.hc.
1368 IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1370 endThread(t, CurrentProc); // clean-up the thread
1372 /* For now all are advisory -- HWL */
1373 //if(t->priority==AdvisoryPriority) ??
1374 advisory_thread_count--;
1377 if(t->dist.priority==RevalPriority)
1381 if (RtsFlags.ParFlags.ParStats.Full &&
1382 !RtsFlags.ParFlags.ParStats.Suppressed)
1383 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1388 barf("schedule: invalid thread return code %d", (int)ret);
1392 grabCapability(&cap);
1396 if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1397 GarbageCollect(GetRoots, rtsTrue);
1399 performHeapProfile = rtsFalse;
1400 ready_to_gc = rtsFalse; // we already GC'd
1405 if (ready_to_gc && allFreeCapabilities() )
1410 /* everybody back, start the GC.
1411 * Could do it in this thread, or signal a condition var
1412 * to do it in another thread. Either way, we need to
1413 * broadcast on gc_pending_cond afterward.
1415 #if defined(RTS_SUPPORTS_THREADS)
1416 IF_DEBUG(scheduler,sched_belch("doing GC"));
1418 GarbageCollect(GetRoots,rtsFalse);
1419 ready_to_gc = rtsFalse;
1421 broadcastCondition(&gc_pending_cond);
1424 /* add a ContinueThread event to continue execution of current thread */
1425 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1427 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1429 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1437 IF_GRAN_DEBUG(unused,
1438 print_eventq(EventHd));
1440 event = get_next_event();
1443 /* ToDo: wait for next message to arrive rather than busy wait */
1446 } /* end of while(1) */
1448 IF_PAR_DEBUG(verbose,
1449 belch("== Leaving schedule() after having received Finish"));
1452 /* ---------------------------------------------------------------------------
1453 * deleteAllThreads(): kill all the live threads.
1455 * This is used when we catch a user interrupt (^C), before performing
1456 * any necessary cleanups and running finalizers.
1457 * ------------------------------------------------------------------------- */
1459 void deleteAllThreads ( void )
// Kill every live thread on the run, blocked and sleeping queues, then
// reset all three queues to empty.  Used when a user interrupt (^C) is
// caught, before cleanups and finalizers run (see comment above).
// NOTE(review): the loop bodies (presumably deleteThread(t) plus the
// assignment of 'next') are not visible in this fragment -- confirm
// 'next' is captured before the TSO is deleted.
1462 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1463 for (t = run_queue_hd; t != END_TSO_QUEUE; t = next) {
1467 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = next) {
1471 for (t = sleeping_queue; t != END_TSO_QUEUE; t = next) {
// All threads are gone: mark every scheduler queue empty.
1475 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1476 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1477 sleeping_queue = END_TSO_QUEUE;
1480 /* startThread and insertThread are now in GranSim.c -- HWL */
1483 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1484 //@subsection Suspend and Resume
1486 /* ---------------------------------------------------------------------------
1487 * Suspending & resuming Haskell threads.
1489 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1490 * its capability before calling the C function. This allows another
1491 * task to pick up the capability and carry on running Haskell
1492 * threads. It also means that if the C call blocks, it won't lock
1495 * The Haskell thread making the C call is put to sleep for the
1496 * duration of the call, on the suspended_ccalling_threads queue. We
1497 * give out a token to the task, which it can use to resume the thread
1498 * on return from the C function.
1499 * ------------------------------------------------------------------------- */
1502 suspendThread( StgRegTable *reg )
// Entry point for a "safe" C call (_ccall_GC): park the current TSO on
// the suspended_ccalling_threads list, hand the Capability back so
// another task can run Haskell code meanwhile, and return a token (the
// thread id) that resumeThread() uses to find the TSO afterwards.
1507 /* assume that *reg is a pointer to the StgRegTable part
// Recover the enclosing Capability from the register-table pointer:
// the StgRegTable lies immediately after the StgFunTable member.
1510 cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1512 ACQUIRE_LOCK(&sched_mutex);
1515 sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
// Save the thread's state and push it onto the suspended list.
1517 threadPaused(cap->r.rCurrentTSO);
1518 cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1519 suspended_ccalling_threads = cap->r.rCurrentTSO;
1521 /* Use the thread ID as the token; it should be unique */
1522 tok = cap->r.rCurrentTSO->id;
1524 /* Hand back capability */
1525 releaseCapability(cap);
1527 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
1528 /* Preparing to leave the RTS, so ensure there's a native thread/task
1529 waiting to take over.
1531 ToDo: optimise this and only create a new task if there's a need
1532 for one (i.e., if there's only one Concurrent Haskell thread alive,
1533 there's no need to create a new task).
1535 IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS\n", tok));
1536 startTask(taskStart);
1541 RELEASE_LOCK(&sched_mutex);
1542 // RELEASE_LOCK(&rts_mutex);
1547 resumeThread( StgInt tok )
// Inverse of suspendThread(): given the token handed out there (the
// thread id), locate the TSO on the suspended_ccalling_threads list,
// unlink it, reacquire a Capability, and make it the current TSO.
1549 StgTSO *tso, **prev;
1552 #if defined(THREADED_RTS)
1553 IF_DEBUG(scheduler, sched_belch("thread %d returning, waiting for sched. lock.\n", tok));
1554 ACQUIRE_LOCK(&sched_mutex);
1556 IF_DEBUG(scheduler, sched_belch("thread %d returning, threads waiting: %d.\n", tok, threads_waiting));
1557 RELEASE_LOCK(&sched_mutex);
1559 IF_DEBUG(scheduler, sched_belch("thread %d waiting for RTS lock...\n", tok));
1560 ACQUIRE_LOCK(&rts_mutex);
1563 IF_DEBUG(scheduler, sched_belch("thread %d acquired RTS lock...\n", tok));
1566 #if defined(THREADED_RTS)
1567 /* Free up any RTS-blocked threads. */
1568 broadcastCondition(&thread_ready_cond);
1571 /* Remove the thread off of the suspended list */
// Walk the list keeping 'prev' pointing at the link field that refers
// to 'tso', so the match can be unlinked in place.
// NOTE(review): the actual unlink (presumably *prev = tso->link) is not
// visible in this fragment -- confirm it precedes the reset below.
1572 prev = &suspended_ccalling_threads;
1573 for (tso = suspended_ccalling_threads;
1574 tso != END_TSO_QUEUE;
1575 prev = &tso->link, tso = tso->link) {
1576 if (tso->id == (StgThreadID)tok) {
1581 if (tso == END_TSO_QUEUE) {
1582 barf("resumeThread: thread not found");
1584 tso->link = END_TSO_QUEUE;
1586 #if defined(RTS_SUPPORTS_THREADS)
// Block until a Capability is free, then take it.
1587 while ( noCapabilities() ) {
1588 IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1589 waitCondition(&thread_ready_cond, &sched_mutex);
1590 IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1594 grabCapability(&cap);
1596 cap->r.rCurrentTSO = tso;
1602 /* ---------------------------------------------------------------------------
1604 * ------------------------------------------------------------------------ */
1605 static void unblockThread(StgTSO *tso);
1607 /* ---------------------------------------------------------------------------
1608 * Comparing Thread ids.
1610 * This is used from STG land in the implementation of the
1611 * instances of Eq/Ord for ThreadIds.
1612 * ------------------------------------------------------------------------ */
1614 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
// Three-way comparison of thread ids (strcmp-style: <0, 0, >0), used
// from STG land to implement the Eq/Ord instances for ThreadId.
1616 StgThreadID id1 = tso1->id;
1617 StgThreadID id2 = tso2->id;
1619 if (id1 < id2) return (-1);
1620 if (id1 > id2) return 1;
// Equal ids fall through to the "equal" return (elided here).
1624 /* ---------------------------------------------------------------------------
1625 * Fetching the ThreadID from an StgTSO.
1627 * This is used in the implementation of Show for ThreadIds.
1628 * ------------------------------------------------------------------------ */
1629 int rts_getThreadId(const StgTSO *tso)
1634 /* ---------------------------------------------------------------------------
1635 Create a new thread.
1637 The new thread starts with the given stack size. Before the
1638 scheduler can run, however, this thread needs to have a closure
1639 (and possibly some arguments) pushed on its stack. See
1640 pushClosure() in Schedule.h.
1642 createGenThread() and createIOThread() (in SchedAPI.h) are
1643 convenient packaged versions of this function.
1645 currently pri (priority) is only used in a GRAN setup -- HWL
1646 ------------------------------------------------------------------------ */
1647 //@cindex createThread
1649 /* currently pri (priority) is only used in a GRAN setup -- HWL */
// GRAN variant: thin wrapper passing the priority through.
1651 createThread(nat stack_size, StgInt pri)
1653 return createThread_(stack_size, rtsFalse, pri);
1657 createThread_(nat size, rtsBool have_lock, StgInt pri)
// Non-GRAN variant: no priority argument.
1661 createThread(nat stack_size)
1663 return createThread_(stack_size, rtsFalse);
// Worker: allocates and initialises a fresh TSO of 'size' words total
// (TSO header + stack).  'have_lock' says whether the caller already
// holds sched_mutex (avoids a re-acquire around the id allocation).
1667 createThread_(nat size, rtsBool have_lock)
1674 /* First check whether we should create a thread at all */
1676 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1677 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1679 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1680 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
// Refusal is signalled by the END_TSO_QUEUE sentinel, not NULL.
1681 return END_TSO_QUEUE;
1687 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1690 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1692 /* catch ridiculously small stack sizes */
1693 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1694 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
// Usable stack is the allocation minus the TSO header.
1697 stack_size = size - TSO_STRUCT_SIZEW;
1699 tso = (StgTSO *)allocate(size);
1700 TICK_ALLOC_TSO(stack_size, 0);
1702 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1704 SET_GRAN_HDR(tso, ThisPE);
1706 tso->what_next = ThreadEnterGHC;
1708 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1709 * protect the increment operation on next_thread_id.
1710 * In future, we could use an atomic increment instead.
1712 if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1713 tso->id = next_thread_id++;
1714 if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1716 tso->why_blocked = NotBlocked;
1717 tso->blocked_exceptions = NULL;
1719 tso->stack_size = stack_size;
1720 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
// Stack grows downward: sp starts just past the top of the stack area.
1722 tso->sp = (P_)&(tso->stack) + stack_size;
1725 tso->prof.CCCS = CCS_MAIN;
1728 /* put a stop frame on the stack */
1729 tso->sp -= sizeofW(StgStopFrame);
1730 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1731 tso->su = (StgUpdateFrame*)tso->sp;
1735 tso->link = END_TSO_QUEUE;
1736 /* uses more flexible routine in GranSim */
1737 insertThread(tso, CurrentProc);
1739 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1745 if (RtsFlags.GranFlags.GranSimStats.Full)
1746 DumpGranEvent(GR_START,tso);
1748 if (RtsFlags.ParFlags.ParStats.Full)
1749 DumpGranEvent(GR_STARTQ,tso);
1750 /* HACk to avoid SCHEDULE
1754 /* Link the new thread on the global thread list.
1756 tso->global_link = all_threads;
1760 tso->dist.priority = MandatoryPriority; //by default that is...
1764 tso->gran.pri = pri;
1766 tso->gran.magic = TSO_MAGIC; // debugging only
// GranSim per-thread statistics start at zero.
1768 tso->gran.sparkname = 0;
1769 tso->gran.startedat = CURRENT_TIME;
1770 tso->gran.exported = 0;
1771 tso->gran.basicblocks = 0;
1772 tso->gran.allocs = 0;
1773 tso->gran.exectime = 0;
1774 tso->gran.fetchtime = 0;
1775 tso->gran.fetchcount = 0;
1776 tso->gran.blocktime = 0;
1777 tso->gran.blockcount = 0;
1778 tso->gran.blockedat = 0;
1779 tso->gran.globalsparks = 0;
1780 tso->gran.localsparks = 0;
1781 if (RtsFlags.GranFlags.Light)
1782 tso->gran.clock = Now; /* local clock */
1784 tso->gran.clock = 0;
1786 IF_DEBUG(gran,printTSO(tso));
// GUM (PAR) per-thread statistics, mirroring the GranSim set above.
1789 tso->par.magic = TSO_MAGIC; // debugging only
1791 tso->par.sparkname = 0;
1792 tso->par.startedat = CURRENT_TIME;
1793 tso->par.exported = 0;
1794 tso->par.basicblocks = 0;
1795 tso->par.allocs = 0;
1796 tso->par.exectime = 0;
1797 tso->par.fetchtime = 0;
1798 tso->par.fetchcount = 0;
1799 tso->par.blocktime = 0;
1800 tso->par.blockcount = 0;
1801 tso->par.blockedat = 0;
1802 tso->par.globalsparks = 0;
1803 tso->par.localsparks = 0;
1807 globalGranStats.tot_threads_created++;
1808 globalGranStats.threads_created_on_PE[CurrentProc]++;
1809 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1810 globalGranStats.tot_sq_probes++;
1812 // collect parallel global statistics (currently done together with GC stats)
1813 if (RtsFlags.ParFlags.ParStats.Global &&
1814 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1815 //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime());
1816 globalParStats.tot_threads_created++;
// NOTE(review): format string below has two conversions (%d, %p) but is
// given three arguments (CurrentProc, tso, tso->id), and the %d would
// print CurrentProc rather than the thread id -- fix the format/args.
1822 belch("==__ schedule: Created TSO %d (%p);",
1823 CurrentProc, tso, tso->id));
1825 IF_PAR_DEBUG(verbose,
1826 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1827 tso->id, tso, advisory_thread_count));
1829 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1830 tso->id, tso->stack_size));
1837 all parallel thread creation calls should fall through the following routine.
// Turn a spark into a runnable TSO: allocate the thread, push the
// sparked closure on its stack, and queue it.  Caller holds sched_mutex
// (createThread_ is called with have_lock=rtsTrue).
1840 createSparkThread(rtsSpark spark)
1842 ASSERT(spark != (rtsSpark)NULL);
1843 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
// NOTE(review): barf() aborts, so the return below is unreachable;
// contrast createThread_, which belch()es and returns END_TSO_QUEUE.
1845 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1846 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1847 return END_TSO_QUEUE;
1851 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1852 if (tso==END_TSO_QUEUE)
1853 barf("createSparkThread: Cannot create TSO");
1855 tso->priority = AdvisoryPriority;
// Push the spark's closure onto the new thread's stack and make it
// runnable; count it against the advisory thread limit.
1857 pushClosure(tso,spark);
1858 PUSH_ON_RUN_QUEUE(tso);
1859 advisory_thread_count++;
1866 Turn a spark into a thread.
1867 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1870 //@cindex activateSpark
// Wrapper over createSparkThread that additionally emits PAR statistics
// / debug output; returns the new TSO.
1872 activateSpark (rtsSpark spark)
1876 tso = createSparkThread(spark);
1877 if (RtsFlags.ParFlags.ParStats.Full) {
1878 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1879 IF_PAR_DEBUG(verbose,
1880 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1881 (StgClosure *)spark, info_type((StgClosure *)spark)));
1883 // ToDo: fwd info on local/global spark to thread -- HWL
1884 // tso->gran.exported = spark->exported;
1885 // tso->gran.locked = !spark->global;
1886 // tso->gran.sparkname = spark->name;
1892 /* ---------------------------------------------------------------------------
1895 * scheduleThread puts a thread on the head of the runnable queue.
1896 * This will usually be done immediately after a thread is created.
1897 * The caller of scheduleThread must create the thread using e.g.
1898 * createThread and push an appropriate closure
1899 * on this thread's stack before the scheduler is invoked.
1900 * ------------------------------------------------------------------------ */
1903 scheduleThread(StgTSO *tso)
// Make a freshly-created thread runnable.  The caller must already have
// pushed an appropriate closure on the thread's stack (see createThread).
1905 ACQUIRE_LOCK(&sched_mutex);
1907 /* Put the new thread on the head of the runnable queue. The caller
1908 * better push an appropriate closure on this thread's stack
1909 * beforehand. In the SMP case, the thread may start running as
1910 * soon as we release the scheduler lock below.
// NOTE(review): the comment above says "head", but PUSH_ON_RUN_QUEUE
// maps to push_on_run_queue, which (per its own comment below) appends
// at the END of the run queue -- reconcile comment and macro.
1912 PUSH_ON_RUN_QUEUE(tso);
1916 IF_DEBUG(scheduler,printTSO(tso));
1918 RELEASE_LOCK(&sched_mutex);
1921 /* ---------------------------------------------------------------------------
1924 * Initialise the scheduler. This resets all the queues - if the
1925 * queues contained any threads, they'll be garbage collected at the
1928 * ------------------------------------------------------------------------ */
1932 term_handler(int sig STG_UNUSED)
// SIGTERM handler (installed in the scheduler init code below): records
// the termination request under term_mutex; actual shutdown is carried
// out later by the scheduler, not here in signal context.
1935 ACQUIRE_LOCK(&term_mutex);
// NOTE(review): acquiring a mutex inside a signal handler is not
// async-signal-safe in general -- verify the builds that enable this.
1937 RELEASE_LOCK(&term_mutex);
// Scheduler initialisation: empty all queues, set up concurrency
// primitives, install the termination handler, and start the task
// manager.  (GranSim keeps one queue set per simulated PE.)
// NOTE(review): the loop bound uses <=MAX_PROC, touching MAX_PROC+1
// entries -- confirm the per-PE arrays are sized accordingly.
1948 for (i=0; i<=MAX_PROC; i++) {
1949 run_queue_hds[i] = END_TSO_QUEUE;
1950 run_queue_tls[i] = END_TSO_QUEUE;
1951 blocked_queue_hds[i] = END_TSO_QUEUE;
1952 blocked_queue_tls[i] = END_TSO_QUEUE;
1953 ccalling_threadss[i] = END_TSO_QUEUE;
// NOTE(review): sleeping_queue is a scalar, so resetting it inside the
// per-PE loop is redundant (it is reset on every iteration).
1954 sleeping_queue = END_TSO_QUEUE;
// Non-GranSim: single global queue set.
1957 run_queue_hd = END_TSO_QUEUE;
1958 run_queue_tl = END_TSO_QUEUE;
1959 blocked_queue_hd = END_TSO_QUEUE;
1960 blocked_queue_tl = END_TSO_QUEUE;
1961 sleeping_queue = END_TSO_QUEUE;
1964 suspended_ccalling_threads = END_TSO_QUEUE;
1966 main_threads = NULL;
1967 all_threads = END_TSO_QUEUE;
// Convert the context-switch interval from milliseconds to timer ticks.
1972 RtsFlags.ConcFlags.ctxtSwitchTicks =
1973 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1975 #if defined(RTS_SUPPORTS_THREADS)
1976 /* Initialise the mutex and condition variables used by
1978 initMutex(&sched_mutex);
1979 initMutex(&term_mutex);
1981 initCondition(&thread_ready_cond);
1982 #if defined(THREADED_RTS)
1983 initMutex(&rts_mutex);
1986 initCondition(&gc_pending_cond);
1989 #if defined(THREADED_RTS)
1991 ACQUIRE_LOCK(&rts_mutex);
1993 sched_belch("worker thread (%d): acquired RTS lock\n", osThreadId()));
1996 /* Install the SIGTERM handler (term_handler above) */
1999 struct sigaction action,oact;
2001 action.sa_handler = term_handler;
2002 sigemptyset(&action.sa_mask);
2003 action.sa_flags = 0;
2004 if (sigaction(SIGTERM, &action, &oact) != 0) {
2005 barf("can't install TERM handler");
2010 /* A capability holds the state a native thread needs in
2011 * order to execute STG code. At least one capability is
2012 * floating around (only SMP builds have more than one).
2016 #if defined(RTS_SUPPORTS_THREADS)
2017 /* start our haskell execution tasks */
// SMP: one task per requested node; otherwise a single task.
2019 startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2021 startTaskManager(0,taskStart);
2025 #if /* defined(SMP) ||*/ defined(PAR)
// Shut the scheduler down; in threaded builds this presumably stops the
// task manager (body not visible in this fragment -- confirm).
2031 exitScheduler( void )
2033 #if defined(RTS_SUPPORTS_THREADS)
2038 /* -----------------------------------------------------------------------------
2039 Managing the per-task allocation areas.
2041 Each capability comes with an allocation area. These are
2042 fixed-length block lists into which allocation can be done.
2044 ToDo: no support for two-space collection at the moment???
2045 -------------------------------------------------------------------------- */
2047 /* -----------------------------------------------------------------------------
2048 * waitThread is the external interface for running a new computation
2049 * and waiting for the result.
2051 * In the non-SMP case, we create a new main thread, push it on the
2052 * main-thread stack, and invoke the scheduler to run it. The
2053 * scheduler will return when the top main thread on the stack has
2054 * completed or died, and fill in the necessary fields of the
2055 * main_thread structure.
2057 * In the SMP case, we create a main thread as before, but we then
2058 * create a new condition variable and sleep on it. When our new
2059 * main thread has completed, we'll be woken up and the status/result
2060 * will be in the main_thread struct.
2061 * -------------------------------------------------------------------------- */
// Count the threads on the run, blocked and sleeping queues (the
// increment and return are elided in this fragment).
2064 howManyThreadsAvail ( void )
2068 for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2070 for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2072 for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2078 finishAllThreads ( void )
2081 while (run_queue_hd != END_TSO_QUEUE) {
2082 waitThread ( run_queue_hd, NULL );
2084 while (blocked_queue_hd != END_TSO_QUEUE) {
2085 waitThread ( blocked_queue_hd, NULL );
2087 while (sleeping_queue != END_TSO_QUEUE) {
2088 waitThread ( blocked_queue_hd, NULL );
2091 (blocked_queue_hd != END_TSO_QUEUE ||
2092 run_queue_hd != END_TSO_QUEUE ||
2093 sleeping_queue != END_TSO_QUEUE);
// Block the calling task until the given main thread completes; its
// status (and, via 'ret', its result closure) is delivered through the
// StgMainThread record linked onto main_threads.
2097 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2100 SchedulerStatus stat;
2102 ACQUIRE_LOCK(&sched_mutex);
// Allocate the bookkeeping record for this main thread.
2104 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2109 #if defined(RTS_SUPPORTS_THREADS)
2110 initCondition(&m->wakeup);
2113 m->link = main_threads;
2116 IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n",
// Threaded build: sleep on the per-thread condition variable until the
// scheduler fills in a status.
2121 waitCondition(&m->wakeup, &sched_mutex);
2122 } while (m->stat == NoStatus);
2124 /* GranSim specific init */
2125 CurrentTSO = m->tso; // the TSO to run
2126 procStatus[MainProc] = Busy; // status of main PE
2127 CurrentProc = MainProc; // PE to run it on
2131 RELEASE_LOCK(&sched_mutex);
2133 ASSERT(m->stat != NoStatus);
2138 #if defined(RTS_SUPPORTS_THREADS)
2139 closeCondition(&m->wakeup);
2142 IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n",
2146 RELEASE_LOCK(&sched_mutex);
2151 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2152 //@subsection Run queue code
2156 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2157 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2158 implicit global variable that has to be correct when calling these
2162 /* Put the new thread on the head of the runnable queue.
2163 * The caller of createThread better push an appropriate closure
2164 * on this thread's stack before the scheduler is invoked.
2166 static /* inline */ void
// Prepend 'tso' to the run queue (cf. push_on_run_queue below, which
// appends); also fixes up run_queue_tl when the queue was empty.
2167 add_to_run_queue(tso)
2170 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2171 tso->link = run_queue_hd;
2173 if (run_queue_tl == END_TSO_QUEUE) {
2178 /* Put the new thread at the end of the runnable queue. */
2179 static /* inline */ void
// Append 'tso' at the tail of the run queue (despite the "push" name);
// handles the empty-queue case by setting the head as well.
2180 push_on_run_queue(tso)
2183 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2184 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2185 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2186 if (run_queue_hd == END_TSO_QUEUE) {
2189 run_queue_tl->link = tso;
2195 Should be inlined because it's used very often in schedule. The tso
2196 argument is actually only needed in GranSim, where we want to have the
2197 possibility to schedule *any* TSO on the run queue, irrespective of the
2198 actual ordering. Therefore, if tso is not the nil TSO then we traverse
2199 the run queue and dequeue the tso, adjusting the links in the queue.
2201 //@cindex take_off_run_queue
2202 static /* inline */ StgTSO*
// Dequeue a thread from the run queue.  In GranSim any specific 'tso'
// may be unlinked from anywhere in the queue; in the standard build the
// head of the queue is taken (tso == END_TSO_QUEUE).
2203 take_off_run_queue(StgTSO *tso) {
2207 Remove the job containing {tso} from the queue it is on!  (translated
from the original Klingon comment)
2209 if tso is specified, unlink that tso from the run_queue (doesn't have
2210 to be at the beginning of the queue); GranSim only
2212 if (tso!=END_TSO_QUEUE) {
2213 /* find tso in queue */
2214 for (t=run_queue_hd, prev=END_TSO_QUEUE;
2215 t!=END_TSO_QUEUE && t!=tso;
2219 /* now actually dequeue the tso */
2220 if (prev!=END_TSO_QUEUE) {
2221 ASSERT(run_queue_hd!=t);
2222 prev->link = t->link;
2224 /* t is at beginning of thread queue */
2225 ASSERT(run_queue_hd==t);
2226 run_queue_hd = t->link;
2228 /* t is at end of thread queue */
2229 if (t->link==END_TSO_QUEUE) {
2230 ASSERT(t==run_queue_tl);
2231 run_queue_tl = prev;
2233 ASSERT(run_queue_tl!=t);
// Detach the dequeued thread from the queue's link chain.
2235 t->link = END_TSO_QUEUE;
2237 /* take tso from the beginning of the queue; std concurrent code */
2239 if (t != END_TSO_QUEUE) {
2240 run_queue_hd = t->link;
2241 t->link = END_TSO_QUEUE;
// Queue became empty: keep the tail pointer consistent.
2242 if (run_queue_hd == END_TSO_QUEUE) {
2243 run_queue_tl = END_TSO_QUEUE;
2252 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2253 //@subsection Garbage Collextion Routines
2255 /* ---------------------------------------------------------------------------
2256 Where are the roots that we know about?
2258 - all the threads on the runnable queue
2259 - all the threads on the blocked queue
2260 - all the threads on the sleeping queue
2261 - all the thread currently executing a _ccall_GC
2262 - all the "main threads"
2264 ------------------------------------------------------------------------ */
2266 /* This has to be protected either by the scheduler monitor, or by the
2267 garbage collection monitor (probably the latter).
2272 GetRoots(evac_fn evac)
// Report every GC root the scheduler knows about to the evacuation
// function: the run/blocked/sleeping queues (per-PE arrays in GranSim),
// the main-thread list, threads suspended in _ccall_GC, and (PAR/GRAN)
// the spark queues.  Must be called with GC in control (see comment
// above about the garbage-collection monitor).
2279 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2280 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2281 evac((StgClosure **)&run_queue_hds[i]);
2282 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2283 evac((StgClosure **)&run_queue_tls[i]);
2285 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2286 evac((StgClosure **)&blocked_queue_hds[i]);
2287 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2288 evac((StgClosure **)&blocked_queue_tls[i]);
2289 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
// BUG FIX: previously evacuated &ccalling_threads[i]; the per-PE array
// is named ccalling_threadss (see the guard on the previous line and
// its initialisation in the scheduler init code), so the old code
// referenced the wrong identifier and failed to evacuate these roots.
2290 evac((StgClosure **)&ccalling_threadss[i]);
// Non-GranSim: single global queue set.
2297 if (run_queue_hd != END_TSO_QUEUE) {
2298 ASSERT(run_queue_tl != END_TSO_QUEUE);
2299 evac((StgClosure **)&run_queue_hd);
2300 evac((StgClosure **)&run_queue_tl);
2303 if (blocked_queue_hd != END_TSO_QUEUE) {
2304 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2305 evac((StgClosure **)&blocked_queue_hd);
2306 evac((StgClosure **)&blocked_queue_tl);
2309 if (sleeping_queue != END_TSO_QUEUE) {
2310 evac((StgClosure **)&sleeping_queue);
// Main threads and threads parked in _ccall_GC are roots too.
2314 for (m = main_threads; m != NULL; m = m->link) {
2315 evac((StgClosure **)&m->tso);
2317 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2318 evac((StgClosure **)&suspended_ccalling_threads);
2321 #if defined(PAR) || defined(GRAN)
2322 markSparkQueue(evac);
2326 /* -----------------------------------------------------------------------------
2329 This is the interface to the garbage collector from Haskell land.
2330 We provide this so that external C code can allocate and garbage
2331 collect when called from Haskell via _ccall_GC.
2333 It might be useful to provide an interface whereby the programmer
2334 can specify more roots (ToDo).
2336 This needs to be protected by the GC condition variable above. KH.
2337 -------------------------------------------------------------------------- */
// File-scope hook: extra user roots installed by performGCWithRoots()
// and consulted by AllRoots() below.
// NOTE(review): a plain global -- not safe if two GC requests race;
// confirm callers serialise through the GC condition variable.
2339 void (*extra_roots)(evac_fn);
// Minor GC on request from foreign code (_ccall_GC).
2344 GarbageCollect(GetRoots,rtsFalse);
// Major (full) collection variant.
2348 performMajorGC(void)
2350 GarbageCollect(GetRoots,rtsTrue);
// Root function combining the scheduler's roots with the user's.
2354 AllRoots(evac_fn evac)
2356 GetRoots(evac); // the scheduler's roots
2357 extra_roots(evac); // the user's roots
// GC with additional caller-supplied roots.
2361 performGCWithRoots(void (*get_roots)(evac_fn))
2363 extra_roots = get_roots;
2364 GarbageCollect(AllRoots,rtsFalse);
2367 /* -----------------------------------------------------------------------------
2370 If the thread has reached its maximum stack size, then raise the
2371 StackOverflow exception in the offending thread. Otherwise
2372 relocate the TSO into a larger chunk of memory and adjust its stack
2374 -------------------------------------------------------------------------- */
2377 threadStackOverflow(StgTSO *tso)
// Handle a stack-check failure for 'tso'.  If the thread is already at
// its maximum stack size, raise the StackOverflow exception in it and
// return the same TSO.  Otherwise allocate a larger TSO, copy the old
// one across, relocate its stack pointers, and mark the old TSO as
// ThreadRelocated so the GC and TSO primops can find the new one.
2379 nat new_stack_size, new_tso_size, diff, stack_words;
2383 IF_DEBUG(sanity,checkTSO(tso));
2384 if (tso->stack_size >= tso->max_stack_size) {
2387 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2388 tso->id, tso, tso->stack_size, tso->max_stack_size);
2389 /* If we're debugging, just print out the top of the stack */
2390 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2393 /* Send this thread the StackOverflow exception */
2394 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2398 /* Try to double the current stack size. If that takes us over the
2399 * maximum stack size for this thread, then use the maximum instead.
2400 * Finally round up so the TSO ends up as a whole number of blocks.
2402 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2403 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2404 TSO_STRUCT_SIZE)/sizeof(W_);
2405 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2406 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2408 IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2410 dest = (StgTSO *)allocate(new_tso_size);
2411 TICK_ALLOC_TSO(new_stack_size,0);
2413 /* copy the TSO block and the old stack into the new area */
2414 memcpy(dest,tso,TSO_STRUCT_SIZE);
// The live stack sits at the top of the old area; copy it to the top
// of the new, larger area.
2415 stack_words = tso->stack + tso->stack_size - tso->sp;
2416 new_sp = (P_)dest + new_tso_size - stack_words;
2417 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2419 /* relocate the stack pointers... */
2420 diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2421 dest->su = (StgUpdateFrame *) ((P_)dest->su + diff);
2423 dest->stack_size = new_stack_size;
2425 /* and relocate the update frame list */
2426 relocate_stack(dest, diff);
2428 /* Mark the old TSO as relocated. We have to check for relocated
2429 * TSOs in the garbage collector and any primops that deal with TSOs.
2431 * It's important to set the sp and su values to just beyond the end
2432 * of the stack, so we don't attempt to scavenge any part of the
2435 tso->what_next = ThreadRelocated;
2437 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2438 tso->su = (StgUpdateFrame *)tso->sp;
2439 tso->why_blocked = NotBlocked;
2440 dest->mut_link = NULL;
2442 IF_PAR_DEBUG(verbose,
2443 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2444 tso->id, tso, tso->stack_size);
2445 /* If we're debugging, just print out the top of the stack */
2446 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2449 IF_DEBUG(sanity,checkTSO(tso));
2451 IF_DEBUG(scheduler,printTSO(dest));
2457 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collection Routines, Main scheduling code
2458 //@subsection Blocking Queue Routines
2460 /* ---------------------------------------------------------------------------
2461 Wake up a queue that was blocked on some resource.
2462 ------------------------------------------------------------------------ */
/* unblockCount -- logging/cost bookkeeping performed when one element of a
 * blocking queue is woken.  Dumps a GR_RESUMEQ event when full parallel
 * stats are on, and charges the resume time to either fetchtime or
 * blocktime depending on the type of the closure the TSO was blocked on.
 * NOTE(review): this is a partial listing (original line numbers are baked
 * into each line and many lines are missing).  The signature appears twice
 * below -- presumably the GRAN and PAR variants selected by CPP that was
 * stripped out; confirm against the complete Schedule.c.
 */
2466 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2471 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2473 /* write RESUME events to log file and
2474 update blocked and fetch time (depending on type of the orig closure) */
2475 if (RtsFlags.ParFlags.ParStats.Full) {
2476 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2477 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2478 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2479 if (EMPTY_RUN_QUEUE())
2480 emitSchedule = rtsTrue;
/* Charge the resume cost to a different per-TSO counter depending on the
 * type of the node the TSO was blocked on (case labels elided here). */
2482 switch (get_itbl(node)->type) {
2484 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2489 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2496 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
/* unblockOneLocked -- wake a single blocking-queue element; caller must
 * already hold sched_mutex.  Two CPP variants appear below: first what
 * looks like the GranSim version (simulated cost accounting plus a
 * new_event to resume the thread, local vs. remote), then the GUM/PAR
 * version (dispatch on the element's closure type: TSO goes to the run
 * queue, BLOCKED_FETCH goes to PendingFetches, RBH_Save CONSTRs end the
 * queue).  NOTE(review): partial listing -- the CPP conditionals, some
 * braces and statements between the embedded original line numbers are
 * missing; verify against the complete Schedule.c.
 */
2503 static StgBlockingQueueElement *
2504 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2507 PEs node_loc, tso_loc;
2509 node_loc = where_is(node); // should be lifted out of loop
2510 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2511 tso_loc = where_is((StgClosure *)tso);
2512 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2513 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2514 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2515 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2516 // insertThread(tso, node_loc);
2517 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2519 tso, node, (rtsSpark*)NULL);
2520 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2523 } else { // TSO is remote (actually should be FMBQ)
2524 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2525 RtsFlags.GranFlags.Costs.gunblocktime +
2526 RtsFlags.GranFlags.Costs.latency;
2527 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2529 tso, node, (rtsSpark*)NULL);
2530 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2533 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2535 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2536 (node_loc==tso_loc ? "Local" : "Global"),
2537 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2538 tso->block_info.closure = NULL;
2539 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
/* GUM/PAR variant of unblockOneLocked follows (CPP boundary elided). */
2543 static StgBlockingQueueElement *
2544 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2546 StgBlockingQueueElement *next;
2548 switch (get_itbl(bqe)->type) {
2550 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2551 /* if it's a TSO just push it onto the run_queue */
2553 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2554 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
2556 unblockCount(bqe, node);
2557 /* reset blocking status after dumping event */
2558 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2562 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2564 bqe->link = (StgBlockingQueueElement *)PendingFetches;
2565 PendingFetches = (StgBlockedFetch *)bqe;
2569 /* can ignore this case in a non-debugging setup;
2570 see comments on RBHSave closures above */
2572 /* check that the closure is an RBHSave closure */
2573 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2574 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2575 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2579 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2580 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2584 IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
/* Sequential-build unblockOneLocked: mark the TSO runnable and push it on
 * the run queue.  Below it, unblockOne wraps unblockOneLocked with
 * sched_mutex acquisition for both the GRAN/PAR and sequential builds.
 * NOTE(review): partial listing -- return-type lines, braces and some CPP
 * markers between the embedded original line numbers are missing.
 */
2588 #else /* !GRAN && !PAR */
2590 unblockOneLocked(StgTSO *tso)
2594 ASSERT(get_itbl(tso)->type == TSO);
2595 ASSERT(tso->why_blocked != NotBlocked);
2596 tso->why_blocked = NotBlocked;
2598 PUSH_ON_RUN_QUEUE(tso);
2600 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
/* Public entry point: take the scheduler lock around unblockOneLocked. */
2605 #if defined(GRAN) || defined(PAR)
2606 inline StgBlockingQueueElement *
2607 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2609 ACQUIRE_LOCK(&sched_mutex);
2610 bqe = unblockOneLocked(bqe, node);
2611 RELEASE_LOCK(&sched_mutex);
2616 unblockOne(StgTSO *tso)
2618 ACQUIRE_LOCK(&sched_mutex);
2619 tso = unblockOneLocked(tso);
2620 RELEASE_LOCK(&sched_mutex);
/* awakenBlockedQueue (GranSim variant, by the GranFlags usage): wake every
 * TSO in the blocking queue of `node`.  Performs a "fake fetch" if the node
 * is not on the current PE (adjusting the node's PE bitmask rather than
 * sending a real Fetch), unblocks each TSO via unblockOneLocked, restores
 * the RBHSave payload into the RBH header if the queue belonged to an RBH,
 * and records queue-length statistics.  NOTE(review): partial listing --
 * the CPP guard, several statements and closing braces between the
 * embedded original line numbers are missing; verify against Schedule.c.
 */
2627 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2629 StgBlockingQueueElement *bqe;
2634 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2635 node, CurrentProc, CurrentTime[CurrentProc],
2636 CurrentTSO->id, CurrentTSO));
2638 node_loc = where_is(node);
2640 ASSERT(q == END_BQ_QUEUE ||
2641 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2642 get_itbl(q)->type == CONSTR); // closure (type constructor)
2643 ASSERT(is_unique(node));
2645 /* FAKE FETCH: magically copy the node to the tso's proc;
2646 no Fetch necessary because in reality the node should not have been
2647 moved to the other PE in the first place
2649 if (CurrentProc!=node_loc) {
2651 belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2652 node, node_loc, CurrentProc, CurrentTSO->id,
2653 // CurrentTSO, where_is(CurrentTSO),
2654 node->header.gran.procs));
2655 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2657 belch("## new bitmask of node %p is %#x",
2658 node, node->header.gran.procs));
2659 if (RtsFlags.GranFlags.GranSimStats.Global) {
2660 globalGranStats.tot_fake_fetches++;
/* Main wake-up loop: iterate while the queue head is a TSO. */
2665 // ToDo: check: ASSERT(CurrentProc==node_loc);
2666 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2669 bqe points to the current element in the queue
2670 next points to the next element in the queue
2672 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2673 //tso_loc = where_is(tso);
2675 bqe = unblockOneLocked(bqe, node);
2678 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2679 the closure to make room for the anchor of the BQ */
2680 if (bqe!=END_BQ_QUEUE) {
2681 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2683 ASSERT((info_ptr==&RBH_Save_0_info) ||
2684 (info_ptr==&RBH_Save_1_info) ||
2685 (info_ptr==&RBH_Save_2_info));
2687 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2688 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2689 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2692 belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2693 node, info_type(node)));
2696 /* statistics gathering */
2697 if (RtsFlags.GranFlags.GranSimStats.Global) {
2698 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2699 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2700 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2701 globalGranStats.tot_awbq++; // total no. of bqs awakened
2704 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2705 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
/* awakenBlockedQueue, GUM/PAR variant: under sched_mutex, walk the queue
 * waking every TSO or BLOCKED_FETCH via unblockOneLocked; a leading CONSTR
 * (RBHSave) or empty queue means nothing to do.  The #else branch below is
 * the plain sequential build, which just drains a TSO chain.
 * NOTE(review): partial listing -- braces and some lines between the
 * embedded original line numbers are missing.
 */
2709 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2711 StgBlockingQueueElement *bqe;
2713 ACQUIRE_LOCK(&sched_mutex);
2715 IF_PAR_DEBUG(verbose,
2716 belch("##-_ AwBQ for node %p on [%x]: ",
2720 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2721 IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2726 ASSERT(q == END_BQ_QUEUE ||
2727 get_itbl(q)->type == TSO ||
2728 get_itbl(q)->type == BLOCKED_FETCH ||
2729 get_itbl(q)->type == CONSTR);
2732 while (get_itbl(bqe)->type==TSO ||
2733 get_itbl(bqe)->type==BLOCKED_FETCH) {
2734 bqe = unblockOneLocked(bqe, node);
2736 RELEASE_LOCK(&sched_mutex);
/* Sequential build: a blocking queue is just a chain of TSOs. */
2739 #else /* !GRAN && !PAR */
2741 awakenBlockedQueue(StgTSO *tso)
2743 ACQUIRE_LOCK(&sched_mutex);
2744 while (tso != END_TSO_QUEUE) {
2745 tso = unblockOneLocked(tso);
2747 RELEASE_LOCK(&sched_mutex);
2751 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2752 //@subsection Exception Handling Routines
2754 /* ---------------------------------------------------------------------------
2756 - usually called inside a signal handler so it mustn't do anything fancy.
2757 ------------------------------------------------------------------------ */
2760 interruptStgRts(void)
2766 /* -----------------------------------------------------------------------------
2769 This is for use when we raise an exception in another thread, which
2771 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2772 -------------------------------------------------------------------------- */
/* unblockThread (GRAN/PAR variant): remove `tso` from whatever queue it is
 * blocked on -- MVar queue, black-hole blocking queue, another thread's
 * blocked_exceptions list, the I/O blocked_queue, or the sleeping_queue --
 * then mark it NotBlocked and push it on the run queue, all under
 * sched_mutex.  Each case unlinks the TSO by scanning with a trailing
 * `last` pointer (or `prev`) and patching links; barf()s if the TSO is not
 * found where its why_blocked field says it should be.
 * NOTE(review): partial listing -- case labels, `goto done`-style exits,
 * braces and some loop conditions between the embedded original line
 * numbers are missing; verify against the complete Schedule.c.
 */
2774 #if defined(GRAN) || defined(PAR)
2776 NB: only the type of the blocking queue is different in GranSim and GUM
2777 the operations on the queue-elements are the same
2778 long live polymorphism!
2781 unblockThread(StgTSO *tso)
2783 StgBlockingQueueElement *t, **last;
2785 ACQUIRE_LOCK(&sched_mutex);
2786 switch (tso->why_blocked) {
2789 return; /* not blocked */
/* Blocked on an MVar: unlink from mvar->head, fixing mvar->tail. */
2792 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2794 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2795 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2797 last = (StgBlockingQueueElement **)&mvar->head;
2798 for (t = (StgBlockingQueueElement *)mvar->head;
2800 last = &t->link, last_tso = t, t = t->link) {
2801 if (t == (StgBlockingQueueElement *)tso) {
2802 *last = (StgBlockingQueueElement *)tso->link;
2803 if (mvar->tail == tso) {
2804 mvar->tail = (StgTSO *)last_tso;
2809 barf("unblockThread (MVAR): TSO not found");
2812 case BlockedOnBlackHole:
2813 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2815 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2817 last = &bq->blocking_queue;
2818 for (t = bq->blocking_queue;
2820 last = &t->link, t = t->link) {
2821 if (t == (StgBlockingQueueElement *)tso) {
2822 *last = (StgBlockingQueueElement *)tso->link;
2826 barf("unblockThread (BLACKHOLE): TSO not found");
2829 case BlockedOnException:
2831 StgTSO *target = tso->block_info.tso;
2833 ASSERT(get_itbl(target)->type == TSO);
/* The target may have been relocated by a stack overflow; follow it. */
2835 if (target->what_next == ThreadRelocated) {
2836 target = target->link;
2837 ASSERT(get_itbl(target)->type == TSO);
2840 ASSERT(target->blocked_exceptions != NULL);
2842 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2843 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2845 last = &t->link, t = t->link) {
2846 ASSERT(get_itbl(t)->type == TSO);
2847 if (t == (StgBlockingQueueElement *)tso) {
2848 *last = (StgBlockingQueueElement *)tso->link;
2852 barf("unblockThread (Exception): TSO not found");
2856 case BlockedOnWrite:
2858 /* take TSO off blocked_queue */
2859 StgBlockingQueueElement *prev = NULL;
2860 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2861 prev = t, t = t->link) {
2862 if (t == (StgBlockingQueueElement *)tso) {
2864 blocked_queue_hd = (StgTSO *)t->link;
2865 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2866 blocked_queue_tl = END_TSO_QUEUE;
2869 prev->link = t->link;
2870 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2871 blocked_queue_tl = (StgTSO *)prev;
2877 barf("unblockThread (I/O): TSO not found");
2880 case BlockedOnDelay:
2882 /* take TSO off sleeping_queue */
2883 StgBlockingQueueElement *prev = NULL;
2884 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
2885 prev = t, t = t->link) {
2886 if (t == (StgBlockingQueueElement *)tso) {
2888 sleeping_queue = (StgTSO *)t->link;
2890 prev->link = t->link;
2895 barf("unblockThread (I/O): TSO not found");
2899 barf("unblockThread");
/* Common exit: thread is now runnable again. */
2903 tso->link = END_TSO_QUEUE;
2904 tso->why_blocked = NotBlocked;
2905 tso->block_info.closure = NULL;
2906 PUSH_ON_RUN_QUEUE(tso);
2907 RELEASE_LOCK(&sched_mutex);
/* unblockThread (sequential build): same job as the GRAN/PAR version above
 * but with plain StgTSO queues -- remove `tso` from the queue named by its
 * why_blocked field (MVar, black hole BQ, a target's blocked_exceptions,
 * blocked_queue for I/O, or sleeping_queue), then make it runnable.
 * NOTE(review): partial listing -- case labels, the matching `if (t == tso)`
 * tests inside some loops, and closing braces between the embedded original
 * line numbers are missing; verify against the complete Schedule.c.
 */
2911 unblockThread(StgTSO *tso)
2915 ACQUIRE_LOCK(&sched_mutex);
2916 switch (tso->why_blocked) {
2919 return; /* not blocked */
2922 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2924 StgTSO *last_tso = END_TSO_QUEUE;
2925 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2928 for (t = mvar->head; t != END_TSO_QUEUE;
2929 last = &t->link, last_tso = t, t = t->link) {
2932 if (mvar->tail == tso) {
2933 mvar->tail = last_tso;
2938 barf("unblockThread (MVAR): TSO not found");
2941 case BlockedOnBlackHole:
2942 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2944 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2946 last = &bq->blocking_queue;
2947 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2948 last = &t->link, t = t->link) {
2954 barf("unblockThread (BLACKHOLE): TSO not found");
2957 case BlockedOnException:
2959 StgTSO *target = tso->block_info.tso;
2961 ASSERT(get_itbl(target)->type == TSO);
/* Note: uses `while`, not `if` -- follows a chain of relocations. */
2963 while (target->what_next == ThreadRelocated) {
2964 target = target->link;
2965 ASSERT(get_itbl(target)->type == TSO);
2968 ASSERT(target->blocked_exceptions != NULL);
2970 last = &target->blocked_exceptions;
2971 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2972 last = &t->link, t = t->link) {
2973 ASSERT(get_itbl(t)->type == TSO);
2979 barf("unblockThread (Exception): TSO not found");
2983 case BlockedOnWrite:
2985 StgTSO *prev = NULL;
2986 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2987 prev = t, t = t->link) {
2990 blocked_queue_hd = t->link;
2991 if (blocked_queue_tl == t) {
2992 blocked_queue_tl = END_TSO_QUEUE;
2995 prev->link = t->link;
2996 if (blocked_queue_tl == t) {
2997 blocked_queue_tl = prev;
3003 barf("unblockThread (I/O): TSO not found");
3006 case BlockedOnDelay:
3008 StgTSO *prev = NULL;
3009 for (t = sleeping_queue; t != END_TSO_QUEUE;
3010 prev = t, t = t->link) {
3013 sleeping_queue = t->link;
3015 prev->link = t->link;
3020 barf("unblockThread (I/O): TSO not found");
3024 barf("unblockThread");
/* Common exit: thread is now runnable again. */
3028 tso->link = END_TSO_QUEUE;
3029 tso->why_blocked = NotBlocked;
3030 tso->block_info.closure = NULL;
3031 PUSH_ON_RUN_QUEUE(tso);
3032 RELEASE_LOCK(&sched_mutex);
3036 /* -----------------------------------------------------------------------------
3039 * The following function implements the magic for raising an
3040 * asynchronous exception in an existing thread.
3042 * We first remove the thread from any queue on which it might be
3043 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
3045 * We strip the stack down to the innermost CATCH_FRAME, building
3046 * thunks in the heap for all the active computations, so they can
3047 * be restarted if necessary. When we reach a CATCH_FRAME, we build
3048 * an application of the handler to the exception, and push it on
3049 * the top of the stack.
3051 * How exactly do we save all the active computations? We create an
3052 * AP_UPD for every UpdateFrame on the stack. Entering one of these
3053 * AP_UPDs pushes everything from the corresponding update frame
3054 * upwards onto the stack. (Actually, it pushes everything up to the
3055 * next update frame plus a pointer to the next AP_UPD object.)
3056 * Entering the next AP_UPD object pushes more onto the stack until we
3057 * reach the last AP_UPD object - at which point the stack should look
3058 * exactly as it did when we killed the TSO and we can continue
3059 * execution by entering the closure on top of the stack.
3061 * We can also kill a thread entirely - this happens if either (a) the
3062 * exception passed to raiseAsync is NULL, or (b) there's no
3063 * CATCH_FRAME on the stack. In either case, we strip the entire
3064 * stack and replace the thread with a zombie.
3066 * -------------------------------------------------------------------------- */
/* deleteThread -- kill a thread outright.  Implemented as raiseAsync with a
 * NULL exception, which (per the comment above) strips the entire stack and
 * leaves the thread as a zombie. */
3069 deleteThread(StgTSO *tso)
3071 raiseAsync(tso,NULL);
/* raiseAsync -- deliver an asynchronous exception to `tso` (see the long
 * comment above this function).  Walks the stack from sp up through the
 * update-frame chain (su): update frames are frozen into AP_UPD thunks,
 * CATCH_FRAMEs either receive the exception (building
 * PAP(handler,exception,realworld#) to enter) or are rebuilt as
 * FUN(catch,ap,handler); SEQ frames become FUN(seq,ap); reaching the STOP
 * frame with no catch kills the thread.  A NULL `exception` means "kill
 * unconditionally" (see deleteThread).
 * NOTE(review): partial listing -- the embedded original line numbers jump,
 * so the outer walk loop, case labels, `break`s and closing braces are
 * missing here; do not infer control flow from this excerpt alone.
 */
3075 raiseAsync(StgTSO *tso, StgClosure *exception)
3077 StgUpdateFrame* su = tso->su;
3078 StgPtr sp = tso->sp;
3080 /* Thread already dead? */
3081 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3085 IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3087 /* Remove it from any blocking queues */
3090 /* The stack freezing code assumes there's a closure pointer on
3091 * the top of the stack. This isn't always the case with compiled
3092 * code, so we have to push a dummy closure on the top which just
3093 * returns to the next return address on the stack.
3095 if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3096 *(--sp) = (W_)&stg_dummy_ret_closure;
3100 nat words = ((P_)su - (P_)sp) - 1;
3104 /* If we find a CATCH_FRAME, and we've got an exception to raise,
3105 * then build PAP(handler,exception,realworld#), and leave it on
3106 * top of the stack ready to enter.
3108 if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3109 StgCatchFrame *cf = (StgCatchFrame *)su;
3110 /* we've got an exception to raise, so let's pass it to the
3111 * handler in this frame.
3113 ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3114 TICK_ALLOC_UPD_PAP(3,0);
3115 SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3118 ap->fun = cf->handler; /* :: Exception -> IO a */
3119 ap->payload[0] = exception;
3120 ap->payload[1] = ARG_TAG(0); /* realworld token */
3122 /* throw away the stack from Sp up to and including the
3125 sp = (P_)su + sizeofW(StgCatchFrame) - 1;
3128 /* Restore the blocked/unblocked state for asynchronous exceptions
3129 * at the CATCH_FRAME.
3131 * If exceptions were unblocked at the catch, arrange that they
3132 * are unblocked again after executing the handler by pushing an
3133 * unblockAsyncExceptions_ret stack frame.
3135 if (!cf->exceptions_blocked) {
3136 *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3139 /* Ensure that async exceptions are blocked when running the handler.
3141 if (tso->blocked_exceptions == NULL) {
3142 tso->blocked_exceptions = END_TSO_QUEUE;
3145 /* Put the newly-built PAP on top of the stack, ready to execute
3146 * when the thread restarts.
3150 tso->what_next = ThreadEnterGHC;
3151 IF_DEBUG(sanity, checkTSO(tso));
3155 /* First build an AP_UPD consisting of the stack chunk above the
3156 * current update frame, with the top word on the stack as the
3159 ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3164 ap->fun = (StgClosure *)sp[0];
3166 for(i=0; i < (nat)words; ++i) {
3167 ap->payload[i] = (StgClosure *)*sp++;
3170 switch (get_itbl(su)->type) {
/* UPDATE_FRAME case: freeze the chunk as an AP_UPD and update the updatee. */
3174 SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */);
3175 TICK_ALLOC_UP_THK(words+1,0);
3178 fprintf(stderr, "scheduler: Updating ");
3179 printPtr((P_)su->updatee);
3180 fprintf(stderr, " with ");
3181 printObj((StgClosure *)ap);
3184 /* Replace the updatee with an indirection - happily
3185 * this will also wake up any threads currently
3186 * waiting on the result.
3188 * Warning: if we're in a loop, more than one update frame on
3189 * the stack may point to the same object. Be careful not to
3190 * overwrite an IND_OLDGEN in this case, because we'll screw
3191 * up the mutable lists. To be on the safe side, don't
3192 * overwrite any kind of indirection at all. See also
3193 * threadSqueezeStack in GC.c, where we have to make a similar
3196 if (!closure_IND(su->updatee)) {
3197 UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */
3200 sp += sizeofW(StgUpdateFrame) -1;
3201 sp[0] = (W_)ap; /* push onto stack */
/* CATCH_FRAME case (no exception to raise): rebuild catch#(ap,handler). */
3207 StgCatchFrame *cf = (StgCatchFrame *)su;
3210 /* We want a PAP, not an AP_UPD. Fortunately, the
3211 * layout's the same.
3213 SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3214 TICK_ALLOC_UPD_PAP(words+1,0);
3216 /* now build o = FUN(catch,ap,handler) */
3217 o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3218 TICK_ALLOC_FUN(2,0);
3219 SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3220 o->payload[0] = (StgClosure *)ap;
3221 o->payload[1] = cf->handler;
3224 fprintf(stderr, "scheduler: Built ");
3225 printObj((StgClosure *)o);
3228 /* pop the old handler and put o on the stack */
3230 sp += sizeofW(StgCatchFrame) - 1;
/* SEQ_FRAME case: rebuild seq#(ap). */
3237 StgSeqFrame *sf = (StgSeqFrame *)su;
3240 SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3241 TICK_ALLOC_UPD_PAP(words+1,0);
3243 /* now build o = FUN(seq,ap) */
3244 o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3245 TICK_ALLOC_SE_THK(1,0);
3246 SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3247 o->payload[0] = (StgClosure *)ap;
3250 fprintf(stderr, "scheduler: Built ");
3251 printObj((StgClosure *)o);
3254 /* pop the old handler and put o on the stack */
3256 sp += sizeofW(StgSeqFrame) - 1;
/* STOP_FRAME case: no handler anywhere -- the thread dies. */
3262 /* We've stripped the entire stack, the thread is now dead. */
3263 sp += sizeofW(StgStopFrame) - 1;
3264 sp[0] = (W_)exception; /* save the exception */
3265 tso->what_next = ThreadKilled;
3266 tso->su = (StgUpdateFrame *)(sp+1);
3277 /* -----------------------------------------------------------------------------
3278 resurrectThreads is called after garbage collection on the list of
3279 threads found to be garbage. Each of these threads will be woken
3280 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3281 on an MVar, or NonTermination if the thread was blocked on a Black Hole.
3283 -------------------------------------------------------------------------- */
/* resurrectThreads -- see the comment block above: for each garbage thread
 * found by the GC, relink it onto all_threads and deliver an exception
 * (BlockedOnDeadMVar or NonTermination) via raiseAsync, depending on what
 * it was blocked on.  NOTE(review): partial listing -- some case labels and
 * `break`s between the embedded original line numbers are missing, so the
 * exact mapping of why_blocked values to exceptions cannot be confirmed
 * from this excerpt.
 */
3286 resurrectThreads( StgTSO *threads )
3290 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3291 next = tso->global_link;
3292 tso->global_link = all_threads;
3294 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3296 switch (tso->why_blocked) {
3298 case BlockedOnException:
3299 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3301 case BlockedOnBlackHole:
3302 raiseAsync(tso,(StgClosure *)NonTermination_closure);
3305 /* This might happen if the thread was blocked on a black hole
3306 * belonging to a thread that we've just woken up (raiseAsync
3307 * can wake up threads, remember...).
3311 barf("resurrectThreads: thread blocked in a strange way");
3316 /* -----------------------------------------------------------------------------
3317 * Blackhole detection: if we reach a deadlock, test whether any
3318 * threads are blocked on themselves. Any threads which are found to
3319 * be self-blocked get sent a NonTermination exception.
3321 * This is only done in a deadlock situation in order to avoid
3322 * performance overhead in the normal case.
3323 * -------------------------------------------------------------------------- */
/* detectBlackHoles -- deadlock recovery (see comment above): scan every
 * thread; for those blocked on a black hole, walk their own update-frame
 * chain and, if any frame's updatee is the very closure they are blocked
 * on, the thread is blocked on itself -- send it NonTermination.
 * NOTE(review): partial listing -- case labels in the inner switch and the
 * loop-exit logic between the embedded original line numbers are missing.
 */
3326 detectBlackHoles( void )
3328 StgTSO *t = all_threads;
3329 StgUpdateFrame *frame;
3330 StgClosure *blocked_on;
3332 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3334 while (t->what_next == ThreadRelocated) {
3336 ASSERT(get_itbl(t)->type == TSO);
3339 if (t->why_blocked != BlockedOnBlackHole) {
3343 blocked_on = t->block_info.closure;
3345 for (frame = t->su; ; frame = frame->link) {
3346 switch (get_itbl(frame)->type) {
3349 if (frame->updatee == blocked_on) {
3350 /* We are blocking on one of our own computations, so
3351 * send this thread the NonTermination exception.
3354 sched_belch("thread %d is blocked on itself", t->id));
3355 raiseAsync(t, (StgClosure *)NonTermination_closure);
3376 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3377 //@subsection Debugging Routines
3379 /* -----------------------------------------------------------------------------
3380 Debugging: why is a thread blocked
3381 -------------------------------------------------------------------------- */
/* printThreadBlockage -- debugging: print to stderr why `tso` is blocked,
 * one message per why_blocked value (read fd, write fd, delay target,
 * MVar, exception delivery, black hole, not blocked, and -- presumably
 * under PAR -- the global-address cases).  NOTE(review): partial listing,
 * several case labels and `break`s are missing between the embedded
 * original line numbers.
 */
3386 printThreadBlockage(StgTSO *tso)
3388 switch (tso->why_blocked) {
3390 fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3392 case BlockedOnWrite:
3393 fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3395 case BlockedOnDelay:
3396 fprintf(stderr,"is blocked until %d", tso->block_info.target);
3399 fprintf(stderr,"is blocked on an MVar");
3401 case BlockedOnException:
3402 fprintf(stderr,"is blocked on delivering an exception to thread %d",
3403 tso->block_info.tso->id);
3405 case BlockedOnBlackHole:
3406 fprintf(stderr,"is blocked on a black hole");
3409 fprintf(stderr,"is not blocked");
3413 fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3414 tso->block_info.closure, info_type(tso->block_info.closure));
3416 case BlockedOnGA_NoSend:
3417 fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3418 tso->block_info.closure, info_type(tso->block_info.closure));
3422 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3423 tso->why_blocked, tso->id, tso);
/* printThreadStatus -- debugging: print killed/completed for finished
 * threads, otherwise defer to printThreadBlockage.  (Partial listing:
 * case labels and breaks between the embedded line numbers are missing.) */
3428 printThreadStatus(StgTSO *tso)
3430 switch (tso->what_next) {
3432 fprintf(stderr,"has been killed");
3434 case ThreadComplete:
3435 fprintf(stderr,"has completed");
3438 printThreadBlockage(tso);
/* printAllThreads -- debugging: print one line per thread on all_threads
 * with its id and status.  The three header variants below are presumably
 * the GRAN, PAR and sequential builds (timestamps differ); the selecting
 * CPP lines are missing from this listing.
 */
3443 printAllThreads(void)
3448 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3449 ullong_format_string(TIME_ON_PROC(CurrentProc),
3450 time_string, rtsFalse/*no commas!*/);
3452 sched_belch("all threads at [%s]:", time_string);
3454 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3455 ullong_format_string(CURRENT_TIME,
3456 time_string, rtsFalse/*no commas!*/);
3458 sched_belch("all threads at [%s]:", time_string);
3460 sched_belch("all threads:");
3463 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3464 fprintf(stderr, "\tthread %d ", t->id);
3465 printThreadStatus(t);
3466 fprintf(stderr,"\n");
3471 Print a whole blocking queue attached to node (debugging only).
/* print_bq (GUM/PAR variant, by the FETCH_ME_BQ assertion): debugging dump
 * of the whole blocking queue attached to `node`, delegating the element
 * walk to print_bqe below. */
3476 print_bq (StgClosure *node)
3478 StgBlockingQueueElement *bqe;
3482 fprintf(stderr,"## BQ of closure %p (%s): ",
3483 node, info_type(node));
3485 /* should cover all closures that may have a blocking queue */
3486 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3487 get_itbl(node)->type == FETCH_ME_BQ ||
3488 get_itbl(node)->type == RBH ||
3489 get_itbl(node)->type == MVAR);
3491 ASSERT(node!=(StgClosure*)NULL); // sanity check
3493 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
/* print_bqe -- debugging: walk a blocking queue starting at `bqe`, printing
 * each element (TSO, BLOCKED_FETCH with its global address, or an RBH_Save
 * CONSTR which terminates an RBH's queue).  The loop condition treats a
 * CONSTR or a null link as end-of-queue.  (Partial listing: case labels and
 * breaks between the embedded original line numbers are missing.)
 */
3497 Print a whole blocking queue starting with the element bqe.
3500 print_bqe (StgBlockingQueueElement *bqe)
3505 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3507 for (end = (bqe==END_BQ_QUEUE);
3508 !end; // iterate until bqe points to a CONSTR
3509 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
3510 bqe = end ? END_BQ_QUEUE : bqe->link) {
3511 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3512 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3513 /* types of closures that may appear in a blocking queue */
3514 ASSERT(get_itbl(bqe)->type == TSO ||
3515 get_itbl(bqe)->type == BLOCKED_FETCH ||
3516 get_itbl(bqe)->type == CONSTR);
3517 /* only BQs of an RBH end with an RBH_Save closure */
3518 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3520 switch (get_itbl(bqe)->type) {
3522 fprintf(stderr," TSO %u (%x),",
3523 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3526 fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3527 ((StgBlockedFetch *)bqe)->node,
3528 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3529 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3530 ((StgBlockedFetch *)bqe)->ga.weight);
3533 fprintf(stderr," %s (IP %p),",
3534 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3535 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3536 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3537 "RBH_Save_?"), get_itbl(bqe));
3540 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3541 info_type((StgClosure *)bqe)); // , node, info_type(node));
3545 fputc('\n', stderr);
/* print_bq (GranSim variant): same debugging dump as the PAR version but
 * inlines the element walk and additionally reports which PE each TSO is
 * on (via where_is).  (Partial listing: case labels and breaks between the
 * embedded original line numbers are missing.)
 */
# elif defined(GRAN)
3549 print_bq (StgClosure *node)
3551 StgBlockingQueueElement *bqe;
3552 PEs node_loc, tso_loc;
3555 /* should cover all closures that may have a blocking queue */
3556 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3557 get_itbl(node)->type == FETCH_ME_BQ ||
3558 get_itbl(node)->type == RBH);
3560 ASSERT(node!=(StgClosure*)NULL); // sanity check
3561 node_loc = where_is(node);
3563 fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3564 node, info_type(node), node_loc);
3567 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3569 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3570 !end; // iterate until bqe points to a CONSTR
3571 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3572 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3573 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3574 /* types of closures that may appear in a blocking queue */
3575 ASSERT(get_itbl(bqe)->type == TSO ||
3576 get_itbl(bqe)->type == CONSTR);
3577 /* only BQs of an RBH end with an RBH_Save closure */
3578 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3580 tso_loc = where_is((StgClosure *)bqe);
3581 switch (get_itbl(bqe)->type) {
3583 fprintf(stderr," TSO %d (%p) on [PE %d],",
3584 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3587 fprintf(stderr," %s (IP %p),",
3588 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3589 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3590 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3591 "RBH_Save_?"), get_itbl(bqe));
3594 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3595 info_type((StgClosure *)bqe), node, info_type(node));
3599 fputc('\n', stderr);
/* print_bq (sequential build): the simple case -- a blocking queue here is
 * just a chain of TSOs, so print each id/pointer in turn. */
3603 Nice and easy: only TSOs on the blocking queue
3606 print_bq (StgClosure *node)
3610 ASSERT(node!=(StgClosure*)NULL); // sanity check
3611 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3612 tso != END_TSO_QUEUE;
3614 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3615 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3616 fprintf(stderr," TSO %d (%p),", tso->id, tso);
3618 fputc('\n', stderr);
3629 for (i=0, tso=run_queue_hd;
3630 tso != END_TSO_QUEUE;
/* sched_belch -- varargs debug logger for the scheduler: prints a prefix
 * identifying the build (task id under SMP, "== " under PAR, plain
 * "scheduler: " otherwise -- the selecting CPP lines are missing from this
 * listing), then the formatted message and a newline to stderr.
 * NOTE(review): va_start/va_end and the CPP conditionals are on lines not
 * present in this excerpt. */
3639 sched_belch(char *s, ...)
3644 fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3646 fprintf(stderr, "== ");
3648 fprintf(stderr, "scheduler: ");
3650 vfprintf(stderr, s, ap);
3651 fprintf(stderr, "\n");
3657 //@node Index, , Debugging Routines, Main scheduling code
3661 //* MainRegTable:: @cindex\s-+MainRegTable
3662 //* StgMainThread:: @cindex\s-+StgMainThread
3663 //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue
3664 //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd
3665 //* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
3666 //* context_switch:: @cindex\s-+context_switch
3667 //* createThread:: @cindex\s-+createThread
3668 //* gc_pending_cond:: @cindex\s-+gc_pending_cond
3669 //* initScheduler:: @cindex\s-+initScheduler
3670 //* interrupted:: @cindex\s-+interrupted
3671 //* next_thread_id:: @cindex\s-+next_thread_id
3672 //* print_bq:: @cindex\s-+print_bq
3673 //* run_queue_hd:: @cindex\s-+run_queue_hd
3674 //* run_queue_tl:: @cindex\s-+run_queue_tl
3675 //* sched_mutex:: @cindex\s-+sched_mutex
3676 //* schedule:: @cindex\s-+schedule
3677 //* take_off_run_queue:: @cindex\s-+take_off_run_queue
3678 //* term_mutex:: @cindex\s-+term_mutex
3679 //* thread_ready_cond:: @cindex\s-+thread_ready_cond