1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.118 2002/02/06 01:29:27 sof Exp $
4 * (c) The GHC Team, 1998-2000
8 * Different GHC ways use this scheduler quite differently (see comments below)
9 * Here is the global picture:
11 * WAY Name CPP flag What's it for
12 * --------------------------------------
13 * mp GUM PAR Parallel execution on a distributed memory machine
14 * s SMP SMP Parallel execution on a shared memory machine
15 * mg GranSim GRAN Simulation of parallel execution
16 * md GUM/GdH DIST Distributed execution (based on GUM)
18 * --------------------------------------------------------------------------*/
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
24 * Version with scheduler monitor support for SMPs (WAY=s):
26 This design provides a high-level API to create and schedule threads etc.
27 as documented in the SMP design document.
29 It uses a monitor design controlled by a single mutex to exercise control
30 over accesses to shared data structures, and builds on the Posix threads
33 The majority of state is shared. In order to keep essential per-task state,
34 there is a Capability structure, which contains all the information
35 needed to run a thread: its STG registers, a pointer to its TSO, a
36 nursery etc. During STG execution, a pointer to the capability is
37 kept in a register (BaseReg).
39 In a non-SMP build, there is one global capability, namely MainRegTable.
43 * Version with support for distributed memory parallelism aka GUM (WAY=mp):
45 The main scheduling loop in GUM iterates until a finish message is received.
46 In that case a global flag @receivedFinish@ is set and this instance of
47 the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48 for the handling of incoming messages, such as PP_FINISH.
49 Note that in the parallel case we have a system manager that coordinates
50 different PEs, each of which are running one instance of the RTS.
51 See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52 From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
54 * Version with support for simulating parallel execution aka GranSim (WAY=mg):
56 The main scheduling code in GranSim is quite different from that in std
57 (concurrent) Haskell: while concurrent Haskell just iterates over the
58 threads in the runnable queue, GranSim is event driven, i.e. it iterates
59 over the events in the global event queue. -- HWL
64 //* Variables and Data structures::
65 //* Main scheduling loop::
66 //* Suspend and Resume::
68 //* Garbage Collextion Routines::
69 //* Blocking Queue Routines::
70 //* Exception Handling Routines::
71 //* Debugging Routines::
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
78 #include "PosixSource.h"
85 #include "StgStartup.h"
88 #include "StgMiscClosures.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
119 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
120 //@subsection Variables and Data structures
124 * These are the threads which clients have requested that we run.
126 * In a 'threaded' build, we might have several concurrent clients all
127 * waiting for results, and each one will wait on a condition variable
128 * until the result is available.
130 * In non-SMP, clients are strictly nested: the first client calls
131 * into the RTS, which might call out again to C with a _ccall_GC, and
132 * eventually re-enter the RTS.
134 * Main threads information is kept in a linked list:
136 //@cindex StgMainThread
137 typedef struct StgMainThread_ {
139 SchedulerStatus stat;
141 #if defined(RTS_SUPPORTS_THREADS)
144 struct StgMainThread_ *link;
147 /* Main thread queue.
148 * Locks required: sched_mutex.
150 static StgMainThread *main_threads;
153 * Locks required: sched_mutex.
157 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
158 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
161 In GranSim we have a runnable and a blocked queue for each processor.
162 In order to minimise code changes new arrays run_queue_hds/tls
163 are created. run_queue_hd is then a short cut (macro) for
164 run_queue_hds[CurrentProc] (see GranSim.h).
167 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
168 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
169 StgTSO *ccalling_threadss[MAX_PROC];
170 /* We use the same global list of threads (all_threads) in GranSim as in
171 the std RTS (i.e. we are cheating). However, we don't use this list in
172 the GranSim specific code at the moment (so we are only potentially
177 StgTSO *run_queue_hd, *run_queue_tl;
178 StgTSO *blocked_queue_hd, *blocked_queue_tl;
179 StgTSO *sleeping_queue; /* perhaps replace with a hash table? */
183 /* Linked list of all threads.
184 * Used for detecting garbage collected threads.
188 /* Threads suspended in _ccall_GC.
190 static StgTSO *suspended_ccalling_threads;
192 static StgTSO *threadStackOverflow(StgTSO *tso);
194 /* KH: The following two flags are shared memory locations. There is no need
195 to lock them, since they are only unset at the end of a scheduler
199 /* flag set by signal handler to precipitate a context switch */
200 //@cindex context_switch
203 /* if this flag is set as well, give up execution */
204 //@cindex interrupted
207 /* Next thread ID to allocate.
208 * Locks required: sched_mutex
210 //@cindex next_thread_id
211 StgThreadID next_thread_id = 1;
214 * Pointers to the state of the current thread.
215 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
216 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
219 /* The smallest stack size that makes any sense is:
220 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
221 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
222 * + 1 (the realworld token for an IO thread)
223 * + 1 (the closure to enter)
225 * A thread with this stack will bomb immediately with a stack
226 * overflow, which will increase its stack size.
229 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
236 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
237 * exists - earlier gccs apparently didn't.
244 void addToBlockedQueue ( StgTSO *tso );
246 static void schedule ( void );
247 void interruptStgRts ( void );
249 static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri );
251 static StgTSO * createThread_ ( nat size, rtsBool have_lock );
254 static void detectBlackHoles ( void );
257 static void sched_belch(char *s, ...);
260 #if defined(RTS_SUPPORTS_THREADS)
261 /* ToDo: carefully document the invariants that go together
262 * with these synchronisation objects.
264 Mutex sched_mutex = INIT_MUTEX_VAR;
265 Mutex term_mutex = INIT_MUTEX_VAR;
266 #if defined(THREADED_RTS)
268 * The rts_mutex is the 'big lock' that the active native
269 * thread within the RTS holds while executing code.
270 * It is given up when the thread makes a transition out of
271 * the RTS (e.g., to perform an external C call), hopefully
272 * for another thread to take over its chores and enter
276 Mutex rts_mutex = INIT_MUTEX_VAR;
278 * When a native thread has completed executing an external
279 * call, it needs to communicate the result back to the
280 * (Haskell) thread that made the call. Do this as follows:
282 * - in resumeThread(), the thread increments the counter
283 * threads_waiting, and then blocks on the 'big' RTS lock.
284 * - upon entry to the scheduler, the thread that's currently
285 * holding the RTS lock checks threads_waiting. If there
286 * are native threads waiting, it gives up its RTS lock
287 * and tries to re-grab the RTS lock [perhaps after having
288 * waited for a bit..?]
289 * - care must be taken to deal with the case where more than
290 * one external thread is waiting on the lock. [ToDo: more]
294 static nat threads_waiting = 0;
296 * thread_ready_aux_mutex is used to handle the scenario where the
297 * RTS executing thread runs out of work, but there are
298 * active external threads. The RTS executing thread gives up
299 * its RTS mutex, and blocks waiting for the thread_ready_cond.
300 * Unfortunately, a condition variable needs to be associated
301 * with a mutex in pthreads, so thread_ready_aux_mutex is
302 * used for just this purpose.
305 Mutex thread_ready_aux_mutex = INIT_MUTEX_VAR;
309 /* thread_ready_cond: when signalled, a thread has
310 * become runnable. When used?
312 Condition thread_ready_cond = INIT_COND_VAR;
313 Condition gc_pending_cond = INIT_COND_VAR;
320 rtsTime TimeOfLastYield;
321 rtsBool emitSchedule = rtsTrue;
325 char *whatNext_strs[] = {
333 char *threadReturnCode_strs[] = {
334 "HeapOverflow", /* might also be StackOverflow */
343 StgTSO * createSparkThread(rtsSpark spark);
344 StgTSO * activateSpark (rtsSpark spark);
348 * The thread state for the main thread.
349 // ToDo: check whether not needed any more
353 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
354 static void taskStart(void);
358 /* threads start up using 'taskStart', so make them
359 grab the RTS lock. */
360 #if defined(THREADED_RTS)
361 ACQUIRE_LOCK(&rts_mutex);
371 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
372 //@subsection Main scheduling loop
374 /* ---------------------------------------------------------------------------
375 Main scheduling loop.
377 We use round-robin scheduling, each thread returning to the
378 scheduler loop when one of these conditions is detected:
381 * timer expires (thread yields)
386 Locking notes: we acquire the scheduler lock once at the beginning
387 of the scheduler loop, and release it when
389 * running a thread, or
390 * waiting for work, or
391 * waiting for a GC to complete.
394 In a GranSim setup this loop iterates over the global event queue.
395 This revolves around the global event queue, which determines what
396 to do next. Therefore, it's more complicated than either the
397 concurrent or the parallel (GUM) setup.
400 GUM iterates over incoming messages.
401 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
402 and sends out a fish whenever it has nothing to do; in-between
403 doing the actual reductions (shared code below) it processes the
404 incoming messages and deals with delayed operations
405 (see PendingFetches).
406 This is not the ugliest code you could imagine, but it's bloody close.
408 ------------------------------------------------------------------------ */
415 StgThreadReturnCode ret;
423 rtsBool receivedFinish = rtsFalse;
425 nat tp_size, sp_size; // stats only
428 rtsBool was_interrupted = rtsFalse;
430 ACQUIRE_LOCK(&sched_mutex);
432 #if defined(THREADED_RTS)
433 /* ToDo: consider SMP support */
434 if (threads_waiting > 0) {
435 /* (At least) one native thread is waiting to
436 * deposit the result of an external call. So,
437 * give up our RTS executing privileges and let
438 * one of them continue.
442 RELEASE_LOCK(&sched_mutex);
443 IF_DEBUG(scheduler, sched_belch("worker thread (%d): giving up RTS token (threads_waiting=%d)\n", osThreadId(), threads_waiting));
444 RELEASE_LOCK(&rts_mutex);
445 /* ToDo: come up with a mechanism that guarantees that
446 * the main thread doesn't loop here.
449 /* ToDo: longjmp() */
456 /* set up first event to get things going */
457 /* ToDo: assign costs for system setup and init MainTSO ! */
458 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
460 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
463 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
464 G_TSO(CurrentTSO, 5));
466 if (RtsFlags.GranFlags.Light) {
467 /* Save current time; GranSim Light only */
468 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
471 event = get_next_event();
473 while (event!=(rtsEvent*)NULL) {
474 /* Choose the processor with the next event */
475 CurrentProc = event->proc;
476 CurrentTSO = event->tso;
480 while (!receivedFinish) { /* set by processMessages */
481 /* when receiving PP_FINISH message */
488 IF_DEBUG(scheduler, printAllThreads());
490 /* If we're interrupted (the user pressed ^C, or some other
491 * termination condition occurred), kill all the currently running
495 IF_DEBUG(scheduler, sched_belch("interrupted"));
497 interrupted = rtsFalse;
498 was_interrupted = rtsTrue;
501 /* Go through the list of main threads and wake up any
502 * clients whose computations have finished. ToDo: this
503 * should be done more efficiently without a linear scan
504 * of the main threads list, somehow...
506 #if defined(RTS_SUPPORTS_THREADS)
508 StgMainThread *m, **prev;
509 prev = &main_threads;
510 for (m = main_threads; m != NULL; m = m->link) {
511 switch (m->tso->what_next) {
514 *(m->ret) = (StgClosure *)m->tso->sp[0];
518 broadcastCondition(&m->wakeup);
521 if (m->ret) *(m->ret) = NULL;
523 if (was_interrupted) {
524 m->stat = Interrupted;
528 broadcastCondition(&m->wakeup);
536 #else /* not threaded */
539 /* in GUM do this only on the Main PE */
542 /* If our main thread has finished or been killed, return.
545 StgMainThread *m = main_threads;
546 if (m->tso->what_next == ThreadComplete
547 || m->tso->what_next == ThreadKilled) {
548 main_threads = main_threads->link;
549 if (m->tso->what_next == ThreadComplete) {
550 /* we finished successfully, fill in the return value */
551 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
555 if (m->ret) { *(m->ret) = NULL; };
556 if (was_interrupted) {
557 m->stat = Interrupted;
567 /* Top up the run queue from our spark pool. We try to make the
568 * number of threads in the run queue equal to the number of
571 * Disable spark support in SMP for now, non-essential & requires
572 * a little bit of work to make it compile cleanly. -- sof 1/02.
574 #if 0 /* defined(SMP) */
576 nat n = getFreeCapabilities();
577 StgTSO *tso = run_queue_hd;
579 /* Count the run queue */
580 while (n > 0 && tso != END_TSO_QUEUE) {
587 spark = findSpark(rtsFalse);
589 break; /* no more sparks in the pool */
591 /* I'd prefer this to be done in activateSpark -- HWL */
592 /* tricky - it needs to hold the scheduler lock and
593 * not try to re-acquire it -- SDM */
594 createSparkThread(spark);
596 sched_belch("==^^ turning spark of closure %p into a thread",
597 (StgClosure *)spark));
600 /* We need to wake up the other tasks if we just created some
603 if (getFreeCapabilities() - n > 1) {
604 signalCondition( &thread_ready_cond );
609 /* check for signals each time around the scheduler */
610 #ifndef mingw32_TARGET_OS
611 if (signals_pending()) {
612 startSignalHandlers();
616 /* Check whether any waiting threads need to be woken up. If the
617 * run queue is empty, and there are no other tasks running, we
618 * can wait indefinitely for something to happen.
619 * ToDo: what if another client comes along & requests another
622 if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
624 (run_queue_hd == END_TSO_QUEUE)
626 && allFreeCapabilities()
630 /* we can be interrupted while waiting for I/O... */
631 if (interrupted) continue;
634 * Detect deadlock: when we have no threads to run, there are no
635 * threads waiting on I/O or sleeping, and all the other tasks are
636 * waiting for work, we must have a deadlock of some description.
638 * We first try to find threads blocked on themselves (ie. black
639 * holes), and generate NonTermination exceptions where necessary.
641 * If no threads are black holed, we have a deadlock situation, so
642 * inform all the main threads.
645 if (blocked_queue_hd == END_TSO_QUEUE
646 && run_queue_hd == END_TSO_QUEUE
647 && sleeping_queue == END_TSO_QUEUE
649 && allFreeCapabilities()
650 #elif defined(THREADED_RTS)
651 && suspended_ccalling_threads == END_TSO_QUEUE
655 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
656 RELEASE_LOCK(&sched_mutex);
657 GarbageCollect(GetRoots,rtsTrue);
658 ACQUIRE_LOCK(&sched_mutex);
659 IF_DEBUG(scheduler, sched_belch("GC done."));
660 if (blocked_queue_hd == END_TSO_QUEUE
661 && run_queue_hd == END_TSO_QUEUE
662 && sleeping_queue == END_TSO_QUEUE) {
664 IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
667 /* No black holes, so probably a real deadlock. Send the
668 * current main thread the Deadlock exception (or in the SMP
669 * build, send *all* main threads the deadlock exception,
670 * since none of them can make progress).
672 if (run_queue_hd == END_TSO_QUEUE) {
674 #if defined(RTS_SUPPORTS_THREADS)
675 for (m = main_threads; m != NULL; m = m->link) {
676 switch (m->tso->why_blocked) {
677 case BlockedOnBlackHole:
678 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
680 case BlockedOnException:
682 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
685 barf("deadlock: main thread blocked in a strange way");
690 switch (m->tso->why_blocked) {
691 case BlockedOnBlackHole:
692 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
694 case BlockedOnException:
696 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
699 barf("deadlock: main thread blocked in a strange way");
703 #if defined(RTS_SUPPORTS_THREADS)
704 if ( run_queue_hd == END_TSO_QUEUE ) {
705 IF_DEBUG(scheduler, sched_belch("all done, it seems...shut down."));
706 shutdownHaskellAndExit(0);
710 ASSERT( run_queue_hd != END_TSO_QUEUE );
714 /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
718 /* If there's a GC pending, don't do anything until it has
722 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
723 waitCondition( &gc_pending_cond, &sched_mutex );
728 /* block until we've got a thread on the run queue and a free
731 while ( run_queue_hd == END_TSO_QUEUE
732 || noFreeCapabilities()
734 IF_DEBUG(scheduler, sched_belch("waiting for work"));
735 waitCondition( &thread_ready_cond, &sched_mutex );
736 IF_DEBUG(scheduler, sched_belch("work now available"));
738 #elif defined(THREADED_RTS)
739 if ( run_queue_hd == END_TSO_QUEUE ) {
740 /* no work available, wait for external calls to complete. */
741 IF_DEBUG(scheduler, sched_belch("worker thread (%d): waiting for external thread to complete..", osThreadId()));
743 RELEASE_LOCK(&sched_mutex);
744 RELEASE_LOCK(&rts_mutex);
746 /* Sigh - need to have a mutex locked in order to wait on the
747 condition variable. */
748 ACQUIRE_LOCK(&thread_ready_aux_mutex);
749 waitCondition(&thread_ready_cond, &thread_ready_aux_mutex);
750 RELEASE_LOCK(&thread_ready_aux_mutex);
752 IF_DEBUG(scheduler, sched_belch("worker thread (%d): re-awakened from no-work slumber..\n", osThreadId()));
753 /* ToDo: longjmp() */
760 if (RtsFlags.GranFlags.Light)
761 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
763 /* adjust time based on time-stamp */
764 if (event->time > CurrentTime[CurrentProc] &&
765 event->evttype != ContinueThread)
766 CurrentTime[CurrentProc] = event->time;
768 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
769 if (!RtsFlags.GranFlags.Light)
772 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
774 /* main event dispatcher in GranSim */
775 switch (event->evttype) {
776 /* Should just be continuing execution */
778 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
779 /* ToDo: check assertion
780 ASSERT(run_queue_hd != (StgTSO*)NULL &&
781 run_queue_hd != END_TSO_QUEUE);
783 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
784 if (!RtsFlags.GranFlags.DoAsyncFetch &&
785 procStatus[CurrentProc]==Fetching) {
786 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
787 CurrentTSO->id, CurrentTSO, CurrentProc);
790 /* Ignore ContinueThreads for completed threads */
791 if (CurrentTSO->what_next == ThreadComplete) {
792 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
793 CurrentTSO->id, CurrentTSO, CurrentProc);
796 /* Ignore ContinueThreads for threads that are being migrated */
797 if (PROCS(CurrentTSO)==Nowhere) {
798 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
799 CurrentTSO->id, CurrentTSO, CurrentProc);
802 /* The thread should be at the beginning of the run queue */
803 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
804 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
805 CurrentTSO->id, CurrentTSO, CurrentProc);
806 break; // run the thread anyway
809 new_event(proc, proc, CurrentTime[proc],
811 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
813 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
814 break; // now actually run the thread; DaH Qu'vam yImuHbej
817 do_the_fetchnode(event);
818 goto next_thread; /* handle next event in event queue */
821 do_the_globalblock(event);
822 goto next_thread; /* handle next event in event queue */
825 do_the_fetchreply(event);
826 goto next_thread; /* handle next event in event queue */
828 case UnblockThread: /* Move from the blocked queue to the tail of */
829 do_the_unblock(event);
830 goto next_thread; /* handle next event in event queue */
832 case ResumeThread: /* Move from the blocked queue to the tail of */
833 /* the runnable queue ( i.e. Qu' SImqa'lu') */
834 event->tso->gran.blocktime +=
835 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
836 do_the_startthread(event);
837 goto next_thread; /* handle next event in event queue */
840 do_the_startthread(event);
841 goto next_thread; /* handle next event in event queue */
844 do_the_movethread(event);
845 goto next_thread; /* handle next event in event queue */
848 do_the_movespark(event);
849 goto next_thread; /* handle next event in event queue */
852 do_the_findwork(event);
853 goto next_thread; /* handle next event in event queue */
856 barf("Illegal event type %u\n", event->evttype);
859 /* This point was scheduler_loop in the old RTS */
861 IF_DEBUG(gran, belch("GRAN: after main switch"));
863 TimeOfLastEvent = CurrentTime[CurrentProc];
864 TimeOfNextEvent = get_time_of_next_event();
865 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
866 // CurrentTSO = ThreadQueueHd;
868 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
871 if (RtsFlags.GranFlags.Light)
872 GranSimLight_leave_system(event, &ActiveTSO);
874 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
877 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
879 /* in a GranSim setup the TSO stays on the run queue */
881 /* Take a thread from the run queue. */
882 t = POP_RUN_QUEUE(); // take_off_run_queue(t);
885 fprintf(stderr, "GRAN: About to run current thread, which is\n");
888 context_switch = 0; // turned on via GranYield, checking events and time slice
891 DumpGranEvent(GR_SCHEDULE, t));
893 procStatus[CurrentProc] = Busy;
896 if (PendingFetches != END_BF_QUEUE) {
900 /* ToDo: phps merge with spark activation above */
901 /* check whether we have local work and send requests if we have none */
902 if (EMPTY_RUN_QUEUE()) { /* no runnable threads */
903 /* :-[ no local threads => look out for local sparks */
904 /* the spark pool for the current PE */
905 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
906 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
907 pool->hd < pool->tl) {
909 * ToDo: add GC code check that we really have enough heap afterwards!!
911 * If we're here (no runnable threads) and we have pending
912 * sparks, we must have a space problem. Get enough space
913 * to turn one of those pending sparks into a
917 spark = findSpark(rtsFalse); /* get a spark */
918 if (spark != (rtsSpark) NULL) {
919 tso = activateSpark(spark); /* turn the spark into a thread */
920 IF_PAR_DEBUG(schedule,
921 belch("==== schedule: Created TSO %d (%p); %d threads active",
922 tso->id, tso, advisory_thread_count));
924 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
925 belch("==^^ failed to activate spark");
927 } /* otherwise fall through & pick-up new tso */
929 IF_PAR_DEBUG(verbose,
930 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
931 spark_queue_len(pool)));
936 /* If we still have no work we need to send a FISH to get a spark
939 if (EMPTY_RUN_QUEUE()) {
940 /* =8-[ no local sparks => look for work on other PEs */
942 * We really have absolutely no work. Send out a fish
943 * (there may be some out there already), and wait for
944 * something to arrive. We clearly can't run any threads
945 * until a SCHEDULE or RESUME arrives, and so that's what
946 * we're hoping to see. (Of course, we still have to
947 * respond to other types of messages.)
949 TIME now = msTime() /*CURRENT_TIME*/;
950 IF_PAR_DEBUG(verbose,
951 belch("-- now=%ld", now));
952 IF_PAR_DEBUG(verbose,
953 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
954 (last_fish_arrived_at!=0 &&
955 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
956 belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
957 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
958 last_fish_arrived_at,
959 RtsFlags.ParFlags.fishDelay, now);
962 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
963 (last_fish_arrived_at==0 ||
964 (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
965 /* outstandingFishes is set in sendFish, processFish;
966 avoid flooding system with fishes via delay */
968 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
971 // Global statistics: count no. of fishes
972 if (RtsFlags.ParFlags.ParStats.Global &&
973 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
974 globalParStats.tot_fish_mess++;
978 receivedFinish = processMessages();
981 } else if (PacketsWaiting()) { /* Look for incoming messages */
982 receivedFinish = processMessages();
985 /* Now we are sure that we have some work available */
986 ASSERT(run_queue_hd != END_TSO_QUEUE);
988 /* Take a thread from the run queue, if we have work */
989 t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
990 IF_DEBUG(sanity,checkTSO(t));
992 /* ToDo: write something to the log-file
993 if (RTSflags.ParFlags.granSimStats && !sameThread)
994 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
998 /* the spark pool for the current PE */
999 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
1002 belch("--=^ %d threads, %d sparks on [%#x]",
1003 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1006 if (0 && RtsFlags.ParFlags.ParStats.Full &&
1007 t && LastTSO && t->id != LastTSO->id &&
1008 LastTSO->why_blocked == NotBlocked &&
1009 LastTSO->what_next != ThreadComplete) {
1010 // if previously scheduled TSO not blocked we have to record the context switch
1011 DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
1012 GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
1015 if (RtsFlags.ParFlags.ParStats.Full &&
1016 (emitSchedule /* forced emit */ ||
1017 (t && LastTSO && t->id != LastTSO->id))) {
1019 we are running a different TSO, so write a schedule event to log file
1020 NB: If we use fair scheduling we also have to write a deschedule
1021 event for LastTSO; with unfair scheduling we know that the
1022 previous tso has blocked whenever we switch to another tso, so
1023 we don't need it in GUM for now
1025 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1026 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1027 emitSchedule = rtsFalse;
1031 #else /* !GRAN && !PAR */
1033 /* grab a thread from the run queue
1035 ASSERT(run_queue_hd != END_TSO_QUEUE);
1036 t = POP_RUN_QUEUE();
1037 // Sanity check the thread we're about to run. This can be
1038 // expensive if there is lots of thread switching going on...
1039 IF_DEBUG(sanity,checkTSO(t));
1042 grabCapability(&cap);
1043 cap->r.rCurrentTSO = t;
1045 /* context switches are now initiated by the timer signal, unless
1046 * the user specified "context switch as often as possible", with
1051 RtsFlags.ProfFlags.profileInterval == 0 ||
1053 (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1054 && (run_queue_hd != END_TSO_QUEUE
1055 || blocked_queue_hd != END_TSO_QUEUE
1056 || sleeping_queue != END_TSO_QUEUE)))
1061 RELEASE_LOCK(&sched_mutex);
1063 IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...",
1064 t->id, t, whatNext_strs[t->what_next]));
1067 startHeapProfTimer();
1070 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1071 /* Run the current thread
1073 switch (cap->r.rCurrentTSO->what_next) {
1075 case ThreadComplete:
1076 /* Thread already finished, return to scheduler. */
1077 ret = ThreadFinished;
1079 case ThreadEnterGHC:
1080 ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1083 ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1085 case ThreadEnterInterp:
1086 ret = interpretBCO(cap);
1089 barf("schedule: invalid what_next field");
1091 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1093 /* Costs for the scheduler are assigned to CCS_SYSTEM */
1095 stopHeapProfTimer();
1099 ACQUIRE_LOCK(&sched_mutex);
1102 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1103 #elif !defined(GRAN) && !defined(PAR)
1104 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1106 t = cap->r.rCurrentTSO;
1109 /* HACK 675: if the last thread didn't yield, make sure to print a
1110 SCHEDULE event to the log file when StgRunning the next thread, even
1111 if it is the same one as before */
1113 TimeOfLastYield = CURRENT_TIME;
1119 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1120 globalGranStats.tot_heapover++;
1122 globalParStats.tot_heapover++;
1125 // did the task ask for a large block?
1126 if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1127 // if so, get one and push it on the front of the nursery.
1131 blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1133 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)",
1135 whatNext_strs[t->what_next], blocks));
1137 // don't do this if it would push us over the
1138 // alloc_blocks_lim limit; we'll GC first.
1139 if (alloc_blocks + blocks < alloc_blocks_lim) {
1141 alloc_blocks += blocks;
1142 bd = allocGroup( blocks );
1144 // link the new group into the list
1145 bd->link = cap->r.rCurrentNursery;
1146 bd->u.back = cap->r.rCurrentNursery->u.back;
1147 if (cap->r.rCurrentNursery->u.back != NULL) {
1148 cap->r.rCurrentNursery->u.back->link = bd;
1150 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1151 g0s0->blocks == cap->r.rNursery);
1152 cap->r.rNursery = g0s0->blocks = bd;
1154 cap->r.rCurrentNursery->u.back = bd;
1156 // initialise it as a nursery block
1160 bd->free = bd->start;
1162 // don't forget to update the block count in g0s0.
1163 g0s0->n_blocks += blocks;
1164 ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1166 // now update the nursery to point to the new block
1167 cap->r.rCurrentNursery = bd;
1169 // we might be unlucky and have another thread get on the
1170 // run queue before us and steal the large block, but in that
1171 // case the thread will just end up requesting another large
1173 PUSH_ON_RUN_QUEUE(t);
1178 /* make all the running tasks block on a condition variable,
1179 * maybe set context_switch and wait till they all pile in,
1180 * then have them wait on a GC condition variable.
1182 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow",
1183 t->id, t, whatNext_strs[t->what_next]));
1186 ASSERT(!is_on_queue(t,CurrentProc));
1188 /* Currently we emit a DESCHEDULE event before GC in GUM.
1189 ToDo: either add separate event to distinguish SYSTEM time from rest
1190 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1191 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1192 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1193 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1194 emitSchedule = rtsTrue;
1198 ready_to_gc = rtsTrue;
1199 context_switch = 1; /* stop other threads ASAP */
1200 PUSH_ON_RUN_QUEUE(t);
1201 /* actual GC is done at the end of the while loop */
1207 DumpGranEvent(GR_DESCHEDULE, t));
1208 globalGranStats.tot_stackover++;
1211 // DumpGranEvent(GR_DESCHEDULE, t);
1212 globalParStats.tot_stackover++;
1214 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow",
1215 t->id, t, whatNext_strs[t->what_next]));
1216 /* just adjust the stack for this thread, then pop it back
1222 /* enlarge the stack */
1223 StgTSO *new_t = threadStackOverflow(t);
1225 /* This TSO has moved, so update any pointers to it from the
1226 * main thread stack. It better not be on any other queues...
1227 * (it shouldn't be).
1229 for (m = main_threads; m != NULL; m = m->link) {
1234 threadPaused(new_t);
1235 PUSH_ON_RUN_QUEUE(new_t);
1239 case ThreadYielding:
1242 DumpGranEvent(GR_DESCHEDULE, t));
1243 globalGranStats.tot_yields++;
1246 // DumpGranEvent(GR_DESCHEDULE, t);
1247 globalParStats.tot_yields++;
1249 /* put the thread back on the run queue. Then, if we're ready to
1250 * GC, check whether this is the last task to stop. If so, wake
1251 * up the GC thread. getThread will block during a GC until the
1255 if (t->what_next == ThreadEnterInterp) {
1256 /* ToDo: or maybe a timer expired when we were in Hugs?
1257 * or maybe someone hit ctrl-C
1259 belch("--<< thread %ld (%p; %s) stopped to switch to Hugs",
1260 t->id, t, whatNext_strs[t->what_next]);
1262 belch("--<< thread %ld (%p; %s) stopped, yielding",
1263 t->id, t, whatNext_strs[t->what_next]);
1270 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1272 ASSERT(t->link == END_TSO_QUEUE);
1274 ASSERT(!is_on_queue(t,CurrentProc));
1277 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1278 checkThreadQsSanity(rtsTrue));
1281 if (RtsFlags.ParFlags.doFairScheduling) {
1282 /* this does round-robin scheduling; good for concurrency */
1283 APPEND_TO_RUN_QUEUE(t);
1285 /* this does unfair scheduling; good for parallelism */
1286 PUSH_ON_RUN_QUEUE(t);
1289 /* this does round-robin scheduling; good for concurrency */
1290 APPEND_TO_RUN_QUEUE(t);
1293 /* add a ContinueThread event to actually process the thread */
1294 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1296 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1298 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1307 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1308 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1309 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1311 // ??? needed; should emit block before
1313 DumpGranEvent(GR_DESCHEDULE, t));
1314 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1317 ASSERT(procStatus[CurrentProc]==Busy ||
1318 ((procStatus[CurrentProc]==Fetching) &&
1319 (t->block_info.closure!=(StgClosure*)NULL)));
1320 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1321 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1322 procStatus[CurrentProc]==Fetching))
1323 procStatus[CurrentProc] = Idle;
1327 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1328 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1331 if (t->block_info.closure!=(StgClosure*)NULL)
1332 print_bq(t->block_info.closure));
1334 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1337 /* whatever we schedule next, we must log that schedule */
1338 emitSchedule = rtsTrue;
1341 /* don't need to do anything. Either the thread is blocked on
1342 * I/O, in which case we'll have called addToBlockedQueue
1343 * previously, or it's blocked on an MVar or Blackhole, in which
1344 * case it'll be on the relevant queue already.
1347 fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1348 printThreadBlockage(t);
1349 fprintf(stderr, "\n"));
1351 /* Only for dumping event to log file
1352 ToDo: do I need this in GranSim, too?
1359 case ThreadFinished:
1360 /* Need to check whether this was a main thread, and if so, signal
1361 * the task that started it with the return value. If we have no
1362 * more main threads, we probably need to stop all the tasks until
1365 /* We also end up here if the thread kills itself with an
1366 * uncaught exception, see Exception.hc.
1368 IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1370 endThread(t, CurrentProc); // clean-up the thread
1372 /* For now all are advisory -- HWL */
1373 //if(t->priority==AdvisoryPriority) ??
1374 advisory_thread_count--;
1377 if(t->dist.priority==RevalPriority)
1381 if (RtsFlags.ParFlags.ParStats.Full &&
1382 !RtsFlags.ParFlags.ParStats.Suppressed)
1383 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1388 barf("schedule: invalid thread return code %d", (int)ret);
1392 grabCapability(&cap);
1396 if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1397 GarbageCollect(GetRoots, rtsTrue);
1399 performHeapProfile = rtsFalse;
1400 ready_to_gc = rtsFalse; // we already GC'd
1405 if (ready_to_gc && allFreeCapabilities() )
1410 /* everybody back, start the GC.
1411 * Could do it in this thread, or signal a condition var
1412 * to do it in another thread. Either way, we need to
1413 * broadcast on gc_pending_cond afterward.
1415 #if defined(RTS_SUPPORTS_THREADS)
1416 IF_DEBUG(scheduler,sched_belch("doing GC"));
1418 GarbageCollect(GetRoots,rtsFalse);
1419 ready_to_gc = rtsFalse;
1421 broadcastCondition(&gc_pending_cond);
1424 /* add a ContinueThread event to continue execution of current thread */
1425 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1427 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1429 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1437 IF_GRAN_DEBUG(unused,
1438 print_eventq(EventHd));
1440 event = get_next_event();
1443 /* ToDo: wait for next message to arrive rather than busy wait */
1446 } /* end of while(1) */
1448 IF_PAR_DEBUG(verbose,
1449 belch("== Leaving schedule() after having received Finish"));
1452 /* ---------------------------------------------------------------------------
1453 * deleteAllThreads(): kill all the live threads.
1455 * This is used when we catch a user interrupt (^C), before performing
1456 * any necessary cleanups and running finalizers.
1457 * ------------------------------------------------------------------------- */
1459 void deleteAllThreads ( void )
1462 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1463 for (t = run_queue_hd; t != END_TSO_QUEUE; t = next) {
1467 for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = next) {
1471 for (t = sleeping_queue; t != END_TSO_QUEUE; t = next) {
1475 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1476 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1477 sleeping_queue = END_TSO_QUEUE;
1480 /* startThread and insertThread are now in GranSim.c -- HWL */
1483 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1484 //@subsection Suspend and Resume
1486 /* ---------------------------------------------------------------------------
1487 * Suspending & resuming Haskell threads.
1489 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1490 * its capability before calling the C function. This allows another
1491 * task to pick up the capability and carry on running Haskell
1492 * threads. It also means that if the C call blocks, it won't lock
1495 * The Haskell thread making the C call is put to sleep for the
1496 * duration of the call, on the suspended_ccalling_threads queue. We
1497 * give out a token to the task, which it can use to resume the thread
1498 * on return from the C function.
1499 * ------------------------------------------------------------------------- */
1502 suspendThread( StgRegTable *reg ) /* park the current TSO on suspended_ccalling_threads, release the capability, and hand out the TSO id as the resume token */
1507 /* assume that *reg is a pointer to the StgRegTable part
1510 cap = (Capability *)((char *)reg - sizeof(StgFunTable)); /* byte offset via char *: arithmetic on void * is only a GNU extension */
1512 ACQUIRE_LOCK(&sched_mutex);
1515 sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1517 threadPaused(cap->r.rCurrentTSO);
1518 cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1519 suspended_ccalling_threads = cap->r.rCurrentTSO;
1521 /* Use the thread ID as the token; it should be unique */
1522 tok = cap->r.rCurrentTSO->id;
1524 /* Hand back capability */
1525 releaseCapability(&cap);
1527 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
1528 /* Preparing to leave the RTS, so ensure there's a native thread/task
1529 waiting to take over.
1531 ToDo: optimise this and only create a new task if there's a need
1532 for one (i.e., if there's only one Concurrent Haskell thread alive,
1533 there's no need to create a new task).
1535 IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS\n", tok));
1536 startTask(taskStart);
1540 RELEASE_LOCK(&sched_mutex);
1541 RELEASE_LOCK(&rts_mutex);
1546 resumeThread( StgInt tok )
1548 StgTSO *tso, **prev;
1551 #if defined(THREADED_RTS)
1552 IF_DEBUG(scheduler, sched_belch("thread %d returning, waiting for sched. lock.\n", tok));
1553 ACQUIRE_LOCK(&sched_mutex);
1555 IF_DEBUG(scheduler, sched_belch("thread %d returning, threads waiting: %d.\n", tok, threads_waiting));
1556 RELEASE_LOCK(&sched_mutex);
1558 IF_DEBUG(scheduler, sched_belch("thread %d waiting for RTS lock...\n", tok));
1559 ACQUIRE_LOCK(&rts_mutex);
1562 IF_DEBUG(scheduler, sched_belch("thread %d acquired RTS lock...\n", tok));
1565 #if defined(THREADED_RTS)
1566 /* Free up any RTS-blocked threads. */
1567 broadcastCondition(&thread_ready_cond);
1570 /* Remove the thread off of the suspended list */
1571 prev = &suspended_ccalling_threads;
1572 for (tso = suspended_ccalling_threads;
1573 tso != END_TSO_QUEUE;
1574 prev = &tso->link, tso = tso->link) {
1575 if (tso->id == (StgThreadID)tok) {
1580 if (tso == END_TSO_QUEUE) {
1581 barf("resumeThread: thread not found");
1583 tso->link = END_TSO_QUEUE;
1586 while ( noFreeCapabilities() ) {
1587 IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1588 waitCondition(&thread_ready_cond, &sched_mutex);
1589 IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1593 grabCapability(&cap);
1595 cap->r.rCurrentTSO = tso;
1601 /* ---------------------------------------------------------------------------
1603 * ------------------------------------------------------------------------ */
1604 static void unblockThread(StgTSO *tso);
1606 /* ---------------------------------------------------------------------------
1607 * Comparing Thread ids.
1609 * This is used from STG land in the implementation of the
1610 * instances of Eq/Ord for ThreadIds.
1611 * ------------------------------------------------------------------------ */
1613 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
1615 StgThreadID id1 = tso1->id;
1616 StgThreadID id2 = tso2->id;
1618 if (id1 < id2) return (-1);
1619 if (id1 > id2) return 1;
1623 /* ---------------------------------------------------------------------------
1624 * Fetching the ThreadID from an StgTSO.
1626 * This is used in the implementation of Show for ThreadIds.
1627 * ------------------------------------------------------------------------ */
1628 int rts_getThreadId(const StgTSO *tso)
1633 /* ---------------------------------------------------------------------------
1634 Create a new thread.
1636 The new thread starts with the given stack size. Before the
1637 scheduler can run, however, this thread needs to have a closure
1638 (and possibly some arguments) pushed on its stack. See
1639 pushClosure() in Schedule.h.
1641 createGenThread() and createIOThread() (in SchedAPI.h) are
1642 convenient packaged versions of this function.
1644 currently pri (priority) is only used in a GRAN setup -- HWL
1645 ------------------------------------------------------------------------ */
1646 //@cindex createThread
1648 /* currently pri (priority) is only used in a GRAN setup -- HWL */
1650 createThread(nat stack_size, StgInt pri)
1652 return createThread_(stack_size, rtsFalse, pri);
1656 createThread_(nat size, rtsBool have_lock, StgInt pri)
1660 createThread(nat stack_size)
1662 return createThread_(stack_size, rtsFalse);
1666 createThread_(nat size, rtsBool have_lock)
1673 /* First check whether we should create a thread at all */
1675 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1676 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1678 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1679 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1680 return END_TSO_QUEUE;
1686 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1689 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1691 /* catch ridiculously small stack sizes */
1692 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1693 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1696 stack_size = size - TSO_STRUCT_SIZEW;
1698 tso = (StgTSO *)allocate(size);
1699 TICK_ALLOC_TSO(stack_size, 0);
1701 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1703 SET_GRAN_HDR(tso, ThisPE);
1705 tso->what_next = ThreadEnterGHC;
1707 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1708 * protect the increment operation on next_thread_id.
1709 * In future, we could use an atomic increment instead.
1711 if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); } /* caller passes have_lock when it already holds sched_mutex */
1712 tso->id = next_thread_id++;
1713 if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1715 tso->why_blocked = NotBlocked;
1716 tso->blocked_exceptions = NULL;
1718 tso->stack_size = stack_size;
1719 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
1721 tso->sp = (P_)&(tso->stack) + stack_size;
1724 tso->prof.CCCS = CCS_MAIN;
1727 /* put a stop frame on the stack */
1728 tso->sp -= sizeofW(StgStopFrame);
1729 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1730 tso->su = (StgUpdateFrame*)tso->sp;
1734 tso->link = END_TSO_QUEUE;
1735 /* uses more flexible routine in GranSim */
1736 insertThread(tso, CurrentProc);
1738 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1744 if (RtsFlags.GranFlags.GranSimStats.Full)
1745 DumpGranEvent(GR_START,tso);
1747 if (RtsFlags.ParFlags.ParStats.Full)
1748 DumpGranEvent(GR_STARTQ,tso);
1749 /* HACk to avoid SCHEDULE
1753 /* Link the new thread on the global thread list.
1755 tso->global_link = all_threads;
1759 tso->dist.priority = MandatoryPriority; //by default that is...
1763 tso->gran.pri = pri;
1765 tso->gran.magic = TSO_MAGIC; // debugging only
1767 tso->gran.sparkname = 0;
1768 tso->gran.startedat = CURRENT_TIME;
1769 tso->gran.exported = 0;
1770 tso->gran.basicblocks = 0;
1771 tso->gran.allocs = 0;
1772 tso->gran.exectime = 0;
1773 tso->gran.fetchtime = 0;
1774 tso->gran.fetchcount = 0;
1775 tso->gran.blocktime = 0;
1776 tso->gran.blockcount = 0;
1777 tso->gran.blockedat = 0;
1778 tso->gran.globalsparks = 0;
1779 tso->gran.localsparks = 0;
1780 if (RtsFlags.GranFlags.Light)
1781 tso->gran.clock = Now; /* local clock */
1783 tso->gran.clock = 0;
1785 IF_DEBUG(gran,printTSO(tso));
1788 tso->par.magic = TSO_MAGIC; // debugging only
1790 tso->par.sparkname = 0;
1791 tso->par.startedat = CURRENT_TIME;
1792 tso->par.exported = 0;
1793 tso->par.basicblocks = 0;
1794 tso->par.allocs = 0;
1795 tso->par.exectime = 0;
1796 tso->par.fetchtime = 0;
1797 tso->par.fetchcount = 0;
1798 tso->par.blocktime = 0;
1799 tso->par.blockcount = 0;
1800 tso->par.blockedat = 0;
1801 tso->par.globalsparks = 0;
1802 tso->par.localsparks = 0;
1806 globalGranStats.tot_threads_created++;
1807 globalGranStats.threads_created_on_PE[CurrentProc]++;
1808 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1809 globalGranStats.tot_sq_probes++;
1811 // collect parallel global statistics (currently done together with GC stats)
1812 if (RtsFlags.ParFlags.ParStats.Global &&
1813 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1814 //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime());
1815 globalParStats.tot_threads_created++;
1821 belch("==__ schedule: Created TSO %d (%p);",
1822 tso->id, tso)); /* one argument per specifier: %d = thread id, %p = TSO (previously printed CurrentProc as the id) */
1824 IF_PAR_DEBUG(verbose,
1825 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1826 tso->id, tso, advisory_thread_count));
1828 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1829 tso->id, tso->stack_size));
1836 all parallel thread creation calls should fall through the following routine.
1839 createSparkThread(rtsSpark spark)
1841 ASSERT(spark != (rtsSpark)NULL);
1842 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
1844 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1845 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1846 return END_TSO_QUEUE;
1850 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1851 if (tso==END_TSO_QUEUE)
1852 barf("createSparkThread: Cannot create TSO");
1854 tso->priority = AdvisoryPriority;
1856 pushClosure(tso,spark);
1857 PUSH_ON_RUN_QUEUE(tso);
1858 advisory_thread_count++;
1865 Turn a spark into a thread.
1866 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1869 //@cindex activateSpark
1871 activateSpark (rtsSpark spark)
1875 tso = createSparkThread(spark);
1876 if (RtsFlags.ParFlags.ParStats.Full) {
1877 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1878 IF_PAR_DEBUG(verbose,
1879 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1880 (StgClosure *)spark, info_type((StgClosure *)spark)));
1882 // ToDo: fwd info on local/global spark to thread -- HWL
1883 // tso->gran.exported = spark->exported;
1884 // tso->gran.locked = !spark->global;
1885 // tso->gran.sparkname = spark->name;
1891 /* ---------------------------------------------------------------------------
1894 * scheduleThread puts a thread on the head of the runnable queue.
1895 * This will usually be done immediately after a thread is created.
1896 * The caller of scheduleThread must create the thread using e.g.
1897 * createThread and push an appropriate closure
1898 * on this thread's stack before the scheduler is invoked.
1899 * ------------------------------------------------------------------------ */
1902 scheduleThread(StgTSO *tso)
1904 ACQUIRE_LOCK(&sched_mutex);
1906 /* Put the new thread on the head of the runnable queue. The caller
1907 * better push an appropriate closure on this thread's stack
1908 * beforehand. In the SMP case, the thread may start running as
1909 * soon as we release the scheduler lock below.
1911 PUSH_ON_RUN_QUEUE(tso);
1915 IF_DEBUG(scheduler,printTSO(tso));
1917 RELEASE_LOCK(&sched_mutex);
1920 /* ---------------------------------------------------------------------------
1923 * Initialise the scheduler. This resets all the queues - if the
1924 * queues contained any threads, they'll be garbage collected at the
1927 * ------------------------------------------------------------------------ */
1931 term_handler(int sig STG_UNUSED)
1934 ACQUIRE_LOCK(&term_mutex);
1936 RELEASE_LOCK(&term_mutex);
1947 for (i=0; i<=MAX_PROC; i++) {
1948 run_queue_hds[i] = END_TSO_QUEUE;
1949 run_queue_tls[i] = END_TSO_QUEUE;
1950 blocked_queue_hds[i] = END_TSO_QUEUE;
1951 blocked_queue_tls[i] = END_TSO_QUEUE;
1952 ccalling_threadss[i] = END_TSO_QUEUE;
1953 sleeping_queue = END_TSO_QUEUE;
1956 run_queue_hd = END_TSO_QUEUE;
1957 run_queue_tl = END_TSO_QUEUE;
1958 blocked_queue_hd = END_TSO_QUEUE;
1959 blocked_queue_tl = END_TSO_QUEUE;
1960 sleeping_queue = END_TSO_QUEUE;
1963 suspended_ccalling_threads = END_TSO_QUEUE;
1965 main_threads = NULL;
1966 all_threads = END_TSO_QUEUE;
1971 RtsFlags.ConcFlags.ctxtSwitchTicks =
1972 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1974 #if defined(RTS_SUPPORTS_THREADS)
1975 /* Initialise the mutex and condition variables used by
1977 initMutex(&rts_mutex);
1978 initMutex(&sched_mutex);
1979 initMutex(&term_mutex);
1980 #if defined(THREADED_RTS)
1981 initMutex(&thread_ready_aux_mutex);
1984 initCondition(&thread_ready_cond);
1985 initCondition(&gc_pending_cond);
1988 #if defined(THREADED_RTS)
1990 ACQUIRE_LOCK(&rts_mutex);
1992 sched_belch("worker thread (%d): acquired RTS lock\n", osThreadId()));
1995 /* Install the SIGHUP handler */
1998 struct sigaction action,oact;
2000 action.sa_handler = term_handler;
2001 sigemptyset(&action.sa_mask);
2002 action.sa_flags = 0;
2003 if (sigaction(SIGTERM, &action, &oact) != 0) {
2004 barf("can't install TERM handler");
2009 /* A capability holds the state a native thread needs in
2010 * order to execute STG code. At least one capability is
2011 * floating around (only SMP builds have more than one).
2015 #if defined(RTS_SUPPORTS_THREADS)
2016 /* start our haskell execution tasks */
2018 startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2020 startTaskManager(0,taskStart);
2024 #if /* defined(SMP) ||*/ defined(PAR)
2030 exitScheduler( void )
2032 #if defined(RTS_SUPPORTS_THREADS)
2037 /* -----------------------------------------------------------------------------
2038 Managing the per-task allocation areas.
2040 Each capability comes with an allocation area. These are
2041 fixed-length block lists into which allocation can be done.
2043 ToDo: no support for two-space collection at the moment???
2044 -------------------------------------------------------------------------- */
2046 /* -----------------------------------------------------------------------------
2047 * waitThread is the external interface for running a new computation
2048 * and waiting for the result.
2050 * In the non-SMP case, we create a new main thread, push it on the
2051 * main-thread stack, and invoke the scheduler to run it. The
2052 * scheduler will return when the top main thread on the stack has
2053 * completed or died, and fill in the necessary fields of the
2054 * main_thread structure.
2056 * In the SMP case, we create a main thread as before, but we then
2057 * create a new condition variable and sleep on it. When our new
2058 * main thread has completed, we'll be woken up and the status/result
2059 * will be in the main_thread struct.
2060 * -------------------------------------------------------------------------- */
2063 howManyThreadsAvail ( void )
2067 for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2069 for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2071 for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2077 finishAllThreads ( void ) /* repeatedly waitThread on each queue's head until the run, blocked and sleeping queues are all empty */
2080 while (run_queue_hd != END_TSO_QUEUE) {
2081 waitThread ( run_queue_hd, NULL );
2083 while (blocked_queue_hd != END_TSO_QUEUE) {
2084 waitThread ( blocked_queue_hd, NULL );
2086 while (sleeping_queue != END_TSO_QUEUE) {
2087 waitThread ( sleeping_queue, NULL ); /* wait on the head of the queue this loop is draining */
2090 (blocked_queue_hd != END_TSO_QUEUE ||
2091 run_queue_hd != END_TSO_QUEUE ||
2092 sleeping_queue != END_TSO_QUEUE);
2096 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2099 SchedulerStatus stat;
2101 ACQUIRE_LOCK(&sched_mutex);
2103 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2108 #if defined(RTS_SUPPORTS_THREADS)
2109 initCondition(&m->wakeup);
2112 m->link = main_threads;
2115 IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n",
2120 waitCondition(&m->wakeup, &sched_mutex);
2121 } while (m->stat == NoStatus);
2123 /* GranSim specific init */
2124 CurrentTSO = m->tso; // the TSO to run
2125 procStatus[MainProc] = Busy; // status of main PE
2126 CurrentProc = MainProc; // PE to run it on
2130 RELEASE_LOCK(&sched_mutex);
2132 ASSERT(m->stat != NoStatus);
2137 #if defined(RTS_SUPPORTS_THREADS)
2138 closeCondition(&m->wakeup);
2141 IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n",
2145 RELEASE_LOCK(&sched_mutex);
2150 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2151 //@subsection Run queue code
2155 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2156 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2157 implicit global variable that has to be correct when calling these
2161 /* Put the new thread on the head of the runnable queue.
2162 * The caller of createThread better push an appropriate closure
2163 * on this thread's stack before the scheduler is invoked.
2165 static /* inline */ void
2166 add_to_run_queue(tso)
2169 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2170 tso->link = run_queue_hd;
2172 if (run_queue_tl == END_TSO_QUEUE) {
2177 /* Put the new thread at the end of the runnable queue. */
2178 static /* inline */ void
2179 push_on_run_queue(tso)
2182 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2183 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2184 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2185 if (run_queue_hd == END_TSO_QUEUE) {
2188 run_queue_tl->link = tso;
2194 Should be inlined because it's used very often in schedule. The tso
2195 argument is actually only needed in GranSim, where we want to have the
2196 possibility to schedule *any* TSO on the run queue, irrespective of the
2197 actual ordering. Therefore, if tso is not the nil TSO then we traverse
2198 the run queue and dequeue the tso, adjusting the links in the queue.
2200 //@cindex take_off_run_queue
2201 static /* inline */ StgTSO*
2202 take_off_run_queue(StgTSO *tso) {
2206 qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2208 if tso is specified, unlink that tso from the run_queue (doesn't have
2209 to be at the beginning of the queue); GranSim only
2211 if (tso!=END_TSO_QUEUE) {
2212 /* find tso in queue */
2213 for (t=run_queue_hd, prev=END_TSO_QUEUE;
2214 t!=END_TSO_QUEUE && t!=tso;
2218 /* now actually dequeue the tso */
2219 if (prev!=END_TSO_QUEUE) {
2220 ASSERT(run_queue_hd!=t);
2221 prev->link = t->link;
2223 /* t is at beginning of thread queue */
2224 ASSERT(run_queue_hd==t);
2225 run_queue_hd = t->link;
2227 /* t is at end of thread queue */
2228 if (t->link==END_TSO_QUEUE) {
2229 ASSERT(t==run_queue_tl);
2230 run_queue_tl = prev;
2232 ASSERT(run_queue_tl!=t);
2234 t->link = END_TSO_QUEUE;
2236 /* take tso from the beginning of the queue; std concurrent code */
2238 if (t != END_TSO_QUEUE) {
2239 run_queue_hd = t->link;
2240 t->link = END_TSO_QUEUE;
2241 if (run_queue_hd == END_TSO_QUEUE) {
2242 run_queue_tl = END_TSO_QUEUE;
2251 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2252 //@subsection Garbage Collextion Routines
2254 /* ---------------------------------------------------------------------------
2255 Where are the roots that we know about?
2257 - all the threads on the runnable queue
2258 - all the threads on the blocked queue
2259 - all the threads on the sleeping queue
2260 - all the thread currently executing a _ccall_GC
2261 - all the "main threads"
2263 ------------------------------------------------------------------------ */
2265 /* This has to be protected either by the scheduler monitor, or by the
2266 garbage collection monitor (probably the latter).
2271 GetRoots(evac_fn evac) /* pass every TSO reference the scheduler holds (queue heads/tails, main threads, suspended C callers) to evac */
2278 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2279 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2280 evac((StgClosure **)&run_queue_hds[i]);
2281 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2282 evac((StgClosure **)&run_queue_tls[i]);
2284 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2285 evac((StgClosure **)&blocked_queue_hds[i]);
2286 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2287 evac((StgClosure **)&blocked_queue_tls[i]);
2288 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2289 evac((StgClosure **)&ccalling_threadss[i]); /* evacuate the same per-PE array the guard above tests */
2296 if (run_queue_hd != END_TSO_QUEUE) {
2297 ASSERT(run_queue_tl != END_TSO_QUEUE);
2298 evac((StgClosure **)&run_queue_hd);
2299 evac((StgClosure **)&run_queue_tl);
2302 if (blocked_queue_hd != END_TSO_QUEUE) {
2303 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2304 evac((StgClosure **)&blocked_queue_hd);
2305 evac((StgClosure **)&blocked_queue_tl);
2308 if (sleeping_queue != END_TSO_QUEUE) {
2309 evac((StgClosure **)&sleeping_queue);
2313 for (m = main_threads; m != NULL; m = m->link) {
2314 evac((StgClosure **)&m->tso);
2316 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2317 evac((StgClosure **)&suspended_ccalling_threads);
2320 #if defined(PAR) || defined(GRAN)
2321 markSparkQueue(evac);
2325 /* -----------------------------------------------------------------------------
2328 This is the interface to the garbage collector from Haskell land.
2329 We provide this so that external C code can allocate and garbage
2330 collect when called from Haskell via _ccall_GC.
2332 It might be useful to provide an interface whereby the programmer
2333 can specify more roots (ToDo).
2335 This needs to be protected by the GC condition variable above. KH.
2336 -------------------------------------------------------------------------- */
2338 void (*extra_roots)(evac_fn);
2343 GarbageCollect(GetRoots,rtsFalse);
2347 performMajorGC(void)
2349 GarbageCollect(GetRoots,rtsTrue);
2353 AllRoots(evac_fn evac)
2355 GetRoots(evac); // the scheduler's roots
2356 extra_roots(evac); // the user's roots
2360 performGCWithRoots(void (*get_roots)(evac_fn))
2362 extra_roots = get_roots;
2363 GarbageCollect(AllRoots,rtsFalse);
2366 /* -----------------------------------------------------------------------------
2369 If the thread has reached its maximum stack size, then raise the
2370 StackOverflow exception in the offending thread. Otherwise
2371 relocate the TSO into a larger chunk of memory and adjust its stack
2373 -------------------------------------------------------------------------- */
/* threadStackOverflow: handle a TSO whose stack is full.
 *
 * - If the stack has already reached tso->max_stack_size, the thread is
 *   sent the StackOverflow exception via raiseAsync (after an optional
 *   debug dump of the top of its stack).
 * - Otherwise a larger TSO `dest` is allocated: the stack size is doubled,
 *   capped at max_stack_size, and the TSO size is rounded up with
 *   BLOCK_ROUND_UP and round_to_mblocks.  The TSO header and the live part
 *   of the old stack are copied into `dest`, su and the update-frame list
 *   are shifted by `diff` words (relocate_stack), and the old TSO is marked
 *   ThreadRelocated with sp/su pointing just past the end of its stack.
 *
 * The "stack size increased" debug message reports `dest`: after
 * relocation `tso` is the stale copy, whose stack_size still holds the old
 * size.
 * NOTE(review): this listing omits some lines of the function (local
 * declarations, braces, continuation lines, the return statement).  The
 * printStackChunk call after the "increased" message still starts from the
 * old TSO's sp, which now points past the end of its stack.
 */
2376 threadStackOverflow(StgTSO *tso)
2378 nat new_stack_size, new_tso_size, diff, stack_words;
2382 IF_DEBUG(sanity,checkTSO(tso));
2383 if (tso->stack_size >= tso->max_stack_size) {
2386 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2387 tso->id, tso, tso->stack_size, tso->max_stack_size);
2388 /* If we're debugging, just print out the top of the stack */
2389 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2392 /* Send this thread the StackOverflow exception */
2393 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2397 /* Try to double the current stack size. If that takes us over the
2398 * maximum stack size for this thread, then use the maximum instead.
2399 * Finally round up so the TSO ends up as a whole number of blocks.
2401 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2402 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2403 TSO_STRUCT_SIZE)/sizeof(W_);
2404 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
/* Recompute the stack size so that the stack fills the rounded-up TSO. */
2405 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2407 IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2409 dest = (StgTSO *)allocate(new_tso_size);
2410 TICK_ALLOC_TSO(new_stack_size,0);
2412 /* copy the TSO block and the old stack into the new area */
2413 memcpy(dest,tso,TSO_STRUCT_SIZE);
/* The live stack is the words from tso->sp to the end of the old stack
 * area; it is copied so that it ends exactly at the end of the new TSO. */
2414 stack_words = tso->stack + tso->stack_size - tso->sp;
2415 new_sp = (P_)dest + new_tso_size - stack_words;
2416 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2418 /* relocate the stack pointers... */
2419 diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2420 dest->su = (StgUpdateFrame *) ((P_)dest->su + diff);
2422 dest->stack_size = new_stack_size;
2424 /* and relocate the update frame list */
2425 relocate_stack(dest, diff);
2427 /* Mark the old TSO as relocated. We have to check for relocated
2428 * TSOs in the garbage collector and any primops that deal with TSOs.
2430 * It's important to set the sp and su values to just beyond the end
2431 * of the stack, so we don't attempt to scavenge any part of the
2434 tso->what_next = ThreadRelocated;
2436 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2437 tso->su = (StgUpdateFrame *)tso->sp;
2438 tso->why_blocked = NotBlocked;
2439 dest->mut_link = NULL;
2441 IF_PAR_DEBUG(verbose,
2442 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2443 dest->id, dest, dest->stack_size);
2444 /* If we're debugging, just print out the top of the stack */
2445 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2448 IF_DEBUG(sanity,checkTSO(tso));
2450 IF_DEBUG(scheduler,printTSO(dest));
2456 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2457 //@subsection Blocking Queue Routines
2459 /* ---------------------------------------------------------------------------
2460 Wake up a queue that was blocked on some resource.
2461 ------------------------------------------------------------------------ */
/* unblockCount: bookkeeping hook for an element `bqe` being woken from the
 * blocking queue of `node`.  Two variants are listed; the first has no
 * visible body in this listing.  In the second, bqe is treated as a TSO:
 * when RtsFlags.ParFlags.ParStats.Full is set a GR_RESUMEQ event is logged
 * and emitSchedule is set if the run queue is empty; the time elapsed since
 * par.blockedat is added to par.fetchtime or par.blocktime according to the
 * closure type of `node`, and an unexpected type is fatal (barf).
 * NOTE(review): the braces that would show whether the time accounting is
 * also conditional on ParStats.Full are not in this listing.
 * NOTE(review): the barf message is tagged {unblockOneLocked}; possibly
 * intentional (unblockOneLocked is the caller), but misleading when read
 * in isolation.
 */
2465 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2470 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2472 /* write RESUME events to log file and
2473 update blocked and fetch time (depending on type of the orig closure) */
2474 if (RtsFlags.ParFlags.ParStats.Full) {
2475 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2476 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2477 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2478 if (EMPTY_RUN_QUEUE())
2479 emitSchedule = rtsTrue;
2481 switch (get_itbl(node)->type) {
2483 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2488 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2495 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
/* unblockOneLocked (GranSim variant): wake the TSO `bqe` that was blocked
 * on `node` by scheduling a GranSim event for it.
 * - If the TSO is local to node's PEs (IS_LOCAL_TO), CurrentProc is charged
 *   Costs.lunblocktime and new_event is called with (tso_loc, tso_loc, ...).
 * - Otherwise CurrentProc is charged mpacktime + gunblocktime + latency and
 *   new_event is called with (tso_loc, CurrentProc, ...).
 * In both cases tso->link is reset to END_TSO_QUEUE; block_info.closure is
 * cleared after the debug output.
 * Callers such as unblockOne hold sched_mutex around the call.
 * NOTE(review): the event type argument and the return statement are on
 * lines not present in this listing.
 */
2502 static StgBlockingQueueElement *
2503 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2506 PEs node_loc, tso_loc;
2508 node_loc = where_is(node); // should be lifted out of loop
2509 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2510 tso_loc = where_is((StgClosure *)tso);
2511 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2512 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2513 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2514 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2515 // insertThread(tso, node_loc);
2516 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2518 tso, node, (rtsSpark*)NULL);
2519 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2522 } else { // TSO is remote (actually should be FMBQ)
2523 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2524 RtsFlags.GranFlags.Costs.gunblocktime +
2525 RtsFlags.GranFlags.Costs.latency;
2526 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2528 tso, node, (rtsSpark*)NULL);
2529 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2532 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2534 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2535 (node_loc==tso_loc ? "Local" : "Global"),
2536 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2537 tso->block_info.closure = NULL;
2538 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
/* unblockOneLocked (GUM variant): wake one element `bqe` of node's
 * blocking queue, dispatching on its closure type:
 * - TSO (asserted not NotBlocked): pushed onto the run queue, statistics
 *   recorded via unblockCount, then why_blocked reset to NotBlocked;
 * - BLOCKED_FETCH: prepended to the PendingFetches list;
 * - RBHSave closure: only sanity-checked with ASSERT;
 * - anything else: barf.
 * Callers (unblockOne, awakenBlockedQueue) hold sched_mutex around the call.
 * NOTE(review): the barf passes pointers (get_itbl(...)) for %#lx
 * conversions; %p is the portable conversion for pointer arguments.
 */
2542 static StgBlockingQueueElement *
2543 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2545 StgBlockingQueueElement *next;
2547 switch (get_itbl(bqe)->type) {
2549 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2550 /* if it's a TSO just push it onto the run_queue */
2552 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2553 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
2555 unblockCount(bqe, node);
2556 /* reset blocking status after dumping event */
2557 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2561 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2563 bqe->link = (StgBlockingQueueElement *)PendingFetches;
2564 PendingFetches = (StgBlockedFetch *)bqe;
2568 /* can ignore this case in a non-debugging setup;
2569 see comments on RBHSave closures above */
2571 /* check that the closure is an RBHSave closure */
2572 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2573 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2574 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2578 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2579 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2583 IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2587 #else /* !GRAN && !PAR */
/* unblockOneLocked (non-parallel variant): `tso` must be a TSO that is
 * currently blocked (both asserted).  Marks it NotBlocked and pushes it
 * onto the run queue.  awakenBlockedQueue uses the result as the next
 * queue element to wake; the return statement is not visible in this
 * listing.  Callers in this file hold sched_mutex around the call.
 */
2589 unblockOneLocked(StgTSO *tso)
2593 ASSERT(get_itbl(tso)->type == TSO);
2594 ASSERT(tso->why_blocked != NotBlocked);
2595 tso->why_blocked = NotBlocked;
2597 PUSH_ON_RUN_QUEUE(tso);
2599 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2604 #if defined(GRAN) || defined(PAR)
/* unblockOne: locking wrappers that hold sched_mutex around a call to
 * unblockOneLocked.  The GranSim/GUM variant takes a queue element and the
 * node it is queued on; the other variant takes a TSO.  Both store the
 * result of unblockOneLocked back into their argument.
 */
2605 inline StgBlockingQueueElement *
2606 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2608 ACQUIRE_LOCK(&sched_mutex);
2609 bqe = unblockOneLocked(bqe, node);
2610 RELEASE_LOCK(&sched_mutex);
2615 unblockOne(StgTSO *tso)
2617 ACQUIRE_LOCK(&sched_mutex);
2618 tso = unblockOneLocked(tso);
2619 RELEASE_LOCK(&sched_mutex);
/* awakenBlockedQueue (GranSim variant): wake every TSO on the blocking
 * queue `q` of closure `node`.
 * - If node does not live on CurrentProc, a "fake fetch" is assumed:
 *   CurrentProc is OR-ed into node's PE bitmask, and tot_fake_fetches is
 *   bumped when global GranSim statistics are enabled.
 * - Each leading TSO element is woken with unblockOneLocked.
 * - If a non-TSO element is left, node must be an RBH whose queue ends in
 *   an RBHSave closure (a CONSTR); its payload restores the RBH's
 *   blocking_queue and mut_link fields (cf. convertToRBH in RBH.c).
 * - Queue statistics (tot_bq_len, tot_awbq) are accumulated when global
 *   statistics are enabled.
 */
2626 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2628 StgBlockingQueueElement *bqe;
2633 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2634 node, CurrentProc, CurrentTime[CurrentProc],
2635 CurrentTSO->id, CurrentTSO));
2637 node_loc = where_is(node);
2639 ASSERT(q == END_BQ_QUEUE ||
2640 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2641 get_itbl(q)->type == CONSTR); // closure (type constructor)
2642 ASSERT(is_unique(node));
2644 /* FAKE FETCH: magically copy the node to the tso's proc;
2645 no Fetch necessary because in reality the node should not have been
2646 moved to the other PE in the first place
2648 if (CurrentProc!=node_loc) {
2650 belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2651 node, node_loc, CurrentProc, CurrentTSO->id,
2652 // CurrentTSO, where_is(CurrentTSO),
2653 node->header.gran.procs));
2654 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2656 belch("## new bitmask of node %p is %#x",
2657 node, node->header.gran.procs));
2658 if (RtsFlags.GranFlags.GranSimStats.Global) {
2659 globalGranStats.tot_fake_fetches++;
2664 // ToDo: check: ASSERT(CurrentProc==node_loc);
2665 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2668 bqe points to the current element in the queue
2669 next points to the next element in the queue
2671 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2672 //tso_loc = where_is(tso);
2674 bqe = unblockOneLocked(bqe, node);
2677 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2678 the closure to make room for the anchor of the BQ */
2679 if (bqe!=END_BQ_QUEUE) {
2680 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
/* The RBHSave info tables are named stg_RBH_Save_*_info, as in the GUM
 * code and print_bq; the unprefixed names no longer exist. */
2682 ASSERT((info_ptr==&stg_RBH_Save_0_info) ||
2683 (info_ptr==&stg_RBH_Save_1_info) ||
2684 (info_ptr==&stg_RBH_Save_2_info));
2686 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2687 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2688 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2691 belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2692 node, info_type(node)));
2695 /* statistics gathering */
2696 if (RtsFlags.GranFlags.GranSimStats.Global) {
2697 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2698 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2699 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2700 globalGranStats.tot_awbq++; // total no. of bqs awakened
2703 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2704 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
/* awakenBlockedQueue (GUM variant): with sched_mutex held, wake the leading
 * TSO and BLOCKED_FETCH elements of blocking queue `q` for `node` via
 * unblockOneLocked, stopping at the first element that is neither (the
 * ASSERT also admits a CONSTR, i.e. an RBHSave closure).
 * A queue that is END_BQ_QUEUE or starts with a CONSTR is reported as
 * "nothing to unblock".
 * NOTE(review): that debug message says the function returns early there,
 * but the check comes after ACQUIRE_LOCK(&sched_mutex) and no RELEASE_LOCK
 * is visible before the return in this listing; confirm the lock is
 * released on that path.
 */
2708 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2710 StgBlockingQueueElement *bqe;
2712 ACQUIRE_LOCK(&sched_mutex);
2714 IF_PAR_DEBUG(verbose,
2715 belch("##-_ AwBQ for node %p on [%x]: ",
2719 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2720 IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2725 ASSERT(q == END_BQ_QUEUE ||
2726 get_itbl(q)->type == TSO ||
2727 get_itbl(q)->type == BLOCKED_FETCH ||
2728 get_itbl(q)->type == CONSTR);
2731 while (get_itbl(bqe)->type==TSO ||
2732 get_itbl(bqe)->type==BLOCKED_FETCH) {
2733 bqe = unblockOneLocked(bqe, node);
2735 RELEASE_LOCK(&sched_mutex);
2738 #else /* !GRAN && !PAR */
/* awakenBlockedQueue (non-parallel variant): with sched_mutex held, wake
 * every TSO on the queue starting at `tso`.  unblockOneLocked returns the
 * element to process next, and the walk stops at END_TSO_QUEUE.
 */
2740 awakenBlockedQueue(StgTSO *tso)
2742 ACQUIRE_LOCK(&sched_mutex);
2743 while (tso != END_TSO_QUEUE) {
2744 tso = unblockOneLocked(tso);
2746 RELEASE_LOCK(&sched_mutex);
2750 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2751 //@subsection Exception Handling Routines
2753 /* ---------------------------------------------------------------------------
2755 - usually called inside a signal handler so it mustn't do anything fancy.
2756 ------------------------------------------------------------------------ */
/* interruptStgRts: according to the header comment above it, this is
 * usually called from inside a signal handler, so it must restrict itself
 * to simple work.  Its body is not visible in this listing.
 */
2759 interruptStgRts(void)
2765 /* -----------------------------------------------------------------------------
2768 This is for use when we raise an exception in another thread, which
2770 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2771 -------------------------------------------------------------------------- */
2773 #if defined(GRAN) || defined(PAR)
2775 NB: only the type of the blocking queue is different in GranSim and GUM
2776 the operations on the queue-elements are the same
2777 long live polymorphism!
/* unblockThread (GranSim/GUM variant): take `tso` off whatever queue it is
 * blocked on and put it back on the run queue, all under sched_mutex.
 * Dispatches on tso->why_blocked:
 * - not blocked: release the lock and return;
 * - MVar: unlink from mvar->head, fixing mvar->tail if tso was last;
 * - BlockedOnBlackHole: unlink from the BLACKHOLE_BQ's blocking_queue;
 * - BlockedOnException: unlink from the target thread's
 *   blocked_exceptions, first following ThreadRelocated TSOs to the live
 *   copy of the target;
 * - BlockedOnWrite (I/O): unlink from blocked_queue_hd/blocked_queue_tl;
 * - BlockedOnDelay: unlink from sleeping_queue;
 * - any other reason, or tso missing from its queue: barf.
 * Afterwards tso's link, why_blocked and block_info are reset and it is
 * pushed onto the run queue.
 * NOTE(review): this listing omits some lines (case labels, braces, loop
 * conditions).
 */
2780 unblockThread(StgTSO *tso)
2782 StgBlockingQueueElement *t, **last;
2784 ACQUIRE_LOCK(&sched_mutex);
2785 switch (tso->why_blocked) {
/* Early exit: drop the lock taken above so it is not left held. */
RELEASE_LOCK(&sched_mutex);
2788 return; /* not blocked */
2791 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2793 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2794 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2796 last = (StgBlockingQueueElement **)&mvar->head;
2797 for (t = (StgBlockingQueueElement *)mvar->head;
2799 last = &t->link, last_tso = t, t = t->link) {
2800 if (t == (StgBlockingQueueElement *)tso) {
2801 *last = (StgBlockingQueueElement *)tso->link;
2802 if (mvar->tail == tso) {
2803 mvar->tail = (StgTSO *)last_tso;
2808 barf("unblockThread (MVAR): TSO not found");
2811 case BlockedOnBlackHole:
2812 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2814 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2816 last = &bq->blocking_queue;
2817 for (t = bq->blocking_queue;
2819 last = &t->link, t = t->link) {
2820 if (t == (StgBlockingQueueElement *)tso) {
2821 *last = (StgBlockingQueueElement *)tso->link;
2825 barf("unblockThread (BLACKHOLE): TSO not found");
2828 case BlockedOnException:
2830 StgTSO *target = tso->block_info.tso;
2832 ASSERT(get_itbl(target)->type == TSO);
/* A TSO can be relocated more than once (every stack overflow moves it),
 * so follow the whole chain, as the non-parallel unblockThread does. */
2834 while (target->what_next == ThreadRelocated) {
2835 target = target->link;
2836 ASSERT(get_itbl(target)->type == TSO);
2839 ASSERT(target->blocked_exceptions != NULL);
2841 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2842 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2844 last = &t->link, t = t->link) {
2845 ASSERT(get_itbl(t)->type == TSO);
2846 if (t == (StgBlockingQueueElement *)tso) {
2847 *last = (StgBlockingQueueElement *)tso->link;
2851 barf("unblockThread (Exception): TSO not found");
2855 case BlockedOnWrite:
2857 /* take TSO off blocked_queue */
2858 StgBlockingQueueElement *prev = NULL;
2859 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2860 prev = t, t = t->link) {
2861 if (t == (StgBlockingQueueElement *)tso) {
2863 blocked_queue_hd = (StgTSO *)t->link;
2864 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2865 blocked_queue_tl = END_TSO_QUEUE;
2868 prev->link = t->link;
2869 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2870 blocked_queue_tl = (StgTSO *)prev;
2876 barf("unblockThread (I/O): TSO not found");
2879 case BlockedOnDelay:
2881 /* take TSO off sleeping_queue */
2882 StgBlockingQueueElement *prev = NULL;
2883 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
2884 prev = t, t = t->link) {
2885 if (t == (StgBlockingQueueElement *)tso) {
2887 sleeping_queue = (StgTSO *)t->link;
2889 prev->link = t->link;
2894 barf("unblockThread (delay): TSO not found");
2898 barf("unblockThread");
2902 tso->link = END_TSO_QUEUE;
2903 tso->why_blocked = NotBlocked;
2904 tso->block_info.closure = NULL;
2905 PUSH_ON_RUN_QUEUE(tso);
2906 RELEASE_LOCK(&sched_mutex);
/* unblockThread (non-parallel variant): take `tso` off whatever queue it is
 * blocked on and put it back on the run queue, all under sched_mutex.
 * Dispatches on tso->why_blocked:
 * - not blocked: release the lock and return;
 * - MVar: unlink from mvar->head, fixing mvar->tail if tso was last;
 * - BlockedOnBlackHole: unlink from the BLACKHOLE_BQ's blocking_queue;
 * - BlockedOnException: unlink from the target thread's
 *   blocked_exceptions, following ThreadRelocated TSOs to the live target;
 * - BlockedOnWrite (I/O): unlink from blocked_queue_hd/blocked_queue_tl;
 * - BlockedOnDelay: unlink from sleeping_queue;
 * - any other reason, or tso missing from its queue: barf.
 * Afterwards tso's link, why_blocked and block_info are reset and it is
 * pushed onto the run queue.
 * NOTE(review): this listing omits some lines (declarations, case labels,
 * braces, the unlinking statements of several branches).
 */
2910 unblockThread(StgTSO *tso)
2914 ACQUIRE_LOCK(&sched_mutex);
2915 switch (tso->why_blocked) {
/* Early exit: drop the lock taken above so it is not left held. */
RELEASE_LOCK(&sched_mutex);
2918 return; /* not blocked */
2921 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2923 StgTSO *last_tso = END_TSO_QUEUE;
2924 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2927 for (t = mvar->head; t != END_TSO_QUEUE;
2928 last = &t->link, last_tso = t, t = t->link) {
2931 if (mvar->tail == tso) {
2932 mvar->tail = last_tso;
2937 barf("unblockThread (MVAR): TSO not found");
2940 case BlockedOnBlackHole:
2941 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2943 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2945 last = &bq->blocking_queue;
2946 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2947 last = &t->link, t = t->link) {
2953 barf("unblockThread (BLACKHOLE): TSO not found");
2956 case BlockedOnException:
2958 StgTSO *target = tso->block_info.tso;
2960 ASSERT(get_itbl(target)->type == TSO);
2962 while (target->what_next == ThreadRelocated) {
2963 target = target->link;
2964 ASSERT(get_itbl(target)->type == TSO);
2967 ASSERT(target->blocked_exceptions != NULL);
2969 last = &target->blocked_exceptions;
2970 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2971 last = &t->link, t = t->link) {
2972 ASSERT(get_itbl(t)->type == TSO);
2978 barf("unblockThread (Exception): TSO not found");
2982 case BlockedOnWrite:
2984 StgTSO *prev = NULL;
2985 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2986 prev = t, t = t->link) {
2989 blocked_queue_hd = t->link;
2990 if (blocked_queue_tl == t) {
2991 blocked_queue_tl = END_TSO_QUEUE;
2994 prev->link = t->link;
2995 if (blocked_queue_tl == t) {
2996 blocked_queue_tl = prev;
3002 barf("unblockThread (I/O): TSO not found");
3005 case BlockedOnDelay:
3007 StgTSO *prev = NULL;
3008 for (t = sleeping_queue; t != END_TSO_QUEUE;
3009 prev = t, t = t->link) {
3012 sleeping_queue = t->link;
3014 prev->link = t->link;
3019 barf("unblockThread (delay): TSO not found");
3023 barf("unblockThread");
3027 tso->link = END_TSO_QUEUE;
3028 tso->why_blocked = NotBlocked;
3029 tso->block_info.closure = NULL;
3030 PUSH_ON_RUN_QUEUE(tso);
3031 RELEASE_LOCK(&sched_mutex);
3035 /* -----------------------------------------------------------------------------
3038 * The following function implements the magic for raising an
3039 * asynchronous exception in an existing thread.
3041 * We first remove the thread from any queue on which it might be
3042 * blocked. The possible blockages include MVARs, BLACKHOLE_BQs, pending exception delivery, and the I/O and delay queues.
3044 * We strip the stack down to the innermost CATCH_FRAME, building
3045 * thunks in the heap for all the active computations, so they can
3046 * be restarted if necessary. When we reach a CATCH_FRAME, we build
3047 * an application of the handler to the exception, and push it on
3048 * the top of the stack.
3050 * How exactly do we save all the active computations? We create an
3051 * AP_UPD for every UpdateFrame on the stack. Entering one of these
3052 * AP_UPDs pushes everything from the corresponding update frame
3053 * upwards onto the stack. (Actually, it pushes everything up to the
3054 * next update frame plus a pointer to the next AP_UPD object.)
3055 * Entering the next AP_UPD object pushes more onto the stack until we
3056 * reach the last AP_UPD object - at which point the stack should look
3057 * exactly as it did when we killed the TSO and we can continue
3058 * execution by entering the closure on top of the stack.
3060 * We can also kill a thread entirely - this happens if either (a) the
3061 * exception passed to raiseAsync is NULL, or (b) there's no
3062 * CATCH_FRAME on the stack. In either case, we strip the entire
3063 * stack and replace the thread with a zombie.
3065 * -------------------------------------------------------------------------- */
/* deleteThread: kill `tso` outright; equivalent to raiseAsync(tso, NULL),
 * i.e. raising no exception, so the whole stack is stripped.
 */
3068 deleteThread(StgTSO *tso)
3070 raiseAsync(tso,NULL);
/* raiseAsync: raise `exception` asynchronously in thread `tso`, or kill the
 * thread when exception is NULL (see the description above this function).
 * - It first checks whether the thread is already dead (ThreadComplete or
 *   ThreadKilled); per the comment that follows, it then removes the thread
 *   from any blocking queues.
 * - If the top stack word looks like an info pointer, stg_dummy_ret_closure
 *   is pushed so the freezing code always finds a closure on top.
 * - Frames are then handled by type, starting from tso->su:
 *   - CATCH_FRAME with a non-NULL exception: a PAP applying the frame's
 *     handler to the exception and a realworld token replaces the stack up
 *     to the frame; stg_unblockAsyncExceptionszh_ret_info is pushed if
 *     exceptions were unblocked at the catch, async exceptions are blocked
 *     for the handler, and the thread resumes with ThreadEnterGHC.
 *   - Otherwise the `words` stack words above the frame become an AP_UPD:
 *     - for an update frame, it replaces the updatee (unless that is
 *       already an indirection) and is pushed in place of the frame;
 *     - for a catch frame (NULL exception) or a seq frame, it is turned into
 *       a PAP and wrapped in a stg_catch_info or stg_seq_info closure.
 *   - At the stop frame the exception is saved on the stack and the thread
 *     becomes ThreadKilled.
 * NOTE(review): the loop structure, case labels and some assignments are on
 * lines not present in this listing.
 */
3074 raiseAsync(StgTSO *tso, StgClosure *exception)
3076 StgUpdateFrame* su = tso->su;
3077 StgPtr sp = tso->sp;
3079 /* Thread already dead? */
3080 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3084 IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3086 /* Remove it from any blocking queues */
3089 /* The stack freezing code assumes there's a closure pointer on
3090 * the top of the stack. This isn't always the case with compiled
3091 * code, so we have to push a dummy closure on the top which just
3092 * returns to the next return address on the stack.
3094 if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3095 *(--sp) = (W_)&stg_dummy_ret_closure;
3099 nat words = ((P_)su - (P_)sp) - 1;
3103 /* If we find a CATCH_FRAME, and we've got an exception to raise,
3104 * then build PAP(handler,exception,realworld#), and leave it on
3105 * top of the stack ready to enter.
3107 if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3108 StgCatchFrame *cf = (StgCatchFrame *)su;
3109 /* we've got an exception to raise, so let's pass it to the
3110 * handler in this frame.
3112 ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3113 TICK_ALLOC_UPD_PAP(3,0);
3114 SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3117 ap->fun = cf->handler; /* :: Exception -> IO a */
3118 ap->payload[0] = exception;
3119 ap->payload[1] = ARG_TAG(0); /* realworld token */
3121 /* throw away the stack from Sp up to and including the
3124 sp = (P_)su + sizeofW(StgCatchFrame) - 1;
3127 /* Restore the blocked/unblocked state for asynchronous exceptions
3128 * at the CATCH_FRAME.
3130 * If exceptions were unblocked at the catch, arrange that they
3131 * are unblocked again after executing the handler by pushing an
3132 * unblockAsyncExceptions_ret stack frame.
3134 if (!cf->exceptions_blocked) {
3135 *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3138 /* Ensure that async exceptions are blocked when running the handler.
3140 if (tso->blocked_exceptions == NULL) {
3141 tso->blocked_exceptions = END_TSO_QUEUE;
3144 /* Put the newly-built PAP on top of the stack, ready to execute
3145 * when the thread restarts.
3149 tso->what_next = ThreadEnterGHC;
3150 IF_DEBUG(sanity, checkTSO(tso));
3154 /* First build an AP_UPD consisting of the stack chunk above the
3155 * current update frame, with the top word on the stack as the
3158 ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3163 ap->fun = (StgClosure *)sp[0];
3165 for(i=0; i < (nat)words; ++i) {
3166 ap->payload[i] = (StgClosure *)*sp++;
3169 switch (get_itbl(su)->type) {
3173 SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */);
3174 TICK_ALLOC_UP_THK(words+1,0);
3177 fprintf(stderr, "scheduler: Updating ");
3178 printPtr((P_)su->updatee);
3179 fprintf(stderr, " with ");
3180 printObj((StgClosure *)ap);
3183 /* Replace the updatee with an indirection - happily
3184 * this will also wake up any threads currently
3185 * waiting on the result.
3187 * Warning: if we're in a loop, more than one update frame on
3188 * the stack may point to the same object. Be careful not to
3189 * overwrite an IND_OLDGEN in this case, because we'll screw
3190 * up the mutable lists. To be on the safe side, don't
3191 * overwrite any kind of indirection at all. See also
3192 * threadSqueezeStack in GC.c, where we have to make a similar
3195 if (!closure_IND(su->updatee)) {
3196 UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */
3199 sp += sizeofW(StgUpdateFrame) -1;
3200 sp[0] = (W_)ap; /* push onto stack */
3206 StgCatchFrame *cf = (StgCatchFrame *)su;
3209 /* We want a PAP, not an AP_UPD. Fortunately, the
3210 * layout's the same.
3212 SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3213 TICK_ALLOC_UPD_PAP(words+1,0);
3215 /* now build o = FUN(catch,ap,handler) */
3216 o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3217 TICK_ALLOC_FUN(2,0);
3218 SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3219 o->payload[0] = (StgClosure *)ap;
3220 o->payload[1] = cf->handler;
3223 fprintf(stderr, "scheduler: Built ");
3224 printObj((StgClosure *)o);
3227 /* pop the old handler and put o on the stack */
3229 sp += sizeofW(StgCatchFrame) - 1;
3236 StgSeqFrame *sf = (StgSeqFrame *)su;
3239 SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3240 TICK_ALLOC_UPD_PAP(words+1,0);
3242 /* now build o = FUN(seq,ap) */
3243 o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3244 TICK_ALLOC_SE_THK(1,0);
3245 SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3246 o->payload[0] = (StgClosure *)ap;
3249 fprintf(stderr, "scheduler: Built ");
3250 printObj((StgClosure *)o);
3253 /* pop the old handler and put o on the stack */
3255 sp += sizeofW(StgSeqFrame) - 1;
3261 /* We've stripped the entire stack, the thread is now dead. */
3262 sp += sizeofW(StgStopFrame) - 1;
3263 sp[0] = (W_)exception; /* save the exception */
3264 tso->what_next = ThreadKilled;
3265 tso->su = (StgUpdateFrame *)(sp+1);
3276 /* -----------------------------------------------------------------------------
3277 resurrectThreads is called after garbage collection on the list of
3278 threads found to be garbage. Each of these threads will be woken
3279 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3280 on an MVar, or NonTermination if the thread was blocked on a Black
3282 -------------------------------------------------------------------------- */
/* resurrectThreads: walk the `threads` list (chained through global_link),
 * re-link each TSO in front of all_threads, and raise an exception in it
 * according to why_blocked: BlockedOnDeadMVar_closure for
 * BlockedOnException (and, per the header comment, the MVar case whose
 * label is not visible here), NonTermination_closure for
 * BlockedOnBlackHole.  Any other reason is fatal (barf).
 */
3285 resurrectThreads( StgTSO *threads )
3289 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3290 next = tso->global_link;
3291 tso->global_link = all_threads;
3293 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3295 switch (tso->why_blocked) {
3297 case BlockedOnException:
3298 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3300 case BlockedOnBlackHole:
3301 raiseAsync(tso,(StgClosure *)NonTermination_closure);
3304 /* This might happen if the thread was blocked on a black hole
3305 * belonging to a thread that we've just woken up (raiseAsync
3306 * can wake up threads, remember...).
3310 barf("resurrectThreads: thread blocked in a strange way");
3315 /* -----------------------------------------------------------------------------
3316 * Blackhole detection: if we reach a deadlock, test whether any
3317 * threads are blocked on themselves. Any threads which are found to
3318 * be self-blocked get sent a NonTermination exception.
3320 * This is only done in a deadlock situation in order to avoid
3321 * performance overhead in the normal case.
3322 * -------------------------------------------------------------------------- */
/* detectBlackHoles: for every thread on all_threads, following
 * ThreadRelocated TSOs to their new copy, skip those not BlockedOnBlackHole.
 * For the rest, walk the update-frame chain from t->su; if an update frame's
 * updatee is the very closure the thread is blocked on, the thread is
 * waiting on its own computation and is sent NonTermination via raiseAsync.
 */
3325 detectBlackHoles( void )
3327 StgTSO *t = all_threads;
3328 StgUpdateFrame *frame;
3329 StgClosure *blocked_on;
3331 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3333 while (t->what_next == ThreadRelocated) {
3335 ASSERT(get_itbl(t)->type == TSO);
3338 if (t->why_blocked != BlockedOnBlackHole) {
3342 blocked_on = t->block_info.closure;
3344 for (frame = t->su; ; frame = frame->link) {
3345 switch (get_itbl(frame)->type) {
3348 if (frame->updatee == blocked_on) {
3349 /* We are blocking on one of our own computations, so
3350 * send this thread the NonTermination exception.
3353 sched_belch("thread %d is blocked on itself", t->id));
3354 raiseAsync(t, (StgClosure *)NonTermination_closure);
3375 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3376 //@subsection Debugging Routines
3378 /* -----------------------------------------------------------------------------
3379 Debugging: why is a thread blocked
3380 -------------------------------------------------------------------------- */
/* printThreadBlockage: debugging aid; writes to stderr a phrase describing
 * why `tso` is blocked (fd for read/write, delay target, MVar, exception
 * target thread, black hole, global address with its local FM_BQ), or
 * "is not blocked".  Any other why_blocked value is fatal.
 * The TSO pointer in the barf message is printed with %p: passing a pointer
 * for %d is undefined behaviour and truncates it on 64-bit hosts.
 */
3385 printThreadBlockage(StgTSO *tso)
3387 switch (tso->why_blocked) {
3389 fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3391 case BlockedOnWrite:
3392 fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3394 case BlockedOnDelay:
3395 fprintf(stderr,"is blocked until %d", tso->block_info.target);
3398 fprintf(stderr,"is blocked on an MVar");
3400 case BlockedOnException:
3401 fprintf(stderr,"is blocked on delivering an exception to thread %d",
3402 tso->block_info.tso->id);
3404 case BlockedOnBlackHole:
3405 fprintf(stderr,"is blocked on a black hole");
3408 fprintf(stderr,"is not blocked");
3412 fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3413 tso->block_info.closure, info_type(tso->block_info.closure));
3415 case BlockedOnGA_NoSend:
3416 fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3417 tso->block_info.closure, info_type(tso->block_info.closure));
3421 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
3422 tso->why_blocked, tso->id, tso);
/* printThreadStatus: debugging aid; prints "has been killed" or
 * "has completed" depending on tso->what_next, and otherwise (presumably
 * the default case, whose label is not visible here) describes the
 * thread's blockage via printThreadBlockage.
 */
3427 printThreadStatus(StgTSO *tso)
3429 switch (tso->what_next) {
3431 fprintf(stderr,"has been killed");
3433 case ThreadComplete:
3434 fprintf(stderr,"has completed");
3437 printThreadBlockage(tso);
/* printAllThreads: debugging aid; prints a heading (including a formatted
 * time in the variants that compute time_string: TIME_ON_PROC(CurrentProc)
 * or CURRENT_TIME) and then one line per thread on all_threads with its id
 * and status.
 * NOTE(review): node_str is declared but not used in the visible lines.
 */
3442 printAllThreads(void)
3447 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3448 ullong_format_string(TIME_ON_PROC(CurrentProc),
3449 time_string, rtsFalse/*no commas!*/);
3451 sched_belch("all threads at [%s]:", time_string);
3453 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3454 ullong_format_string(CURRENT_TIME,
3455 time_string, rtsFalse/*no commas!*/);
3457 sched_belch("all threads at [%s]:", time_string);
3459 sched_belch("all threads:");
3462 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3463 fprintf(stderr, "\tthread %d ", t->id);
3464 printThreadStatus(t);
3465 fprintf(stderr,"\n");
3470 Print a whole blocking queue attached to node (debugging only).
/* print_bq (GUM variant): debugging aid; prints a heading for `node`, which
 * must be one of the closure types that carry a blocking queue (asserted),
 * and passes ((StgBlockingQueue*)node)->blocking_queue to print_bqe.
 * NOTE(review): ASSERT(node != NULL) runs only after node has already been
 * dereferenced by info_type/get_itbl, so it cannot catch a NULL node.
 */
3475 print_bq (StgClosure *node)
3477 StgBlockingQueueElement *bqe;
3481 fprintf(stderr,"## BQ of closure %p (%s): ",
3482 node, info_type(node));
3484 /* should cover all closures that may have a blocking queue */
3485 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3486 get_itbl(node)->type == FETCH_ME_BQ ||
3487 get_itbl(node)->type == RBH ||
3488 get_itbl(node)->type == MVAR);
3490 ASSERT(node!=(StgClosure*)NULL); // sanity check
3492 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3496 Print a whole blocking queue starting with the element bqe.
/* print_bqe: debugging aid; prints every element of a blocking queue
 * starting at `bqe` on one stderr line.  Elements may be TSOs,
 * BLOCKED_FETCHes or an RBHSave CONSTR; the walk stops after a CONSTR
 * element or when the next link is END_BQ_QUEUE, and any other closure
 * type is fatal.
 */
3499 print_bqe (StgBlockingQueueElement *bqe)
3504 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3506 for (end = (bqe==END_BQ_QUEUE);
3507 !end; // iterate until bqe points to a CONSTR
3508 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
3509 bqe = end ? END_BQ_QUEUE : bqe->link) {
3510 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3511 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3512 /* types of closures that may appear in a blocking queue */
3513 ASSERT(get_itbl(bqe)->type == TSO ||
3514 get_itbl(bqe)->type == BLOCKED_FETCH ||
3515 get_itbl(bqe)->type == CONSTR);
3516 /* only BQs of an RBH end with an RBH_Save closure */
3517 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3519 switch (get_itbl(bqe)->type) {
/* The TSO address is a pointer: print it with %p, not %x. */
3521 fprintf(stderr," TSO %u (%p),",
3522 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3525 fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3526 ((StgBlockedFetch *)bqe)->node,
3527 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3528 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3529 ((StgBlockedFetch *)bqe)->ga.weight);
3532 fprintf(stderr," %s (IP %p),",
3533 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3534 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3535 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3536 "RBH_Save_?"), get_itbl(bqe));
3539 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3540 info_type((StgClosure *)bqe)); // , node, info_type(node));
3544 fputc('\n', stderr);
3546 # elif defined(GRAN)
/* print_bq (GranSim variant): debugging aid; prints the blocking queue of
 * `node` on one stderr line, with the PE (where_is) of node and of every
 * element.  Elements are TSOs or, only at the end of an RBH's queue, an
 * RBHSave CONSTR; the walk stops after a CONSTR or at END_BQ_QUEUE, and any
 * other closure type is fatal.
 * NOTE(review): ASSERT(node != NULL) runs only after get_itbl(node) has
 * already dereferenced node, so it cannot catch a NULL node.
 */
3548 print_bq (StgClosure *node)
3550 StgBlockingQueueElement *bqe;
3551 PEs node_loc, tso_loc;
3554 /* should cover all closures that may have a blocking queue */
3555 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3556 get_itbl(node)->type == FETCH_ME_BQ ||
3557 get_itbl(node)->type == RBH);
3559 ASSERT(node!=(StgClosure*)NULL); // sanity check
3560 node_loc = where_is(node);
3562 fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3563 node, info_type(node), node_loc);
3566 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3568 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3569 !end; // iterate until bqe points to a CONSTR
3570 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3571 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3572 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3573 /* types of closures that may appear in a blocking queue */
3574 ASSERT(get_itbl(bqe)->type == TSO ||
3575 get_itbl(bqe)->type == CONSTR);
3576 /* only BQs of an RBH end with an RBH_Save closure */
3577 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3579 tso_loc = where_is((StgClosure *)bqe);
3580 switch (get_itbl(bqe)->type) {
3582 fprintf(stderr," TSO %d (%p) on [PE %d],",
3583 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3586 fprintf(stderr," %s (IP %p),",
3587 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3588 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3589 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3590 "RBH_Save_?"), get_itbl(bqe));
3593 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3594 info_type((StgClosure *)bqe), node, info_type(node));
3598 fputc('\n', stderr);
3602 Nice and easy: only TSOs on the blocking queue
/* print_bq (non-parallel variant): debugging aid; prints the id and
 * address of every TSO on node's blocking queue (only TSOs can appear
 * there in this build, as asserted), followed by a newline.
 */
3605 print_bq (StgClosure *node)
3609 ASSERT(node!=(StgClosure*)NULL); // sanity check
3610 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3611 tso != END_TSO_QUEUE;
3613 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3614 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3615 fprintf(stderr," TSO %d (%p),", tso->id, tso);
3617 fputc('\n', stderr);
/* Walks the run queue from run_queue_hd until END_TSO_QUEUE with a counter
 * `i` starting at 0.  NOTE(review): the enclosing function header and the
 * loop's increment clause are not present in this listing; presumably this
 * counts run-queue entries - confirm against the full file.
 */
3628 for (i=0, tso=run_queue_hd;
3629 tso != END_TSO_QUEUE;
/* sched_belch: printf-style debug message to stderr.  The message is
 * prefixed with either "scheduler (task <osThreadId()>): ", "== " or
 * "scheduler: " (the build-dependent choice is made by conditional code
 * not visible in this listing), formatted with vfprintf, and terminated
 * with a newline.
 */
3638 sched_belch(char *s, ...)
3643 fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3645 fprintf(stderr, "== ");
3647 fprintf(stderr, "scheduler: ");
3649 vfprintf(stderr, s, ap);
3650 fprintf(stderr, "\n");
3656 //@node Index, , Debugging Routines, Main scheduling code
3660 //* MainRegTable:: @cindex\s-+MainRegTable
3661 //* StgMainThread:: @cindex\s-+StgMainThread
3662 //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue
3663 //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd
3664 //* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
3665 //* context_switch:: @cindex\s-+context_switch
3666 //* createThread:: @cindex\s-+createThread
3667 //* gc_pending_cond:: @cindex\s-+gc_pending_cond
3668 //* initScheduler:: @cindex\s-+initScheduler
3669 //* interrupted:: @cindex\s-+interrupted
3670 //* next_thread_id:: @cindex\s-+next_thread_id
3671 //* print_bq:: @cindex\s-+print_bq
3672 //* run_queue_hd:: @cindex\s-+run_queue_hd
3673 //* run_queue_tl:: @cindex\s-+run_queue_tl
3674 //* sched_mutex:: @cindex\s-+sched_mutex
3675 //* schedule:: @cindex\s-+schedule
3676 //* take_off_run_queue:: @cindex\s-+take_off_run_queue
3677 //* term_mutex:: @cindex\s-+term_mutex
3678 //* thread_ready_cond:: @cindex\s-+thread_ready_cond