/* ---------------------------------------------------------------------------
 * (c) The GHC Team, 1998-2004
 *
 * Different GHC ways use this scheduler quite differently (see comments below).
 * Here is the global picture:
 *
 * WAY   Name      CPP flag   What's it for
 * --------------------------------------
 * mp    GUM       PAR        Parallel execution on a distrib. memory machine
 * s     SMP       SMP        Parallel execution on a shared memory machine
 * mg    GranSim   GRAN       Simulation of parallel execution
 * md    GUM/GdH   DIST       Distributed execution (based on GUM)
 * --------------------------------------------------------------------------*/
/* ---------------------------------------------------------------------------
 * Version with support for distributed memory parallelism aka GUM (WAY=mp):

   The main scheduling loop in GUM iterates until a finish message is received.
   In that case a global flag @receivedFinish@ is set and this instance of
   the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
   for the handling of incoming messages, such as PP_FINISH.
   Note that in the parallel case we have a system manager that coordinates
   different PEs, each of which is running one instance of the RTS.
   See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
   From this routine, processes executing ghc/rts/Main.c are spawned. -- HWL
 * Version with support for simulating parallel execution aka GranSim (WAY=mg):

   The main scheduling code in GranSim is quite different from that in std
   (concurrent) Haskell: while concurrent Haskell just iterates over the
   threads in the runnable queue, GranSim is event-driven, i.e. it iterates
   over the events in the global event queue. -- HWL
   ------------------------------------------------------------------------ */
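/* An illustrative contrast (sketch only, not RTS code): the concurrent
 * scheduler loops over runnable threads, while the GranSim scheduler loops
 * over events and only then acts on the thread an event refers to:
 *
 *   // concurrent Haskell (sketch)          // GranSim (sketch)
 *   while (run_queue_hd != END_TSO_QUEUE)   while ((event = get_next_event()))
 *       run(POP_RUN_QUEUE());                   dispatch(event); // may run a TSO
 */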
#include "PosixSource.h"
#include "BlockAlloc.h"

#define COMPILING_SCHEDULER

#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "Exception.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Proftimer.h"

#if defined(GRAN) || defined(PAR)
# include "GranSimRts.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
#endif

#include "Capability.h"
#include "OSThreads.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef THREADED_RTS
#define USED_IN_THREADED_RTS
#else
#define USED_IN_THREADED_RTS STG_UNUSED
#endif

#ifdef RTS_SUPPORTS_THREADS
#define USED_WHEN_RTS_SUPPORTS_THREADS
#else
#define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
#endif
/* Main thread queue.
 * Locks required: sched_mutex.
 */
StgMainThread *main_threads = NULL;

/* Thread queues.
 * Locks required: sched_mutex.
 */

#if defined(GRAN)
StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;   now in GranSim.c */
/*
  In GranSim we have a runnable and a blocked queue for each processor.
  In order to minimise code changes, new arrays run_queue_hds/tls
  are created. run_queue_hd is then a shortcut (macro) for
  run_queue_hds[CurrentProc] (see GranSim.h).
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating). */

#else /* !GRAN */
StgTSO *run_queue_hd = NULL;
StgTSO *run_queue_tl = NULL;
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
#endif

/* Linked list of all threads.
 * Used for detecting garbage-collected threads.
 */
StgTSO *all_threads = NULL;

/* When a thread performs a safe C call (_ccall_GC, using old
 * terminology), it gets put on the suspended_ccalling_threads
 * list. Used by the garbage collector.
 */
static StgTSO *suspended_ccalling_threads;
static StgTSO *threadStackOverflow(StgTSO *tso);

/* KH: The following two flags are shared memory locations. There is no need
   to lock them, since they are only unset at the end of a scheduler
   operation.
*/

/* flag set by signal handler to precipitate a context switch */
int context_switch = 0;

/* if this flag is set as well, give up execution */
rtsBool interrupted = rtsFalse;

/* Next thread ID to allocate.
 * Locks required: thread_id_mutex
 */
static StgThreadID next_thread_id = 1;
/*
 * Pointers to the state of the current thread.
 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
 */

/* The smallest stack size that makes any sense is:
 * RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
 * + 1                     (the closure to enter)
 * + 1                     (stg_ap_v_ret)
 * + 1                     (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
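/* Worked example (illustrative numbers only, not the real constants): with
 * RESERVED_STACK_WORDS = 21 and a 2-word StgStopFrame, MIN_STACK_WORDS would
 * come to 21 + 2 + 3 = 26 words; the "+ 3" in the definition above covers
 * exactly the three one-word slots itemised in the comment.
 */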
/* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 * exists - earlier gccs apparently didn't.
 */
StgTSO dummy_tso;

static rtsBool ready_to_gc;

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
static rtsBool shutting_down_scheduler = rtsFalse;
void addToBlockedQueue ( StgTSO *tso );

static void schedule ( StgMainThread *mainThread, Capability *initialCapability );
void interruptStgRts ( void );

#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
static void detectBlackHoles ( void );
#endif

static void raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically);

#if defined(RTS_SUPPORTS_THREADS)
/* ToDo: carefully document the invariants that go together
 * with these synchronisation objects.
 */
Mutex sched_mutex = INIT_MUTEX_VAR;
Mutex term_mutex  = INIT_MUTEX_VAR;

#endif /* RTS_SUPPORTS_THREADS */
rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;

static char *whatNext_strs[] = {
  "(unknown)",
  "ThreadRunGHC",
  "ThreadInterpret",
  "ThreadKilled",
  "ThreadRelocated",
  "ThreadComplete"
};
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);

/* ----------------------------------------------------------------------------
 * Starting Tasks
 * ------------------------------------------------------------------------- */
#if defined(RTS_SUPPORTS_THREADS)
static rtsBool startingWorkerThread = rtsFalse;

static void taskStart(void);
static void
taskStart(void)
{
  ACQUIRE_LOCK(&sched_mutex);
  startingWorkerThread = rtsFalse;
  schedule(NULL,NULL);
  RELEASE_LOCK(&sched_mutex);
}

void
startSchedulerTaskIfNecessary(void)
{
  if(run_queue_hd != END_TSO_QUEUE
    || blocked_queue_hd != END_TSO_QUEUE
    || sleeping_queue != END_TSO_QUEUE)
  {
    if(!startingWorkerThread)
    { // we don't want to start another worker thread
      // just because the last one hasn't yet reached the
      // "waiting for capability" state
      startingWorkerThread = rtsTrue;
      if(!startTask(taskStart))
      {
        startingWorkerThread = rtsFalse;
      }
    }
  }
}
#endif
/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * timer expires (thread yields)

   Locking notes:  we acquire the scheduler lock once at the beginning
   of the scheduler loop, and release it when

      * running a thread, or
      * waiting for work, or
      * waiting for a GC to complete.

   GRAN version:
     In a GranSim setup this loop iterates over the global event queue,
     which determines what to do next. Therefore, it's more complicated
     than either the concurrent or the parallel (GUM) setup.

   GUM version:
     GUM iterates over incoming messages.
     It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
     and sends out a fish whenever it has nothing to do; in-between
     doing the actual reductions (shared code below) it processes the
     incoming messages and deals with delayed operations
     (see PendingFetches).
     This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */
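/* The locking discipline described above, in outline (illustrative sketch
 * only, not the code itself; the real loop is in schedule() below):
 *
 *   ACQUIRE_LOCK(&sched_mutex);
 *   while (1) {
 *       ... pick a thread t from the run queue ...
 *       RELEASE_LOCK(&sched_mutex);    // drop the lock to run the thread
 *       ret = StgRun(...);             // (or to wait for work / for GC)
 *       ACQUIRE_LOCK(&sched_mutex);    // retake it to handle ret
 *   }
 */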
static void
schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
          Capability *initialCapability )
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
#if defined(GRAN)
  rtsEvent *event;
#elif defined(PAR)
  StgSparkPool *pool;
  rtsSpark spark;
  StgTSO *tso;
  GlobalTaskId pe;
  rtsBool receivedFinish = rtsFalse;
  nat tp_size, sp_size; // stats only
#endif
  nat prev_what_next;
  rtsBool was_interrupted = rtsFalse;

  // Pre-condition: sched_mutex is held.
  // We might have a capability, passed in as initialCapability.
  cap = initialCapability;
#if defined(RTS_SUPPORTS_THREADS)
  // in the threaded case, the capability is either passed in via the
  // initialCapability parameter, or initialized inside the scheduler
  IF_DEBUG(scheduler,
           sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
                       mainThread, initialCapability));
#else
  // simply initialise it in the non-threaded case
  grabCapability(&cap);
#endif
#if defined(GRAN)
  /* set up first event to get things going */
  /* ToDo: assign costs for system setup and init MainTSO ! */
  new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
            ContinueThread,
            CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);

  IF_DEBUG(gran,
           debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
           G_TSO(CurrentTSO, 5));

  if (RtsFlags.GranFlags.Light) {
    /* Save current time; GranSim Light only */
    CurrentTSO->gran.clock = CurrentTime[CurrentProc];
  }

  event = get_next_event();

  while (event!=(rtsEvent*)NULL) {
    /* Choose the processor with the next event */
    CurrentProc = event->proc;
    CurrentTSO  = event->tso;

#elif defined(PAR)

  while (!receivedFinish) {    /* set by processMessages */
                               /* when receiving PP_FINISH message */

#else // everything except GRAN and PAR

  while (1) {
    IF_DEBUG(scheduler, printAllThreads());

#if defined(RTS_SUPPORTS_THREADS)
    // Yield the capability to higher-priority tasks if necessary.
    if (cap != NULL) {
        yieldCapability(&cap);
    }

    // If we do not currently hold a capability, we wait for one
    if (cap == NULL) {
        waitForCapability(&sched_mutex, &cap,
                          mainThread ? &mainThread->bound_thread_cond : NULL);
    }

    // We now have a capability...
#endif
    //
    // If we're interrupted (the user pressed ^C, or some other
    // termination condition occurred), kill all the currently running
    // threads.
    //
    if (interrupted) {
        IF_DEBUG(scheduler, sched_belch("interrupted"));
        interrupted = rtsFalse;
        was_interrupted = rtsTrue;
#if defined(RTS_SUPPORTS_THREADS)
        // In the threaded RTS, deadlock detection doesn't work,
        // so just exit right away.
        errorBelch("interrupted");
        releaseCapability(cap);
        RELEASE_LOCK(&sched_mutex);
        shutdownHaskellAndExit(EXIT_SUCCESS);
#else
        deleteAllThreads();
#endif
    }
#if defined(RTS_USER_SIGNALS)
    // check for signals each time around the scheduler
    if (signals_pending()) {
      RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
      startSignalHandlers();
      ACQUIRE_LOCK(&sched_mutex);
    }
#endif
    //
    // Check whether any waiting threads need to be woken up. If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
    {
#if defined(RTS_SUPPORTS_THREADS)
        // We shouldn't be here...
        barf("schedule: awaitEvent() in threaded RTS");
#endif
        awaitEvent( EMPTY_RUN_QUEUE() );
    }
    // we can be interrupted while waiting for I/O...
    if (interrupted) continue;
    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads waiting on I/O or sleeping, and all the other tasks are
     * waiting for work, we must have a deadlock of some description.
     *
     * We first try to find threads blocked on themselves (ie. black
     * holes), and generate NonTermination exceptions where necessary.
     *
     * If no threads are black holed, we have a deadlock situation, so
     * inform all the main threads.
     */
#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
    if ( EMPTY_THREAD_QUEUES() )
    {
        IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception. Any threads thus released will be immediately
        // runnable.
        GarbageCollect(GetRoots,rtsTrue);
        if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
#if defined(RTS_USER_SIGNALS)
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather than bombing out with a
         * deadlock.
         */
        if ( anyUserHandlers() ) {
            IF_DEBUG(scheduler,
                     sched_belch("still deadlocked, waiting for signals..."));

            awaitUserSignals();

            // we might be interrupted...
            if (interrupted) { continue; }

            if (signals_pending()) {
                RELEASE_LOCK(&sched_mutex);
                startSignalHandlers();
                ACQUIRE_LOCK(&sched_mutex);
            }
            ASSERT(!EMPTY_RUN_QUEUE());
            goto not_deadlocked;
        }
#endif
        /* Probably a real deadlock. Send the current main thread the
         * Deadlock exception (or in the SMP build, send *all* main
         * threads the deadlock exception, since none of them can make
         * progress).
         */
        {
            StgMainThread *m;
            m = main_threads;
            switch (m->tso->why_blocked) {
            case BlockedOnBlackHole:
            case BlockedOnException:
            case BlockedOnMVar:
                raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
                break;
            default:
                barf("deadlock: main thread blocked in a strange way");
            }
        }
    }
  not_deadlocked:

#elif defined(RTS_SUPPORTS_THREADS)
    // ToDo: add deadlock detection in threaded RTS
#elif defined(PAR)
    // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
#endif
#if defined(RTS_SUPPORTS_THREADS)
    if ( EMPTY_RUN_QUEUE() ) {
        continue; // nothing to do
    }
#endif
#if defined(GRAN)
    if (RtsFlags.GranFlags.Light)
      GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
      CurrentTime[CurrentProc] = event->time;

    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)
      handleIdlePEs();

    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
      /* Should just be continuing execution */
    case ContinueThread:
      IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
      /* ToDo: check assertion
      ASSERT(run_queue_hd != (StgTSO*)NULL &&
             run_queue_hd != END_TSO_QUEUE);
      */
      /* Ignore ContinueThreads for fetching threads (if synchr comm) */
      if (!RtsFlags.GranFlags.DoAsyncFetch &&
          procStatus[CurrentProc]==Fetching) {
        debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
              CurrentTSO->id, CurrentTSO, CurrentProc);
        goto next_thread;
      }
      /* Ignore ContinueThreads for completed threads */
      if (CurrentTSO->what_next == ThreadComplete) {
        debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
              CurrentTSO->id, CurrentTSO, CurrentProc);
        goto next_thread;
      }
      /* Ignore ContinueThreads for threads that are being migrated */
      if (PROCS(CurrentTSO)==Nowhere) {
        debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
              CurrentTSO->id, CurrentTSO, CurrentProc);
        goto next_thread;
      }
      /* The thread should be at the beginning of the run queue */
      if (CurrentTSO!=run_queue_hds[CurrentProc]) {
        debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
              CurrentTSO->id, CurrentTSO, CurrentProc);
        break; // run the thread anyway
      }
      /*
      new_event(proc, proc, CurrentTime[proc],
                FindWork,
                (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
      goto next_thread;
      */ /* Catches superfluous CONTINUEs -- should be unnecessary */
      break; // now actually run the thread; DaH Qu'vam yImuHbej
    case FetchNode:
      do_the_fetchnode(event);
      goto next_thread;             /* handle next event in event queue  */

    case GlobalBlock:
      do_the_globalblock(event);
      goto next_thread;             /* handle next event in event queue  */

    case FetchReply:
      do_the_fetchreply(event);
      goto next_thread;             /* handle next event in event queue  */

    case UnblockThread:   /* Move from the blocked queue to the tail of */
      do_the_unblock(event);
      goto next_thread;             /* handle next event in event queue  */

    case ResumeThread:    /* Move from the blocked queue to the tail of */
                          /* the runnable queue ( i.e. Qu' SImqa'lu') */
      event->tso->gran.blocktime +=
        CurrentTime[CurrentProc] - event->tso->gran.blockedat;
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */

    case StartThread:
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */

    case MoveThread:
      do_the_movethread(event);
      goto next_thread;             /* handle next event in event queue  */

    case MoveSpark:
      do_the_movespark(event);
      goto next_thread;             /* handle next event in event queue  */

    case FindWork:
      do_the_findwork(event);
      goto next_thread;             /* handle next event in event queue  */

    default:
      barf("Illegal event type %u\n", event->evttype);
    } /* switch */
    /* This point was scheduler_loop in the old RTS */

    IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
                              TimeOfNextEvent));

    if (RtsFlags.GranFlags.Light)
      GranSimLight_leave_system(event, &ActiveTSO);

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    IF_DEBUG(gran,
             debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */
    t = CurrentTSO;
    /* Take a thread from the run queue. */
    POP_RUN_QUEUE(t); // take_off_run_queue(t);

    IF_DEBUG(gran,
             debugBelch("GRAN: About to run current thread, which is\n");
             G_TSO(t,5));

    context_switch = 0; // turned on via GranYield, checking events and time slice

    IF_DEBUG(gran,
             DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;
#elif defined(PAR)
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }

    /* ToDo: perhaps merge with spark activation above */
    /* check whether we have local work and send requests if we have none */
    if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
      /* :-[  no local threads => look out for local sparks */
      /* the spark pool for the current PE */
      pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
      if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
          pool->hd < pool->tl) {
        /*
         * ToDo: add GC code check that we really have enough heap afterwards!!
         * If we're here (no runnable threads) and we have pending
         * sparks, we must have a space problem. Get enough space
         * to turn one of those pending sparks into a
         * thread...
         */

        spark = findSpark(rtsFalse);            /* get a spark */
        if (spark != (rtsSpark) NULL) {
          tso = activateSpark(spark);           /* turn the spark into a thread */
          IF_PAR_DEBUG(schedule,
                       debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
                             tso->id, tso, advisory_thread_count));

          if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
            debugBelch("==^^ failed to activate spark\n");
            goto next_thread;
          }               /* otherwise fall through & pick-up new tso */
        } else {
          IF_PAR_DEBUG(verbose,
                       debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
                             spark_queue_len(pool)));
          goto next_thread;
        }
      }
      /* If we still have no work we need to send a FISH to get a spark
         from another PE
      */
      if (EMPTY_RUN_QUEUE()) {
      /* =8-[  no local sparks => look for work on other PEs */
        /*
         * We really have absolutely no work. Send out a fish
         * (there may be some out there already), and wait for
         * something to arrive. We clearly can't run any threads
         * until a SCHEDULE or RESUME arrives, and so that's what
         * we're hoping to see. (Of course, we still have to
         * respond to other types of messages.)
         */
        TIME now = msTime() /*CURRENT_TIME*/;
        IF_PAR_DEBUG(verbose,
                     debugBelch("--  now=%ld\n", now));
        IF_PAR_DEBUG(verbose,
                     if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
                         (last_fish_arrived_at!=0 &&
                          last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
                       debugBelch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)\n",
                             last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
                             last_fish_arrived_at,
                             RtsFlags.ParFlags.fishDelay, now);
                     });

        if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
            (last_fish_arrived_at==0 ||
             (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
          /* outstandingFishes is set in sendFish, processFish;
             avoid flooding system with fishes via delay */
          pe = choosePE();
          sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
                   NEW_FISH_HUNGER);

          // Global statistics: count no. of fishes
          if (RtsFlags.ParFlags.ParStats.Global &&
              RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
            globalParStats.tot_fish_mess++;
          }
        }

        receivedFinish = processMessages();
        goto next_thread;
      }
    } else if (PacketsWaiting()) {  /* Look for incoming messages */
      receivedFinish = processMessages();
    }
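    /* The work-stealing protocol sketched in the comment above, in outline
       (illustrative summary only, not a complete specification):

         idle PE                    victim PE
           | -------- FISH ------->   |   has a spark: replies SCHEDULE (work)
           |                          |   has none:    passes the FISH on
           | <-- SCHEDULE / RESUME -  |   RESUME: data arriving for a fetch

       processMessages() above is what handles these replies. */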
    /* Now we are sure that we have some work available */
    ASSERT(run_queue_hd != END_TSO_QUEUE);

    /* Take a thread from the run queue, if we have work */
    POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
    IF_DEBUG(sanity,checkTSO(t));

    /* ToDo: write something to the log-file
    if (RTSflags.ParFlags.granSimStats && !sameThread)
        DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
    */

    /* the spark pool for the current PE */
    pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable

    IF_DEBUG(scheduler,
             debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
                   run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
    if (0 && RtsFlags.ParFlags.ParStats.Full &&
        t && LastTSO && t->id != LastTSO->id &&
        LastTSO->why_blocked == NotBlocked &&
        LastTSO->what_next != ThreadComplete) {
      // if previously scheduled TSO not blocked we have to record the context switch
      DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
                           GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
    }

    if (RtsFlags.ParFlags.ParStats.Full &&
        (emitSchedule /* forced emit */ ||
        (t && LastTSO && t->id != LastTSO->id))) {
      /*
         we are running a different TSO, so write a schedule event to log file
         NB: If we use fair scheduling we also have to write a deschedule
             event for LastTSO; with unfair scheduling we know that the
             previous tso has blocked whenever we switch to another tso, so
             we don't need it in GUM for now
      */
      DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                       GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
      emitSchedule = rtsFalse;
    }
#else /* !GRAN && !PAR */

    // grab a thread from the run queue
    ASSERT(run_queue_hd != END_TSO_QUEUE);
    POP_RUN_QUEUE(t);

    // Sanity check the thread we're about to run. This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));
#endif
#if defined(RTS_SUPPORTS_THREADS)
    {
      StgMainThread *m = t->main;

      if(m)
      {
        if(m == mainThread)
        {
          IF_DEBUG(scheduler,
            sched_belch("### Running thread %d in bound thread", t->id));
          // yes, the Haskell thread is bound to the current native thread
        }
        else
        {
          IF_DEBUG(scheduler,
            sched_belch("### thread %d bound to another OS thread", t->id));
          // no, bound to a different Haskell thread: pass to that thread
          PUSH_ON_RUN_QUEUE(t);
          passCapability(&m->bound_thread_cond);
          continue;
        }
      }
      else
      {
        if(mainThread != NULL)
        // The thread we want to run is bound.
        {
          IF_DEBUG(scheduler,
            sched_belch("### this OS thread cannot run thread %d", t->id));
          // no, the current native thread is bound to a different
          // Haskell thread, so pass it to any worker thread
          PUSH_ON_RUN_QUEUE(t);
          passCapabilityToWorker();
          continue;
        }
      }
    }
#endif
    cap->r.rCurrentTSO = t;

    /* context switches are now initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
         && (run_queue_hd != END_TSO_QUEUE
             || blocked_queue_hd != END_TSO_QUEUE
             || sleeping_queue != END_TSO_QUEUE)))
        context_switch = 1;
    else
        context_switch = 0;

run_thread:

    RELEASE_LOCK(&sched_mutex);

    IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
                              (long)t->id, whatNext_strs[t->what_next]));

#if defined(PROFILING)
    startHeapProfTimer();
#endif
    /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
    /* Run the current thread
     */
    prev_what_next = t->what_next;

    errno = t->saved_errno;

    switch (prev_what_next) {

    case ThreadKilled:
    case ThreadComplete:
        /* Thread already finished, return to scheduler. */
        ret = ThreadFinished;
        break;

    case ThreadRunGHC:
        ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
        break;

    case ThreadInterpret:
        ret = interpretBCO(cap);
        break;

    default:
        barf("schedule: invalid what_next field");
    }

    // The TSO might have moved, so find the new location:
    t = cap->r.rCurrentTSO;

    // And save the current errno in this thread.
    t->saved_errno = errno;
    /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */

    /* Costs for the scheduler are assigned to CCS_SYSTEM */
#if defined(PROFILING)
    stopHeapProfTimer();
    CCCS = CCS_SYSTEM;
#endif

    ACQUIRE_LOCK(&sched_mutex);

#ifdef RTS_SUPPORTS_THREADS
    IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
#elif !defined(GRAN) && !defined(PAR)
    IF_DEBUG(scheduler,debugBelch("sched: "););
#endif

#if defined(PAR)
    /* HACK 675: if the last thread didn't yield, make sure to print a
       SCHEDULE event to the log file when StgRunning the next thread, even
       if it is the same one as before */
    LastTSO = t;
    TimeOfLastYield = CURRENT_TIME;
#endif
    switch (ret) {
    case HeapOverflow:
#if defined(GRAN)
      IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
      globalGranStats.tot_heapover++;
#elif defined(PAR)
      globalParStats.tot_heapover++;
#endif
      // did the task ask for a large block?
      if (cap->r.rHpAlloc > BLOCK_SIZE) {
          // if so, get one and push it on the front of the nursery.
          bdescr *bd;
          nat blocks;

          blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;

          IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n",
                                   (long)t->id, whatNext_strs[t->what_next], blocks));
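          // Worked example (illustrative only, assuming the usual 4Kbyte
          // BLOCK_SIZE): a request of rHpAlloc = 10000 bytes is rounded up
          // to BLOCK_ROUND_UP(10000) / BLOCK_SIZE = 3 blocks, which are
          // then linked in front of the nursery as a single group below.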
          // don't do this if it would push us over the
          // alloc_blocks_lim limit; we'll GC first.
          if (alloc_blocks + blocks < alloc_blocks_lim) {

              alloc_blocks += blocks;
              bd = allocGroup( blocks );

              // link the new group into the list
              bd->link = cap->r.rCurrentNursery;
              bd->u.back = cap->r.rCurrentNursery->u.back;
              if (cap->r.rCurrentNursery->u.back != NULL) {
                  cap->r.rCurrentNursery->u.back->link = bd;
              } else {
                  ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
                         g0s0->blocks == cap->r.rNursery);
                  cap->r.rNursery = g0s0->blocks = bd;
              }
              cap->r.rCurrentNursery->u.back = bd;

              // initialise it as a nursery block. We initialise the
              // step, gen_no, and flags field of *every* sub-block in
              // this large block, because this is easier than making
              // sure that we always find the block head of a large
              // block whenever we call Bdescr() (eg. evacuate() and
              // isAlive() in the GC would both have to do this, at
              // least).
              {
                  bdescr *x;
                  for (x = bd; x < bd + blocks; x++) {
                      x->step = g0s0;
                      x->gen_no = 0;
                      x->flags = 0;
                  }
              }

              // don't forget to update the block count in g0s0.
              g0s0->n_blocks += blocks;
              // This assert can be a killer if the app is doing lots
              // of large block allocations.
              ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);

              // now update the nursery to point to the new block
              cap->r.rCurrentNursery = bd;

              // we might be unlucky and have another thread get on the
              // run queue before us and steal the large block, but in that
              // case the thread will just end up requesting another large
              // block.
              PUSH_ON_RUN_QUEUE(t);
              break;
          }
      }
      /* make all the running tasks block on a condition variable,
       * maybe set context_switch and wait till they all pile in,
       * then have them wait on a GC condition variable.
       */
      IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n",
                               (long)t->id, whatNext_strs[t->what_next]));
#if defined(GRAN)
      ASSERT(!is_on_queue(t,CurrentProc));
#elif defined(PAR)
      /* Currently we emit a DESCHEDULE event before GC in GUM.
         ToDo: either add separate event to distinguish SYSTEM time from rest
               or just nuke this DESCHEDULE (and the following SCHEDULE) */
      if (0 && RtsFlags.ParFlags.ParStats.Full) {
        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
        emitSchedule = rtsTrue;
      }
#endif

      ready_to_gc = rtsTrue;
      context_switch = 1;         /* stop other threads ASAP */
      PUSH_ON_RUN_QUEUE(t);
      /* actual GC is done at the end of the while loop */
      break;
    case StackOverflow:
#if defined(GRAN)
      IF_DEBUG(gran,
               DumpGranEvent(GR_DESCHEDULE, t));
      globalGranStats.tot_stackover++;
#elif defined(PAR)
      // IF_DEBUG(par,
      // DumpGranEvent(GR_DESCHEDULE, t);
      globalParStats.tot_stackover++;
#endif
      IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n",
                               (long)t->id, whatNext_strs[t->what_next]));
      /* just adjust the stack for this thread, then pop it back
       * on the run queue.
       */
      {
        /* enlarge the stack */
        StgTSO *new_t = threadStackOverflow(t);

        /* This TSO has moved, so update any pointers to it from the
         * main thread stack. It better not be on any other queues...
         * (it shouldn't be).
         */
        if (t->main != NULL) {
            t->main->tso = new_t;
        }
        PUSH_ON_RUN_QUEUE(new_t);
      }
      break;
    case ThreadYielding:
      // Reset the context switch flag. We don't do this just before
      // running the thread, because that would mean we would lose ticks
      // during GC, which can lead to unfair scheduling (a thread hogs
      // the CPU because the tick always arrives during GC). This way
      // penalises threads that do a lot of allocation, but that seems
      // better than the alternative.
      context_switch = 0;

#if defined(GRAN)
      IF_DEBUG(gran,
               DumpGranEvent(GR_DESCHEDULE, t));
      globalGranStats.tot_yields++;
#elif defined(PAR)
      // IF_DEBUG(par,
      // DumpGranEvent(GR_DESCHEDULE, t);
      globalParStats.tot_yields++;
#endif
      /* put the thread back on the run queue. Then, if we're ready to
       * GC, check whether this is the last task to stop. If so, wake
       * up the GC thread. getThread will block during a GC until the
       * GC is finished.
       */
      IF_DEBUG(scheduler,
               if (t->what_next != prev_what_next) {
                   debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n",
                         (long)t->id, whatNext_strs[t->what_next]);
               } else {
                   debugBelch("--<< thread %ld (%s) stopped, yielding\n",
                         (long)t->id, whatNext_strs[t->what_next]);
               }
               );

      IF_DEBUG(sanity,
               //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
               checkTSO(t));
      ASSERT(t->link == END_TSO_QUEUE);

      // Shortcut if we're just switching evaluators: don't bother
      // doing stack squeezing (which can be expensive), just run the
      // thread.
      if (t->what_next != prev_what_next) {
          goto run_thread;
      }

#if defined(GRAN)
      ASSERT(!is_on_queue(t,CurrentProc));

      IF_DEBUG(sanity,
               //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
               checkThreadQsSanity(rtsTrue));
#endif

#if defined(PAR)
      if (RtsFlags.ParFlags.doFairScheduling) {
        /* this does round-robin scheduling; good for concurrency */
        APPEND_TO_RUN_QUEUE(t);
      } else {
        /* this does unfair scheduling; good for parallelism */
        PUSH_ON_RUN_QUEUE(t);
      }
#else
      // this does round-robin scheduling; good for concurrency
      APPEND_TO_RUN_QUEUE(t);
#endif
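      /* The two queue disciplines above, side by side (sketch):
       *   APPEND_TO_RUN_QUEUE(t) -- t rejoins the tail, so every runnable
       *                             thread gets a turn (round-robin, fair);
       *   PUSH_ON_RUN_QUEUE(t)   -- t rejoins the head and runs again at
       *                             once (unfair, but better for parallelism,
       *                             as the comments above note).
       */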
#if defined(GRAN)
      /* add a ContinueThread event to actually process the thread */
      new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
                ContinueThread,
                t, (StgClosure*)NULL, (rtsSpark*)NULL);
      IF_GRAN_DEBUG(bq,
               debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
               G_EVENTQ(0);
               G_CURR_THREADQ(0));
#endif /* GRAN */
      break;
    case ThreadBlocked:
#if defined(GRAN)
      IF_DEBUG(scheduler,
               debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
                          t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
                          (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
               if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));

      // ??? needed; should emit block before
      IF_DEBUG(gran,
               DumpGranEvent(GR_DESCHEDULE, t));
      prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t

      ASSERT(procStatus[CurrentProc]==Busy ||
             ((procStatus[CurrentProc]==Fetching) &&
              (t->block_info.closure!=(StgClosure*)NULL)));
      if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
          !(!RtsFlags.GranFlags.DoAsyncFetch &&
            procStatus[CurrentProc]==Fetching))
        procStatus[CurrentProc] = Idle;

#elif defined(PAR)
      IF_DEBUG(scheduler,
               debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
                          t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
      IF_PAR_DEBUG(bq,
                   if (t->block_info.closure!=(StgClosure*)NULL)
                     print_bq(t->block_info.closure));

      /* Send a fetch (if BlockedOnGA) and dump event to log file */
      blockThread(t);

      /* whatever we schedule next, we must log that schedule */
      emitSchedule = rtsTrue;
#else /* !GRAN && !PAR */

      /* don't need to do anything. Either the thread is blocked on
       * I/O, in which case we'll have called addToBlockedQueue
       * previously, or it's blocked on an MVar or Blackhole, in which
       * case it'll be on the relevant queue already.
       */
      ASSERT(t->why_blocked != NotBlocked);
      IF_DEBUG(scheduler,
               debugBelch("--<< thread %d (%s) stopped: ",
                          t->id, whatNext_strs[t->what_next]);
               printThreadBlockage(t);
               debugBelch("\n"));

      /* Only for dumping event to log file
         ToDo: do I need this in GranSim, too?
         blockThread(t);
      */
#endif
      break;
    case ThreadFinished:
      /* Need to check whether this was a main thread, and if so, signal
       * the task that started it with the return value. If we have no
       * more main threads, we probably need to stop all the tasks until
       * we get a new one.
       */
      /* We also end up here if the thread kills itself with an
       * uncaught exception, see Exception.hc.
       */
      IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n",
                               t->id, whatNext_strs[t->what_next]));
#if defined(GRAN)
      endThread(t, CurrentProc); // clean-up the thread
#elif defined(PAR)
      /* For now all are advisory -- HWL */
      //if(t->priority==AdvisoryPriority) ??
      advisory_thread_count--;

#ifdef DIST
      if(t->dist.priority==RevalPriority)
        FinishReval(t);
#endif
      if (RtsFlags.ParFlags.ParStats.Full &&
          !RtsFlags.ParFlags.ParStats.Suppressed)
        DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
#endif

      //
      // Check whether the thread that just completed was a main
      // thread, and if so return with the result.
      //
      // There is an assumption here that all thread completion goes
      // through this point; we need to make sure that if a thread
      // ends up in the ThreadKilled state, that it stays on the run
      // queue so it can be dealt with here.
      //
#if defined(RTS_SUPPORTS_THREADS)
      if (mainThread != NULL
          && mainThread->tso == t)
#else
      if (t->main != NULL)
#endif
      {
          // We are a bound thread: this must be our thread that just
          // finished.
          ASSERT(mainThread->tso == t);

          if (t->what_next == ThreadComplete) {
              if (mainThread->ret) {
                  // NOTE: return val is tso->sp[1] (see StgStartup.hc)
                  *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1];
              }
              mainThread->stat = Success;
          } else {
              if (mainThread->ret) {
                  *(mainThread->ret) = NULL;
              }
              if (was_interrupted) {
                  mainThread->stat = Interrupted;
              } else {
                  mainThread->stat = Killed;
              }
          }

#ifdef DEBUG
          removeThreadLabel((StgWord)mainThread->tso->id);
#endif
          if (mainThread->prev == NULL) {
              main_threads = mainThread->link;
          } else {
              mainThread->prev->link = mainThread->link;
          }
          if (mainThread->link != NULL) {
              mainThread->link->prev = NULL;
          }
          releaseCapability(cap);
          return;
      }
#ifdef RTS_SUPPORTS_THREADS
      ASSERT(t->main == NULL);
#else
      if (t->main != NULL) {
          // Must be a main thread that is not the topmost one. Leave
          // it on the run queue until the stack has unwound to the
          // point where we can deal with this. Leaving it on the run
          // queue also ensures that the garbage collector knows about
          // this thread and its return value (it gets dropped from the
          // all_threads list so there's no other way to find it).
          APPEND_TO_RUN_QUEUE(t);
      }
#endif
      break;

    default:
      barf("schedule: invalid thread return code %d", (int)ret);
    }
#ifdef PROFILING
    // When we have +RTS -i0 and we're heap profiling, do a census at
    // every GC. This lets us get repeatable runs for debugging.
    if (performHeapProfile ||
        (RtsFlags.ProfFlags.profileInterval==0 &&
         RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
        GarbageCollect(GetRoots, rtsTrue);
        heapCensus();
        performHeapProfile = rtsFalse;
        ready_to_gc = rtsFalse; // we already GC'd
    }
#endif
    /* Kick any transactions which are invalid back to their atomically frames.
     * When next scheduled they will try to commit, this commit will fail and
     * they will retry. */
    for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
        if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
            if (!stmValidateTransaction (t -> trec)) {
                IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));

                // strip the stack back to the ATOMICALLY_FRAME, aborting
                // the (nested) transaction, and saving the stack of any
                // partially-evaluated thunks on the heap.
                raiseAsync_(t, NULL, rtsTrue);

#ifdef REG_R1
                ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
#endif
            }
        }
    }
    if (ready_to_gc) {
      /* everybody back, start the GC.
       * Could do it in this thread, or signal a condition var
       * to do it in another thread. Either way, we need to
       * broadcast on gc_pending_cond afterward.
       */
#if defined(RTS_SUPPORTS_THREADS)
      IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
      GarbageCollect(GetRoots,rtsFalse);
      ready_to_gc = rtsFalse;
#if defined(GRAN)
      /* add a ContinueThread event to continue execution of current thread */
      new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
                ContinueThread,
                t, (StgClosure*)NULL, (rtsSpark*)NULL);
      IF_GRAN_DEBUG(bq,
               debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
               G_EVENTQ(0);
               G_CURR_THREADQ(0));
#endif /* GRAN */
    }

#if defined(GRAN)
  next_thread:
    IF_GRAN_DEBUG(unused,
                  print_eventq(EventHd));

    event = get_next_event();
#elif defined(PAR)
  next_thread:
    /* ToDo: wait for next message to arrive rather than busy wait */
#endif /* PAR */

  } /* end of while(1) */

  IF_PAR_DEBUG(verbose,
               debugBelch("== Leaving schedule() after having received Finish\n"));
}
/* ---------------------------------------------------------------------------
 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
 * used by Control.Concurrent for error checking.
 * ------------------------------------------------------------------------- */

StgBool
rtsSupportsBoundThreads(void)
{
#ifdef THREADED_RTS
  return rtsTrue;
#else
  return rtsFalse;
#endif
}

/* ---------------------------------------------------------------------------
 * isThreadBound(tso): check whether tso is bound to an OS thread.
 * ------------------------------------------------------------------------- */

StgBool
isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
{
#ifdef THREADED_RTS
  return (tso->main != NULL);
#else
  return rtsFalse;
#endif
}
/* ---------------------------------------------------------------------------
 * Singleton fork(). Do not copy any running threads.
 * ------------------------------------------------------------------------- */

#ifndef mingw32_TARGET_OS
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
deleteThreadImmediately(StgTSO *tso);
#endif

StgInt
forkProcess(HsStablePtr *entry
#ifndef FORKPROCESS_PRIMOP_SUPPORTED
            STG_UNUSED
#endif
           )
{
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
  pid_t pid;
  StgTSO *t, *next;
  StgMainThread *m;
  SchedulerStatus rc;

  IF_DEBUG(scheduler,sched_belch("forking!"));
  rts_lock(); // This not only acquires sched_mutex, it also
              // makes sure that no other threads are running

  pid = fork();

  if (pid) { /* parent */

    /* just return the pid */
    rts_unlock();
    return pid;

  } else { /* child */

    // delete all threads
    run_queue_hd = run_queue_tl = END_TSO_QUEUE;

    for (t = all_threads; t != END_TSO_QUEUE; t = next) {
      next = t->global_link;

      // don't allow threads to catch the ThreadKilled exception
      deleteThreadImmediately(t);
    }

    // wipe the main thread list
    while((m = main_threads) != NULL) {
      main_threads = m->link;
# ifdef THREADED_RTS
      closeCondition(&m->bound_thread_cond);
# endif
      stgFree(m);
    }

    rc = rts_evalStableIO(entry, NULL);  // run the action
    rts_checkSchedStatus("forkProcess",rc);

    rts_unlock();

    hs_exit();                           // clean up and exit
  }
#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
  barf("forkProcess#: primop not supported, sorry!\n");
  return -1;
#endif
}
/* ---------------------------------------------------------------------------
 * deleteAllThreads(): kill all the live threads.
 *
 * This is used when we catch a user interrupt (^C), before performing
 * any necessary cleanups and running finalizers.
 *
 * Locks: sched_mutex held.
 * ------------------------------------------------------------------------- */

void
deleteAllThreads ( void )
{
  StgTSO *t, *next;

  IF_DEBUG(scheduler,sched_belch("deleting all threads"));
  for (t = all_threads; t != END_TSO_QUEUE; t = next) {
    next = t->global_link;
    deleteThread(t);
  }

  // The run queue now contains a bunch of ThreadKilled threads. We
  // must not throw these away: the main thread(s) will be in there
  // somewhere, and the main scheduler loop has to deal with it.
  // Also, the run queue is the only thing keeping these threads from
  // being GC'd, and we don't want the "main thread has been GC'd" panic.

  ASSERT(blocked_queue_hd == END_TSO_QUEUE);
  ASSERT(sleeping_queue == END_TSO_QUEUE);
}
/* startThread and insertThread are now in GranSim.c -- HWL */

/* ---------------------------------------------------------------------------
 * Suspending & resuming Haskell threads.
 *
 * When making a "safe" call to C (aka _ccall_GC), the task gives back
 * its capability before calling the C function. This allows another
 * task to pick up the capability and carry on running Haskell
 * threads. It also means that if the C call blocks, it won't lock
 * the whole system.
 *
 * The Haskell thread making the C call is put to sleep for the
 * duration of the call, on the suspended_ccalling_threads queue. We
 * give out a token to the task, which it can use to resume the thread
 * on return from the C function.
 * ------------------------------------------------------------------------- */
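/* Sketch of the calling convention described above (illustrative only;
 * some_c_fn stands in for an arbitrary foreign function):
 *
 *   StgInt tok;
 *   tok = suspendThread(reg);   // give up the Capability, get a token
 *   r   = some_c_fn(...);       // may block; other tasks can run Haskell
 *   reg = resumeThread(tok);    // reacquire a Capability, resume the TSO
 */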
StgInt
suspendThread( StgRegTable *reg )
{
  nat tok;
  Capability *cap;
  int saved_errno = errno;

  /* assume that *reg is a pointer to the StgRegTable part
   * of a Capability.
   */
  cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));

  ACQUIRE_LOCK(&sched_mutex);

  IF_DEBUG(scheduler,
           sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));

  // XXX this might not be necessary --SDM
  cap->r.rCurrentTSO->what_next = ThreadRunGHC;

  threadPaused(cap->r.rCurrentTSO);
  cap->r.rCurrentTSO->link = suspended_ccalling_threads;
  suspended_ccalling_threads = cap->r.rCurrentTSO;

  if(cap->r.rCurrentTSO->blocked_exceptions == NULL) {
      cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
      cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
  } else {
      cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
  }

  /* Use the thread ID as the token; it should be unique */
  tok = cap->r.rCurrentTSO->id;

  /* Hand back capability */
  releaseCapability(cap);

#if defined(RTS_SUPPORTS_THREADS)
  /* Preparing to leave the RTS, so ensure there's a native thread/task
     waiting to take over.
  */
  IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
#endif

  RELEASE_LOCK(&sched_mutex);

  errno = saved_errno;
  return tok;
}
StgRegTable *
resumeThread( StgInt tok )
{
  StgTSO *tso, **prev;
  Capability *cap;
  int saved_errno = errno;

#if defined(RTS_SUPPORTS_THREADS)
  /* Wait for permission to re-enter the RTS with the result. */
  ACQUIRE_LOCK(&sched_mutex);
  waitForReturnCapability(&sched_mutex, &cap);

  IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
#else
  grabCapability(&cap);
#endif

  /* Remove the thread off of the suspended list */
  prev = &suspended_ccalling_threads;
  for (tso = suspended_ccalling_threads;
       tso != END_TSO_QUEUE;
       prev = &tso->link, tso = tso->link) {
    if (tso->id == (StgThreadID)tok) {
      *prev = tso->link;
      break;
    }
  }
  if (tso == END_TSO_QUEUE) {
    barf("resumeThread: thread not found");
  }
  tso->link = END_TSO_QUEUE;

  if(tso->why_blocked == BlockedOnCCall) {
      awakenBlockedQueueNoLock(tso->blocked_exceptions);
      tso->blocked_exceptions = NULL;
  }

  /* Reset blocking status */
  tso->why_blocked = NotBlocked;

  cap->r.rCurrentTSO = tso;
  RELEASE_LOCK(&sched_mutex);
  errno = saved_errno;
  return &cap->r;
}
/* ------------------------------------------------------------------------- */

static void unblockThread(StgTSO *tso);

/* ---------------------------------------------------------------------------
 * Comparing Thread ids.
 *
 * This is used from STG land in the implementation of the
 * instances of Eq/Ord for ThreadIds.
 * ------------------------------------------------------------------------ */
int
cmp_thread(StgPtr tso1, StgPtr tso2)
{
  StgThreadID id1 = ((StgTSO *)tso1)->id;
  StgThreadID id2 = ((StgTSO *)tso2)->id;

  if (id1 < id2) return (-1);
  if (id1 > id2) return 1;
  return 0;
}

/* ---------------------------------------------------------------------------
 * Fetching the ThreadID from an StgTSO.
 *
 * This is used in the implementation of Show for ThreadIds.
 * ------------------------------------------------------------------------ */
int
rts_getThreadId(StgPtr tso)
{
  return ((StgTSO *)tso)->id;
}
#ifdef DEBUG
void
labelThread(StgPtr tso, char *label)
{
  int len;
  void *buf;

  /* Caveat: Once set, you can only set the thread name to "" */
  len = strlen(label)+1;
  buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
  strncpy(buf,label,len);
  /* Update will free the old memory for us */
  updateThreadLabel(((StgTSO *)tso)->id,buf);
}
#endif /* DEBUG */
/* ---------------------------------------------------------------------------
   Create a new thread.

   The new thread starts with the given stack size. Before the
   scheduler can run, however, this thread needs to have a closure
   (and possibly some arguments) pushed on its stack. See
   pushClosure() in Schedule.h.

   createGenThread() and createIOThread() (in SchedAPI.h) are
   convenient packaged versions of this function.

   currently pri (priority) is only used in a GRAN setup -- HWL
   ------------------------------------------------------------------------ */
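/* Typical usage, per the comment above (sketch only; `closure' stands in
 * for whatever the new thread should enter):
 *
 *   StgTSO *tso = createThread(RtsFlags.GcFlags.initialStkSize);
 *   pushClosure(tso, closure);   // set up the thread's stack (Schedule.h)
 *   scheduleThread(tso);         // put it on the run queue (defined below)
 */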
#if defined(GRAN)
/* currently pri (priority) is only used in a GRAN setup -- HWL */
StgTSO *
createThread(nat size, StgInt pri)
#else
StgTSO *
createThread(nat size)
#endif
{
    StgTSO *tso;
    nat stack_size;

    /* First check whether we should create a thread at all */
#if defined(PAR)
    /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
    if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
      debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
            RtsFlags.ParFlags.maxThreads, advisory_thread_count);
      return END_TSO_QUEUE;
    }
#endif

#if defined(GRAN)
    ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
#endif
    // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW

    /* catch ridiculously small stack sizes */
    if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
        size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
    }

    stack_size = size - TSO_STRUCT_SIZEW;

    tso = (StgTSO *)allocate(size);
    TICK_ALLOC_TSO(stack_size, 0);

    SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
#if defined(GRAN)
    SET_GRAN_HDR(tso, ThisPE);
#endif

    // Always start with the compiled code evaluator
    tso->what_next = ThreadRunGHC;

    tso->id = next_thread_id++;
    tso->why_blocked = NotBlocked;
    tso->blocked_exceptions = NULL;

    tso->saved_errno = 0;
    tso->main = NULL;

    tso->stack_size     = stack_size;
    tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
                          - TSO_STRUCT_SIZEW;
    tso->sp             = (P_)&(tso->stack) + stack_size;

    tso->trec = NO_TREC;

#ifdef PROFILING
    tso->prof.CCCS = CCS_MAIN;
#endif

    /* put a stop frame on the stack */
    tso->sp -= sizeofW(StgStopFrame);
    SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
    tso->link = END_TSO_QUEUE;
#if defined(GRAN)
    /* uses more flexible routine in GranSim */
    insertThread(tso, CurrentProc);
#else
    /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
     * from its creation
     */
#endif

#if defined(GRAN)
    if (RtsFlags.GranFlags.GranSimStats.Full)
      DumpGranEvent(GR_START,tso);
#elif defined(PAR)
    if (RtsFlags.ParFlags.ParStats.Full)
      DumpGranEvent(GR_STARTQ,tso);
    /* HACK to avoid SCHEDULE
       LastTSO = tso; */
#endif

    /* Link the new thread on the global thread list.
     */
    tso->global_link = all_threads;
    all_threads = tso;
#if defined(DIST)
    tso->dist.priority = MandatoryPriority; //by default that is...
#endif

#if defined(GRAN)
    tso->gran.pri = pri;
# if defined(DEBUG)
    tso->gran.magic = TSO_MAGIC; // debugging only
# endif
    tso->gran.sparkname    = 0;
    tso->gran.startedat    = CURRENT_TIME;
    tso->gran.exported     = 0;
    tso->gran.basicblocks  = 0;
    tso->gran.allocs       = 0;
    tso->gran.exectime     = 0;
    tso->gran.fetchtime    = 0;
    tso->gran.fetchcount   = 0;
    tso->gran.blocktime    = 0;
    tso->gran.blockcount   = 0;
    tso->gran.blockedat    = 0;
    tso->gran.globalsparks = 0;
    tso->gran.localsparks  = 0;
    if (RtsFlags.GranFlags.Light)
      tso->gran.clock = Now; /* local clock */
    else
      tso->gran.clock = 0;

    IF_DEBUG(gran,printTSO(tso));
#elif defined(PAR)
# if defined(DEBUG)
    tso->par.magic = TSO_MAGIC; // debugging only
# endif
    tso->par.sparkname    = 0;
    tso->par.startedat    = CURRENT_TIME;
    tso->par.exported     = 0;
    tso->par.basicblocks  = 0;
    tso->par.allocs       = 0;
    tso->par.exectime     = 0;
    tso->par.fetchtime    = 0;
    tso->par.fetchcount   = 0;
    tso->par.blocktime    = 0;
    tso->par.blockcount   = 0;
    tso->par.blockedat    = 0;
    tso->par.globalsparks = 0;
    tso->par.localsparks  = 0;
#endif
#if defined(GRAN)
    globalGranStats.tot_threads_created++;
    globalGranStats.threads_created_on_PE[CurrentProc]++;
    globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
    globalGranStats.tot_sq_probes++;
#elif defined(PAR)
    // collect parallel global statistics (currently done together with GC stats)
    if (RtsFlags.ParFlags.ParStats.Global &&
        RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
      //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
      globalParStats.tot_threads_created++;
    }
#endif
#if defined(GRAN)
    IF_GRAN_DEBUG(pri,
                  sched_belch("==__ schedule: Created TSO %d (%p) on PE %d;",
                              tso->id, tso, CurrentProc));
#elif defined(PAR)
    IF_PAR_DEBUG(verbose,
                 sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
                             (long)tso->id, tso, advisory_thread_count));
#else
    IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
                                   (long)tso->id, (long)tso->stack_size));
#endif
    return tso;
}
#if defined(PAR)
/*
  all parallel thread creation calls should fall through the following routine.
*/
StgTSO *
createSparkThread(rtsSpark spark)
{
  StgTSO *tso;

  ASSERT(spark != (rtsSpark)NULL);
  if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
  {
    barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
         RtsFlags.ParFlags.maxThreads, advisory_thread_count);
    return END_TSO_QUEUE;
  }
  tso = createThread(RtsFlags.GcFlags.initialStkSize);
  if (tso==END_TSO_QUEUE)
    barf("createSparkThread: Cannot create TSO");
#if defined(DIST)
  tso->priority = AdvisoryPriority;
#endif
  pushClosure(tso,spark);
  PUSH_ON_RUN_QUEUE(tso);
  advisory_thread_count++;
  return tso;
}
#endif /* PAR */
/* ---------------------------------------------------------------------------
   Turn a spark into a thread.
   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
   ------------------------------------------------------------------------ */
#if defined(PAR)
StgTSO *
activateSpark (rtsSpark spark)
{
  StgTSO *tso;

  tso = createSparkThread(spark);
  if (RtsFlags.ParFlags.ParStats.Full) {
    //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
    IF_PAR_DEBUG(verbose,
                 debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
                            (StgClosure *)spark, info_type((StgClosure *)spark)));
  }
  // ToDo: fwd info on local/global spark to thread -- HWL
  // tso->gran.exported =  spark->exported;
  // tso->gran.locked =   !spark->global;
  // tso->gran.sparkname = spark->name;

  return tso;
}
#endif
static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
                                   Capability *initialCapability
                                  );
/* ---------------------------------------------------------------------------
 * scheduleThread()
 *
 * scheduleThread puts a thread on the end of the runnable queue.
 * This will usually be done immediately after a thread is created.
 * The caller of scheduleThread must create the thread using e.g.
 * createThread and push an appropriate closure
 * on this thread's stack before the scheduler is invoked.
 * ------------------------------------------------------------------------ */

static void scheduleThread_ (StgTSO* tso);

void
scheduleThread_(StgTSO *tso)
{
  // The thread goes at the *end* of the run-queue, to avoid possible
  // starvation of any threads already on the queue.
  APPEND_TO_RUN_QUEUE(tso);
  threadRunnable();
}

void
scheduleThread(StgTSO* tso)
{
  ACQUIRE_LOCK(&sched_mutex);
  scheduleThread_(tso);
  RELEASE_LOCK(&sched_mutex);
}
#if defined(RTS_SUPPORTS_THREADS)
static Condition bound_cond_cache;
static int bound_cond_cache_full = 0;
#endif

SchedulerStatus
scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
                   Capability *initialCapability)
{
    // Precondition: sched_mutex must be held
    StgMainThread *m;

    m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
    m->tso = tso;
    m->ret = ret;
    m->stat = NoStatus;
    m->link = main_threads;
    m->prev = NULL;
    if (main_threads != NULL) {
        main_threads->prev = m;
    }
    main_threads = m;

#if defined(RTS_SUPPORTS_THREADS)
    // Allocating a new condition for each thread is expensive, so we
    // cache one. This is a pretty feeble hack, but it helps speed up
    // consecutive call-ins quite a bit.
    if (bound_cond_cache_full) {
        m->bound_thread_cond = bound_cond_cache;
        bound_cond_cache_full = 0;
    } else {
        initCondition(&m->bound_thread_cond);
    }
#endif

    /* Put the thread on the main-threads list prior to scheduling the TSO.
       Failure to do so introduces a race condition in the MT case (as
       identified by Wolfgang Thaller), whereby the new task/OS thread
       created by scheduleThread_() would complete prior to the thread
       that spawned it managed to put 'itself' on the main-threads list.
       The upshot of it all being that the worker thread wouldn't get to
       signal the completion of its work item for the main thread to
       see (==> it got stuck waiting.)    -- sof 6/02.
    */
    IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));

    APPEND_TO_RUN_QUEUE(tso);
    // NB. Don't call threadRunnable() here, because the thread is
    // bound and only runnable by *this* OS thread, so waking up other
    // workers will just slow things down.

    return waitThread_(m, initialCapability);
}
/* ---------------------------------------------------------------------------
 * initScheduler()
 *
 * Initialise the scheduler. This resets all the queues - if the
 * queues contained any threads, they'll be garbage collected at the
 * next GC.
 * ------------------------------------------------------------------------ */
void
initScheduler(void)
{
#if defined(GRAN)
  nat i;

  for (i=0; i<=MAX_PROC; i++) {
    run_queue_hds[i]     = END_TSO_QUEUE;
    run_queue_tls[i]     = END_TSO_QUEUE;
    blocked_queue_hds[i] = END_TSO_QUEUE;
    blocked_queue_tls[i] = END_TSO_QUEUE;
    ccalling_threadss[i] = END_TSO_QUEUE;
    sleeping_queue       = END_TSO_QUEUE;
  }
#else
  run_queue_hd     = END_TSO_QUEUE;
  run_queue_tl     = END_TSO_QUEUE;
  blocked_queue_hd = END_TSO_QUEUE;
  blocked_queue_tl = END_TSO_QUEUE;
  sleeping_queue   = END_TSO_QUEUE;
#endif

  suspended_ccalling_threads = END_TSO_QUEUE;

  main_threads = NULL;
  all_threads  = END_TSO_QUEUE;

  RtsFlags.ConcFlags.ctxtSwitchTicks =
      RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;

#if defined(RTS_SUPPORTS_THREADS)
  /* Initialise the mutex and condition variables used by
   * the scheduler. */
  initMutex(&sched_mutex);
  initMutex(&term_mutex);
#endif

  ACQUIRE_LOCK(&sched_mutex);

  /* A capability holds the state a native thread needs in
   * order to execute STG code. At least one capability is
   * floating around (only SMP builds have more than one).
   */
  initCapabilities();

#if defined(RTS_SUPPORTS_THREADS)
  /* start our haskell execution tasks */
  startTaskManager(0,taskStart);
#endif

#if /* defined(SMP) ||*/ defined(PAR)
  initSparkPools();
#endif

  RELEASE_LOCK(&sched_mutex);
}
void
exitScheduler( void )
{
#if defined(RTS_SUPPORTS_THREADS)
  stopTaskManager();
#endif
  shutting_down_scheduler = rtsTrue;
}
/* ----------------------------------------------------------------------------
   Managing the per-task allocation areas.

   Each capability comes with an allocation area. These are
   fixed-length block lists into which allocation can be done.

   ToDo: no support for two-space collection at the moment???
   ------------------------------------------------------------------------- */
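/* Roughly how allocation proceeds in such an area (illustrative sketch
 * only, not RTS code): compiled code bumps the heap pointer through the
 * current block and drops back into the scheduler with HeapOverflow when
 * the block is exhausted:
 *
 *   if (Hp + n > HpLim) { ... return to scheduler (HeapOverflow) ... }
 *   p = Hp; Hp += n;    // allocate n words in the nursery block
 */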
static SchedulerStatus
waitThread_(StgMainThread* m, Capability *initialCapability)
{
  SchedulerStatus stat;

  // Precondition: sched_mutex must be held.
  IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));

#if defined(GRAN)
  /* GranSim specific init */
  CurrentTSO = m->tso;           // the TSO to run
  procStatus[MainProc] = Busy;   // status of main PE
  CurrentProc = MainProc;        // PE to run it on
  schedule(m,initialCapability);
#else
  schedule(m,initialCapability);
  ASSERT(m->stat != NoStatus);
#endif

  stat = m->stat;

#if defined(RTS_SUPPORTS_THREADS)
  // Free the condition variable, returning it to the cache if possible.
  if (!bound_cond_cache_full) {
      bound_cond_cache = m->bound_thread_cond;
      bound_cond_cache_full = 1;
  } else {
      closeCondition(&m->bound_thread_cond);
  }
#endif

  IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
  stgFree(m);

  // Postcondition: sched_mutex still held
  return stat;
}
/* ---------------------------------------------------------------------------
   Where are the roots that we know about?

        - all the threads on the runnable queue
        - all the threads on the blocked queue
        - all the threads on the sleeping queue
        - all the threads currently executing a _ccall_GC
        - all the "main threads"

   ------------------------------------------------------------------------ */

/* This has to be protected either by the scheduler monitor, or by the
   garbage collection monitor (probably the latter).
*/
void
GetRoots( evac_fn evac )
{
#if defined(GRAN)
  nat i;

  for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
    if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
        evac((StgClosure **)&run_queue_hds[i]);
    if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
        evac((StgClosure **)&run_queue_tls[i]);

    if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
        evac((StgClosure **)&blocked_queue_hds[i]);
    if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
        evac((StgClosure **)&blocked_queue_tls[i]);
    if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
        evac((StgClosure **)&ccalling_threadss[i]);
  }
#else /* !GRAN */
  if (run_queue_hd != END_TSO_QUEUE) {
    ASSERT(run_queue_tl != END_TSO_QUEUE);
    evac((StgClosure **)&run_queue_hd);
    evac((StgClosure **)&run_queue_tl);
  }

  if (blocked_queue_hd != END_TSO_QUEUE) {
    ASSERT(blocked_queue_tl != END_TSO_QUEUE);
    evac((StgClosure **)&blocked_queue_hd);
    evac((StgClosure **)&blocked_queue_tl);
  }

  if (sleeping_queue != END_TSO_QUEUE) {
    evac((StgClosure **)&sleeping_queue);
  }
#endif

  if (suspended_ccalling_threads != END_TSO_QUEUE) {
    evac((StgClosure **)&suspended_ccalling_threads);
  }
#if defined(PAR) || defined(GRAN)
  markSparkQueue(evac);
#endif

#if defined(RTS_USER_SIGNALS)
  // mark the signal handlers (signals should be already blocked)
  markSignalHandlers(evac);
#endif
}
/* -----------------------------------------------------------------------------
   performGC

   This is the interface to the garbage collector from Haskell land.
   We provide this so that external C code can allocate and garbage
   collect when called from Haskell via _ccall_GC.

   It might be useful to provide an interface whereby the programmer
   can specify more roots (ToDo).

   This needs to be protected by the GC condition variable above. KH.
   -------------------------------------------------------------------------- */
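
/* Hook for user-supplied roots: set by performGCWithRoots() and walked
 * by AllRoots() below, after the scheduler's own roots. */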
static void (*extra_roots)(evac_fn);

void
performGC(void)
{
  /* Obligated to hold this lock upon entry */
  ACQUIRE_LOCK(&sched_mutex);
  GarbageCollect(GetRoots,rtsFalse);
  RELEASE_LOCK(&sched_mutex);
}

void
performMajorGC(void)
{
  ACQUIRE_LOCK(&sched_mutex);
  GarbageCollect(GetRoots,rtsTrue);
  RELEASE_LOCK(&sched_mutex);
}

static void
AllRoots(evac_fn evac)
{
  GetRoots(evac);     // the scheduler's roots
  extra_roots(evac);  // the user's roots
}

void
performGCWithRoots(void (*get_roots)(evac_fn))
{
  ACQUIRE_LOCK(&sched_mutex);
  extra_roots = get_roots;
  GarbageCollect(AllRoots,rtsFalse);
  RELEASE_LOCK(&sched_mutex);
}
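
/* Illustrative usage sketch (not part of the RTS): external C code
 * that keeps a Haskell closure alive in a global can register it as
 * an extra root when forcing a collection.  'my_root' and 'myGetRoots'
 * are hypothetical names.
 */
#if 0   /* illustration only, not compiled */
static StgClosure *my_root;

static void
myGetRoots(evac_fn evac)
{
    evac(&my_root);    /* treat my_root as a GC root */
}

/* caller:  performGCWithRoots(myGetRoots);  */
#endif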
/* -----------------------------------------------------------------------------
   Stack overflow

   If the thread has reached its maximum stack size, then raise the
   StackOverflow exception in the offending thread. Otherwise
   relocate the TSO into a larger chunk of memory and adjust its stack
   size appropriately.
   -------------------------------------------------------------------------- */
StgTSO *
threadStackOverflow(StgTSO *tso)
{
  nat new_stack_size, new_tso_size, stack_words;
  StgPtr new_sp;
  StgTSO *dest;

  IF_DEBUG(sanity,checkTSO(tso));
  if (tso->stack_size >= tso->max_stack_size) {
    IF_DEBUG(gc,
             debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
                        (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
             /* If we're debugging, just print out the top of the stack */
             printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
                                              tso->sp+64)));

    /* Send this thread the StackOverflow exception */
    raiseAsync(tso, (StgClosure *)stackOverflow_closure);
    return tso;
  }

  /* Try to double the current stack size. If that takes us over the
   * maximum stack size for this thread, then use the maximum instead.
   * Finally round up so the TSO ends up as a whole number of blocks.
   */
  new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
  new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
                                       TSO_STRUCT_SIZE)/sizeof(W_);
  new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
  new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
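
  /* For example (illustrative figures): a 4096-word stack with a
   * 65536-word max_stack_size doubles to 8192 words, whereas a
   * 40000-word stack is clamped to the 65536-word maximum (before
   * block rounding) rather than doubled to 80000 words. */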
  IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));

  dest = (StgTSO *)allocate(new_tso_size);
  TICK_ALLOC_TSO(new_stack_size,0);

  /* copy the TSO block and the old stack into the new area */
  memcpy(dest,tso,TSO_STRUCT_SIZE);
  stack_words = tso->stack + tso->stack_size - tso->sp;
  new_sp = (P_)dest + new_tso_size - stack_words;
  memcpy(new_sp, tso->sp, stack_words * sizeof(W_));

  /* relocate the stack pointers... */
  dest->sp         = new_sp;
  dest->stack_size = new_stack_size;

  /* Mark the old TSO as relocated. We have to check for relocated
   * TSOs in the garbage collector and any primops that deal with TSOs.
   *
   * It's important to set the sp value to just beyond the end
   * of the stack, so we don't attempt to scavenge any part of the
   * dead TSO's stack.
   */
  tso->what_next = ThreadRelocated;
  tso->link = dest;
  tso->sp = (P_)&(tso->stack[tso->stack_size]);
  tso->why_blocked = NotBlocked;
  dest->mut_link = NULL;

  IF_PAR_DEBUG(verbose,
               debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
                          tso->id, tso, tso->stack_size);
               /* If we're debugging, just print out the top of the stack */
               printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
                                                tso->sp+64)));

  IF_DEBUG(sanity,checkTSO(tso));
#if 0
  IF_DEBUG(scheduler,printTSO(dest));
#endif

  return dest;
}
/* ---------------------------------------------------------------------------
   Wake up a queue that was blocked on some resource.
   ------------------------------------------------------------------------ */
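
/* unblockCount is a bookkeeping helper used when waking a blocked
 * thread: the GUM version below dumps a RESUME event and updates the
 * thread's blocked/fetch timing statistics; the GranSim version has
 * nothing to do here. */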
#if defined(GRAN)
INLINE_ME void
unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
{
  /* nothing to do: GranSim does its accounting in unblockOneLocked */
}
#elif defined(PAR)
INLINE_ME void
unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
{
  /* write RESUME events to log file and
     update blocked and fetch time (depending on type of the orig closure) */
  if (RtsFlags.ParFlags.ParStats.Full) {
    DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                     GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
                     0, 0 /* spark_queue_len(ADVISORY_POOL) */);
    if (EMPTY_RUN_QUEUE())
      emitSchedule = rtsTrue;

    switch (get_itbl(node)->type) {
    case FETCH_ME_BQ:
      ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
      break;
    case RBH:
    case FETCH_ME:
    case BLACKHOLE_BQ:
      ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
      break;
    default:
      barf("{unblockCount}Daq Qagh: unexpected closure in blocking queue");
    }
  }
}
#endif
#if defined(GRAN)
static StgBlockingQueueElement *
unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
{
  StgTSO *tso;
  PEs node_loc, tso_loc;
  StgBlockingQueueElement *next = bqe->link; // save: tso->link is overwritten below

  node_loc = where_is(node); // should be lifted out of loop
  tso = (StgTSO *)bqe;       // wastes an assignment to get the type right
  tso_loc = where_is((StgClosure *)tso);
  if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
    /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
    ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
    CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
    // insertThread(tso, node_loc);
    new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
              ResumeThread,
              tso, node, (rtsSpark*)NULL);
    tso->link = END_TSO_QUEUE; // overwrite link just to be sure
  } else { // TSO is remote (actually should be FMBQ)
    CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
                                RtsFlags.GranFlags.Costs.gunblocktime +
                                RtsFlags.GranFlags.Costs.latency;
    new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
              UnblockThread,
              tso, node, (rtsSpark*)NULL);
    tso->link = END_TSO_QUEUE; // overwrite link just to be sure
  }
  /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
  IF_GRAN_DEBUG(bq,
                debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
                           (node_loc==tso_loc ? "Local" : "Global"),
                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
  tso->block_info.closure = NULL;
  IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n",
                                tso->id, tso));
  return next;
}
#elif defined(PAR)
static StgBlockingQueueElement *
unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
{
  StgBlockingQueueElement *next = NULL;  // initialised defensively

  switch (get_itbl(bqe)->type) {
  case TSO:
    ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
    /* if it's a TSO just push it onto the run_queue */
    next = bqe->link;
    ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
    APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
    unblockCount(bqe, node);
    /* reset blocking status after dumping event */
    ((StgTSO *)bqe)->why_blocked = NotBlocked;
    break;

  case BLOCKED_FETCH:
    /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
    next = bqe->link;
    bqe->link = (StgBlockingQueueElement *)PendingFetches;
    PendingFetches = (StgBlockedFetch *)bqe;
    break;

#if defined(DEBUG)
    /* can ignore this case in a non-debugging setup;
       see comments on RBHSave closures above */
  case CONSTR:
    /* check that the closure is an RBHSave closure */
    ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
           get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
           get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
    break;
#endif

  default:
    barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
         get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
         (StgClosure *)bqe);
  }
  IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
  return next;
}
#else   /* !GRAN && !PAR */
static StgTSO *
unblockOneLocked(StgTSO *tso)
{
  StgTSO *next;

  ASSERT(get_itbl(tso)->type == TSO);
  ASSERT(tso->why_blocked != NotBlocked);
  tso->why_blocked = NotBlocked;
  next = tso->link;
  tso->link = END_TSO_QUEUE;
  APPEND_TO_RUN_QUEUE(tso);
  IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
  return next;
}
#endif
#if defined(GRAN) || defined(PAR)
INLINE_ME StgBlockingQueueElement *
unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
{
  ACQUIRE_LOCK(&sched_mutex);
  bqe = unblockOneLocked(bqe, node);
  RELEASE_LOCK(&sched_mutex);
  return bqe;
}
#else
INLINE_ME StgTSO *
unblockOne(StgTSO *tso)
{
  ACQUIRE_LOCK(&sched_mutex);
  tso = unblockOneLocked(tso);
  RELEASE_LOCK(&sched_mutex);
  return tso;
}
#endif
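
/* awakenBlockedQueue: wake an entire blocking queue by repeatedly
 * unblocking its head element via unblockOneLocked(). */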
#if defined(GRAN)
void
awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  PEs node_loc;
  nat len = 0;

  IF_GRAN_DEBUG(bq,
                debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
                           node, CurrentProc, CurrentTime[CurrentProc],
                           CurrentTSO->id, CurrentTSO));

  node_loc = where_is(node);

  ASSERT(q == END_BQ_QUEUE ||
         get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
         get_itbl(q)->type == CONSTR); // closure (type constructor)
  ASSERT(is_unique(node));

  /* FAKE FETCH: magically copy the node to the tso's proc;
     no Fetch necessary because in reality the node should not have been
     moved to the other PE in the first place
  */
  if (CurrentProc!=node_loc) {
    IF_GRAN_DEBUG(bq,
                  debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
                             node, node_loc, CurrentProc, CurrentTSO->id,
                             // CurrentTSO, where_is(CurrentTSO),
                             node->header.gran.procs));
    node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
    IF_GRAN_DEBUG(bq,
                  debugBelch("## new bitmask of node %p is %#x\n",
                             node, node->header.gran.procs));
    if (RtsFlags.GranFlags.GranSimStats.Global) {
      globalGranStats.tot_fake_fetches++;
    }
  }

  bqe = q;
  // ToDo: check: ASSERT(CurrentProc==node_loc);
  while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
    /*
       bqe points to the current element in the queue
       next points to the next element in the queue
    */
    //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
    //tso_loc = where_is(tso);
    len++;
    bqe = unblockOneLocked(bqe, node);
  }

  /* if this is the BQ of an RBH, we have to put back the info ripped out of
     the closure to make room for the anchor of the BQ */
  if (bqe!=END_BQ_QUEUE) {
    ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
    /*
    ASSERT((info_ptr==&RBH_Save_0_info) ||
           (info_ptr==&RBH_Save_1_info) ||
           (info_ptr==&RBH_Save_2_info));
    */
    /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
    ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
    ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];

    IF_GRAN_DEBUG(bq,
                  debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
                             node, info_type(node)));
  }

  /* statistics gathering */
  if (RtsFlags.GranFlags.GranSimStats.Global) {
    // globalGranStats.tot_bq_processing_time += bq_processing_time;
    globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
    // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
    globalGranStats.tot_awbq++;             // total no. of bqs awakened
  }
  IF_GRAN_DEBUG(bq,
                debugBelch("## BQ Stats of %p: [%d entries] %s\n",
                           node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
}
#elif defined(PAR)
void
awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
{
  StgBlockingQueueElement *bqe;

  ACQUIRE_LOCK(&sched_mutex);

  IF_PAR_DEBUG(verbose,
               debugBelch("##-_ AwBQ for node %p on [%x]: \n",
                          node, mytid));
#ifdef DIST
  //RFP
  if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
    IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
    RELEASE_LOCK(&sched_mutex);  /* don't return with the lock still held */
    return;
  }
#endif

  ASSERT(q == END_BQ_QUEUE ||
         get_itbl(q)->type == TSO ||
         get_itbl(q)->type == BLOCKED_FETCH ||
         get_itbl(q)->type == CONSTR);

  bqe = q;
  while (get_itbl(bqe)->type==TSO ||
         get_itbl(bqe)->type==BLOCKED_FETCH) {
    bqe = unblockOneLocked(bqe, node);
  }
  RELEASE_LOCK(&sched_mutex);
}
#else   /* !GRAN && !PAR */
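
// Variant for callers that already hold sched_mutex.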
void
awakenBlockedQueueNoLock(StgTSO *tso)
{
  while (tso != END_TSO_QUEUE) {
    tso = unblockOneLocked(tso);
  }
}

void
awakenBlockedQueue(StgTSO *tso)
{
  ACQUIRE_LOCK(&sched_mutex);
  while (tso != END_TSO_QUEUE) {
    tso = unblockOneLocked(tso);
  }
  RELEASE_LOCK(&sched_mutex);
}
#endif
/* ---------------------------------------------------------------------------
   Interrupt execution
   - usually called inside a signal handler so it mustn't do anything fancy.
   ------------------------------------------------------------------------ */

void
interruptStgRts(void)
{
    interrupted    = 1;
    context_switch = 1;
}
/* -----------------------------------------------------------------------------
   Unblock a thread

   This is for use when we raise an exception in another thread, which
   may be blocked.
   This has nothing to do with the UnblockThread event in GranSim. -- HWL
   -------------------------------------------------------------------------- */
#if defined(GRAN) || defined(PAR)
/*
  NB: only the type of the blocking queue is different in GranSim and GUM;
      the operations on the queue elements are the same.
      Long live polymorphism!

  Locks: sched_mutex is held upon entry and exit.
*/
static void
unblockThread(StgTSO *tso)
{
  StgBlockingQueueElement *t, **last;

  switch (tso->why_blocked) {

  case NotBlocked:
    return;  /* not blocked */

  case BlockedOnSTM:
    // Be careful: nothing to do here! We tell the scheduler that the thread
    // is runnable and we leave it to the stack-walking code to abort the
    // transaction while unwinding the stack. We should perhaps have a debugging
    // test to make sure that this really happens and that the 'zombie' transaction
    // does not get committed.
    goto done;

  case BlockedOnMVar:
    ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
    {
      StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
      StgMVar *mvar = (StgMVar *)(tso->block_info.closure);

      last = (StgBlockingQueueElement **)&mvar->head;
      for (t = (StgBlockingQueueElement *)mvar->head;
           t != END_BQ_QUEUE;
           last = &t->link, last_tso = t, t = t->link) {
        if (t == (StgBlockingQueueElement *)tso) {
          *last = (StgBlockingQueueElement *)tso->link;
          if (mvar->tail == tso) {
            mvar->tail = (StgTSO *)last_tso;
          }
          goto done;
        }
      }
      barf("unblockThread (MVAR): TSO not found");
    }

  case BlockedOnBlackHole:
    ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
    {
      StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);

      last = &bq->blocking_queue;
      for (t = bq->blocking_queue;
           t != END_BQ_QUEUE;
           last = &t->link, t = t->link) {
        if (t == (StgBlockingQueueElement *)tso) {
          *last = (StgBlockingQueueElement *)tso->link;
          goto done;
        }
      }
      barf("unblockThread (BLACKHOLE): TSO not found");
    }

  case BlockedOnException:
    {
      StgTSO *target = tso->block_info.tso;

      ASSERT(get_itbl(target)->type == TSO);

      if (target->what_next == ThreadRelocated) {
        target = target->link;
        ASSERT(get_itbl(target)->type == TSO);
      }

      ASSERT(target->blocked_exceptions != NULL);

      last = (StgBlockingQueueElement **)&target->blocked_exceptions;
      for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
           t != END_BQ_QUEUE;
           last = &t->link, t = t->link) {
        ASSERT(get_itbl(t)->type == TSO);
        if (t == (StgBlockingQueueElement *)tso) {
          *last = (StgBlockingQueueElement *)tso->link;
          goto done;
        }
      }
      barf("unblockThread (Exception): TSO not found");
    }

  case BlockedOnRead:
  case BlockedOnWrite:
#if defined(mingw32_TARGET_OS)
  case BlockedOnDoProc:
#endif
    {
      /* take TSO off blocked_queue */
      StgBlockingQueueElement *prev = NULL;
      for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
           prev = t, t = t->link) {
        if (t == (StgBlockingQueueElement *)tso) {
          if (prev == NULL) {
            blocked_queue_hd = (StgTSO *)t->link;
            if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
              blocked_queue_tl = END_TSO_QUEUE;
            }
          } else {
            prev->link = t->link;
            if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
              blocked_queue_tl = (StgTSO *)prev;
            }
          }
          goto done;
        }
      }
      barf("unblockThread (I/O): TSO not found");
    }

  case BlockedOnDelay:
    {
      /* take TSO off sleeping_queue */
      StgBlockingQueueElement *prev = NULL;
      for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
           prev = t, t = t->link) {
        if (t == (StgBlockingQueueElement *)tso) {
          if (prev == NULL) {
            sleeping_queue = (StgTSO *)t->link;
          } else {
            prev->link = t->link;
          }
          goto done;
        }
      }
      barf("unblockThread (delay): TSO not found");
    }

  default:
    barf("unblockThread");
  }

 done:
  tso->link = END_TSO_QUEUE;
  tso->why_blocked = NotBlocked;
  tso->block_info.closure = NULL;
  PUSH_ON_RUN_QUEUE(tso);
}
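
/* NB: this variant pushes the unblocked TSO on the front of the run
 * queue, while the standard variant below appends to the back. */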
#else /* !GRAN && !PAR */
static void
unblockThread(StgTSO *tso)
{
  StgTSO *t, **last;

  /* To avoid locking unnecessarily. */
  if (tso->why_blocked == NotBlocked) {
    return;
  }

  switch (tso->why_blocked) {

  case BlockedOnSTM:
    // Be careful: nothing to do here! We tell the scheduler that the thread
    // is runnable and we leave it to the stack-walking code to abort the
    // transaction while unwinding the stack. We should perhaps have a debugging
    // test to make sure that this really happens and that the 'zombie' transaction
    // does not get committed.
    goto done;

  case BlockedOnMVar:
    ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
    {
      StgTSO *last_tso = END_TSO_QUEUE;
      StgMVar *mvar = (StgMVar *)(tso->block_info.closure);

      last = &mvar->head;
      for (t = mvar->head; t != END_TSO_QUEUE;
           last = &t->link, last_tso = t, t = t->link) {
        if (t == tso) {
          *last = tso->link;
          if (mvar->tail == tso) {
            mvar->tail = last_tso;
          }
          goto done;
        }
      }
      barf("unblockThread (MVAR): TSO not found");
    }

  case BlockedOnBlackHole:
    ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
    {
      StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);

      last = &bq->blocking_queue;
      for (t = bq->blocking_queue; t != END_TSO_QUEUE;
           last = &t->link, t = t->link) {
        if (t == tso) {
          *last = tso->link;
          goto done;
        }
      }
      barf("unblockThread (BLACKHOLE): TSO not found");
    }

  case BlockedOnException:
    {
      StgTSO *target = tso->block_info.tso;

      ASSERT(get_itbl(target)->type == TSO);

      while (target->what_next == ThreadRelocated) {
        target = target->link;
        ASSERT(get_itbl(target)->type == TSO);
      }

      ASSERT(target->blocked_exceptions != NULL);

      last = &target->blocked_exceptions;
      for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
           last = &t->link, t = t->link) {
        ASSERT(get_itbl(t)->type == TSO);
        if (t == tso) {
          *last = tso->link;
          goto done;
        }
      }
      barf("unblockThread (Exception): TSO not found");
    }

  case BlockedOnRead:
  case BlockedOnWrite:
#if defined(mingw32_TARGET_OS)
  case BlockedOnDoProc:
#endif
    {
      /* take TSO off blocked_queue */
      StgTSO *prev = NULL;
      for (t = blocked_queue_hd; t != END_TSO_QUEUE;
           prev = t, t = t->link) {
        if (t == tso) {
          if (prev == NULL) {
            blocked_queue_hd = t->link;
            if (blocked_queue_tl == t) {
              blocked_queue_tl = END_TSO_QUEUE;
            }
          } else {
            prev->link = t->link;
            if (blocked_queue_tl == t) {
              blocked_queue_tl = prev;
            }
          }
          goto done;
        }
      }
      barf("unblockThread (I/O): TSO not found");
    }

  case BlockedOnDelay:
    {
      /* take TSO off sleeping_queue */
      StgTSO *prev = NULL;
      for (t = sleeping_queue; t != END_TSO_QUEUE;
           prev = t, t = t->link) {
        if (t == tso) {
          if (prev == NULL) {
            sleeping_queue = t->link;
          } else {
            prev->link = t->link;
          }
          goto done;
        }
      }
      barf("unblockThread (delay): TSO not found");
    }

  default:
    barf("unblockThread");
  }

 done:
  tso->link = END_TSO_QUEUE;
  tso->why_blocked = NotBlocked;
  tso->block_info.closure = NULL;
  APPEND_TO_RUN_QUEUE(tso);
}
#endif
/* -----------------------------------------------------------------------------
 * raiseAsync()
 *
 * The following function implements the magic for raising an
 * asynchronous exception in an existing thread.
 *
 * We first remove the thread from any queue on which it might be
 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
 *
 * We strip the stack down to the innermost CATCH_FRAME, building
 * thunks in the heap for all the active computations, so they can
 * be restarted if necessary. When we reach a CATCH_FRAME, we build
 * an application of the handler to the exception, and push it on
 * the top of the stack.
 *
 * How exactly do we save all the active computations? We create an
 * AP_STACK for every UpdateFrame on the stack. Entering one of these
 * AP_STACKs pushes everything from the corresponding update frame
 * upwards onto the stack. (Actually, it pushes everything up to the
 * next update frame plus a pointer to the next AP_STACK object.
 * Entering the next AP_STACK object pushes more onto the stack until we
 * reach the last AP_STACK object - at which point the stack should look
 * exactly as it did when we killed the TSO and we can continue
 * execution by entering the closure on top of the stack.)
 *
 * We can also kill a thread entirely - this happens if either (a) the
 * exception passed to raiseAsync is NULL, or (b) there's no
 * CATCH_FRAME on the stack. In either case, we strip the entire
 * stack and replace the thread with a zombie.
 *
 * Locks: sched_mutex held upon entry and exit.
 *
 * -------------------------------------------------------------------------- */
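
/* deleteThread: kill a thread outright by raising a NULL exception in
 * it (see raiseAsync_ below); threads stuck in foreign calls are left
 * alone. */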
void
deleteThread(StgTSO *tso)
{
  if (tso->why_blocked != BlockedOnCCall &&
      tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
      raiseAsync(tso,NULL);
  }
}
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
void
deleteThreadImmediately(StgTSO *tso)
{ // for forkProcess only:
  // delete thread without giving it a chance to catch the KillThread exception

  if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
      return;
  }

  if (tso->why_blocked != BlockedOnCCall &&
      tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
      unblockThread(tso);
  }

  tso->what_next = ThreadKilled;
}
#endif
void
raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
{
  /* When raising async exceptions from contexts where sched_mutex isn't
     held, use raiseAsyncWithLock(). */
  ACQUIRE_LOCK(&sched_mutex);
  raiseAsync(tso,exception);
  RELEASE_LOCK(&sched_mutex);
}
void
raiseAsync(StgTSO *tso, StgClosure *exception)
{
  raiseAsync_(tso, exception, rtsFalse);
}
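
/* raiseAsync_ does the real work. stop_at_atomically is used by the
 * STM machinery: when rtsTrue, stack stripping stops at the enclosing
 * ATOMICALLY_FRAME (condemning the transaction) instead of propagating
 * the exception past it. */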
static void
raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
{
  StgRetInfoTable *info;
  StgPtr sp;

  // Thread already dead?
  if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
      return;
  }

  IF_DEBUG(scheduler,
           sched_belch("raising exception in thread %ld.", (long)tso->id));

  // Remove it from any blocking queues
  unblockThread(tso);

  sp = tso->sp;

  // The stack freezing code assumes there's a closure pointer on
  // the top of the stack, so we have to arrange that this is the case...
  //
  if (sp[0] == (W_)&stg_enter_info) {
      sp++;
  } else {
      sp--;
      sp[0] = (W_)&stg_dummy_ret_closure;
  }
  while (1) {
      StgPtr frame;
      nat i;

      // 1. Let the top of the stack be the "current closure"
      //
      // 2. Walk up the stack until we find either an UPDATE_FRAME or a
      // CATCH_FRAME.
      //
      // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
      // current closure applied to the chunk of stack up to (but not
      // including) the update frame. This closure becomes the "current
      // closure". Go back to step 2.
      //
      // 4. If it's a CATCH_FRAME, then leave the exception handler on
      // top of the stack applied to the exception.
      //
      // 5. If it's a STOP_FRAME, then kill the thread.
      //
      // NB: if we pass an ATOMICALLY_FRAME then abort the associated
      // transaction.

      frame = sp + 1;
      info = get_ret_itbl((StgClosure *)frame);

      while (info->i.type != UPDATE_FRAME
             && (info->i.type != CATCH_FRAME || exception == NULL)
             && info->i.type != STOP_FRAME
             && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
      {
          if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
              // If we find an ATOMICALLY_FRAME then we abort the
              // current transaction and propagate the exception. In
              // this case (unlike ordinary exceptions) we do not care
              // whether the transaction is valid or not because its
              // possible validity cannot have caused the exception
              // and will not be visible after the abort.
              IF_DEBUG(stm,
                       debugBelch("Found atomically block delivering async exception\n"));
              stmAbortTransaction(tso -> trec);
              tso -> trec = stmGetEnclosingTRec(tso -> trec);
          }
          frame += stack_frame_sizeW((StgClosure *)frame);
          info = get_ret_itbl((StgClosure *)frame);
      }
      switch (info->i.type) {

      case ATOMICALLY_FRAME:
          ASSERT(stop_at_atomically);
          ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
          stmCondemnTransaction(tso -> trec);
#ifdef REG_R1
          tso->sp = frame;
#else
          // R1 is not a register: the return convention for IO in
          // this case puts the return value on the stack, so we
          // need to set up the stack to return to the atomically
          // frame properly...
          tso->sp = frame - 2;
          tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
          tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
#endif
          tso->what_next = ThreadRunGHC;
          return;
      // If we find a CATCH_FRAME, and we've got an exception to raise,
      // then build the THUNK raise(exception), and leave it on
      // top of the CATCH_FRAME ready to enter.
      //
      case CATCH_FRAME:
      {
          StgCatchFrame *cf = (StgCatchFrame *)frame;
          StgClosure *raise;

          // we've got an exception to raise, so let's pass it to the
          // handler in this frame.
          //
          raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
          TICK_ALLOC_SE_THK(1,0);
          SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
          raise->payload[0] = exception;

          // throw away the stack from Sp up to the CATCH_FRAME.
          //
          sp = frame - 1;

          /* Ensure that async exceptions are blocked now, so we don't get
           * a surprise exception before we get around to executing the
           * handler.
           */
          if (tso->blocked_exceptions == NULL) {
              tso->blocked_exceptions = END_TSO_QUEUE;
          }

          /* Put the newly-built THUNK on top of the stack, ready to execute
           * when the thread restarts.
           */
          sp[0] = (W_)raise;
          sp[-1] = (W_)&stg_enter_info;
          tso->sp = sp-1;
          tso->what_next = ThreadRunGHC;
          IF_DEBUG(sanity, checkTSO(tso));
          return;
      }
      case UPDATE_FRAME:
      {
          StgAP_STACK *ap;
          nat words;

          // First build an AP_STACK consisting of the stack chunk above the
          // current update frame, with the top word on the stack as the
          // fun field.
          //
          words = frame - sp - 1;
          ap = (StgAP_STACK *)allocate(PAP_sizeW(words));

          ap->size = words;
          ap->fun  = (StgClosure *)sp[0];
          sp++;
          for(i=0; i < (nat)words; ++i) {
              ap->payload[i] = (StgClosure *)*sp++;
          }

          SET_HDR(ap,&stg_AP_STACK_info,
                  ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
          TICK_ALLOC_UP_THK(words+1,0);
          IF_DEBUG(scheduler,
                   debugBelch("sched: Updating ");
                   printPtr((P_)((StgUpdateFrame *)frame)->updatee);
                   debugBelch(" with ");
                   printObj((StgClosure *)ap);
                   );

          // Replace the updatee with an indirection - happily
          // this will also wake up any threads currently
          // waiting on the result.
          //
          // Warning: if we're in a loop, more than one update frame on
          // the stack may point to the same object. Be careful not to
          // overwrite an IND_OLDGEN in this case, because we'll screw
          // up the mutable lists. To be on the safe side, don't
          // overwrite any kind of indirection at all. See also
          // threadSqueezeStack in GC.c, where we have to make a similar
          // decision.
          //
          if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
              // revert the black hole
              UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
                             (StgClosure *)ap);
          }
          sp += sizeofW(StgUpdateFrame) - 1;
          sp[0] = (W_)ap; // push onto stack
          break;
      }
      case STOP_FRAME:
          // We've stripped the entire stack, the thread is now dead.
          sp += sizeofW(StgStopFrame);
          tso->what_next = ThreadKilled;
          tso->sp = sp;
          return;

      default:
          barf("raiseAsync");
      }
  }
}
/* -----------------------------------------------------------------------------
   raiseExceptionHelper

   This function is called by the raise# primitive, just so that we can
   move some of the tricky bits of raising an exception from C-- into
   C. Who knows, it might be a useful reusable thing here too.
   -------------------------------------------------------------------------- */
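
/* Returns the info-table type of the frame that stopped the stack walk
 * (CATCH_FRAME, CATCH_STM_FRAME, ATOMICALLY_FRAME or STOP_FRAME); on
 * return tso->sp points at that frame. */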
StgWord
raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
{
    StgClosure *raise_closure = NULL;
    StgPtr p, next;
    StgRetInfoTable *info;
    //
    // This closure represents the expression 'raise# E' where E
    // is the exception raised. It is used to overwrite all the
    // thunks which are currently under evaluation.
    //
    // LDV profiling: stg_raise_info has THUNK as its closure
    // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
    // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
    // 1 does not cause any problem unless profiling is performed.
    // However, when LDV profiling goes on, we need to linearly scan
    // small object pool, where raise_closure is stored, so we should
    // use MIN_UPD_SIZE.
    //
    // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
    //                                sizeofW(StgClosure)+1);
    //
    //
    // Walk up the stack, looking for the catch frame. On the way,
    // we update any closures pointed to from update frames with the
    // raise closure that we just built.
    //
    p = tso->sp;
    while (1) {
        info = get_ret_itbl((StgClosure *)p);
        next = p + stack_frame_sizeW((StgClosure *)p);
        switch (info->i.type) {

        case UPDATE_FRAME:
            // Only create raise_closure if we need to.
            if (raise_closure == NULL) {
                raise_closure =
                    (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
                SET_HDR(raise_closure, &stg_raise_info, CCCS);
                raise_closure->payload[0] = exception;
            }
            UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
            p = next;
            continue;
        case ATOMICALLY_FRAME:
            IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
            tso->sp = p;
            return ATOMICALLY_FRAME;

        case CATCH_FRAME:
            tso->sp = p;
            return CATCH_FRAME;

        case CATCH_STM_FRAME:
            IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
            tso->sp = p;
            return CATCH_STM_FRAME;

        case STOP_FRAME:
            tso->sp = p;
            return STOP_FRAME;

        case CATCH_RETRY_FRAME:
        default:
            p = next;
            continue;
        }
    }
}
/* -----------------------------------------------------------------------------
   findRetryFrameHelper

   This function is called by the retry# primitive. It traverses the stack
   leaving tso->sp referring to the frame which should handle the retry.

   This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
   or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).

   We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
   despite the similar implementation.

   We should not expect to see CATCH_FRAME or STOP_FRAME because those should
   not be created within memory transactions.
   -------------------------------------------------------------------------- */
StgWord
findRetryFrameHelper (StgTSO *tso)
{
  StgPtr p, next;
  StgRetInfoTable *info;

  p = tso->sp;
  while (1) {
    info = get_ret_itbl((StgClosure *)p);
    next = p + stack_frame_sizeW((StgClosure *)p);
    switch (info->i.type) {

    case ATOMICALLY_FRAME:
      IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retry\n", p));
      tso->sp = p;
      return ATOMICALLY_FRAME;

    case CATCH_RETRY_FRAME:
      IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retry\n", p));
      tso->sp = p;
      return CATCH_RETRY_FRAME;

    case CATCH_STM_FRAME:
    default:
      ASSERT(info->i.type != CATCH_FRAME);
      ASSERT(info->i.type != STOP_FRAME);
      p = next;
      continue;
    }
  }
}
/* -----------------------------------------------------------------------------
   resurrectThreads is called after garbage collection on the list of
   threads found to be garbage. Each of these threads will be woken
   up and sent a signal: BlockedOnDeadMVar if the thread was blocked
   on an MVar, or NonTermination if the thread was blocked on a Black
   Hole.

   Locks: sched_mutex isn't held upon entry or exit.
   -------------------------------------------------------------------------- */
void
resurrectThreads( StgTSO *threads )
{
  StgTSO *tso, *next;

  for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
    next = tso->global_link;
    tso->global_link = all_threads;
    all_threads = tso;
    IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));

    switch (tso->why_blocked) {
    case BlockedOnMVar:
    case BlockedOnException:
      /* Called by GC - sched_mutex lock is currently held. */
      raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
      break;
    case BlockedOnBlackHole:
      raiseAsync(tso,(StgClosure *)NonTermination_closure);
      break;
    case BlockedOnSTM:
      raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
      break;
    case NotBlocked:
      /* This might happen if the thread was blocked on a black hole
       * belonging to a thread that we've just woken up (raiseAsync
       * can wake up threads, remember...).
       */
      continue;
    default:
      barf("resurrectThreads: thread blocked in a strange way");
    }
  }
}
/* ----------------------------------------------------------------------------
 * Debugging: why is a thread blocked
 * [Also provides useful information when debugging threaded programs
 *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
 * ------------------------------------------------------------------------- */
static void
printThreadBlockage(StgTSO *tso)
{
  switch (tso->why_blocked) {
  case BlockedOnRead:
    debugBelch("is blocked on read from fd %d", tso->block_info.fd);
    break;
  case BlockedOnWrite:
    debugBelch("is blocked on write to fd %d", tso->block_info.fd);
    break;
#if defined(mingw32_TARGET_OS)
  case BlockedOnDoProc:
    debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
    break;
#endif
  case BlockedOnDelay:
    debugBelch("is blocked until %d", tso->block_info.target);
    break;
  case BlockedOnMVar:
    debugBelch("is blocked on an MVar");
    break;
  case BlockedOnException:
    debugBelch("is blocked on delivering an exception to thread %d",
               tso->block_info.tso->id);
    break;
  case BlockedOnBlackHole:
    debugBelch("is blocked on a black hole");
    break;
  case NotBlocked:
    debugBelch("is not blocked");
    break;
#if defined(PAR)
  case BlockedOnGA:
    debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
               tso->block_info.closure, info_type(tso->block_info.closure));
    break;
  case BlockedOnGA_NoSend:
    debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
               tso->block_info.closure, info_type(tso->block_info.closure));
    break;
#endif
  case BlockedOnCCall:
    debugBelch("is blocked on an external call");
    break;
  case BlockedOnCCall_NoUnblockExc:
    debugBelch("is blocked on an external call (exceptions were already blocked)");
    break;
  case BlockedOnSTM:
    debugBelch("is blocked on an STM operation");
    break;
  default:
    barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
         tso->why_blocked, tso->id, tso);
  }
}
static void
printThreadStatus(StgTSO *tso)
{
  switch (tso->what_next) {
  case ThreadKilled:
    debugBelch("has been killed");
    break;
  case ThreadComplete:
    debugBelch("has completed");
    break;
  default:
    printThreadBlockage(tso);
  }
}
void
printAllThreads(void)
{
  StgTSO *t;

#if defined(GRAN)
  char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
  ullong_format_string(TIME_ON_PROC(CurrentProc),
                       time_string, rtsFalse/*no commas!*/);

  debugBelch("all threads at [%s]:\n", time_string);
#elif defined(PAR)
  char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
  ullong_format_string(CURRENT_TIME,
                       time_string, rtsFalse/*no commas!*/);

  debugBelch("all threads at [%s]:\n", time_string);
#else
  debugBelch("all threads:\n");
#endif

  for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
    debugBelch("\tthread %d @ %p ", t->id, (void *)t);
    {
      void *label = lookupThreadLabel(t->id);
      if (label) debugBelch("[\"%s\"] ",(char *)label);
    }
    printThreadStatus(t);
    debugBelch("\n");
  }
}
/*
   Print a whole blocking queue attached to node (debugging only).
*/
# if defined(PAR)
void
print_bq (StgClosure *node)
{
  StgBlockingQueueElement *bqe;

  debugBelch("## BQ of closure %p (%s): ",
             node, info_type(node));

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
         get_itbl(node)->type == FETCH_ME_BQ ||
         get_itbl(node)->type == RBH ||
         get_itbl(node)->type == MVAR);

  ASSERT(node!=(StgClosure*)NULL); // sanity check

  print_bqe(((StgBlockingQueue*)node)->blocking_queue);
}
/*
   Print a whole blocking queue starting with the element bqe.
*/
void
print_bqe (StgBlockingQueueElement *bqe)
{
  rtsBool end;

  /*
    NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
  */
  for (end = (bqe==END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
       bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);                    // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||
           get_itbl(bqe)->type == BLOCKED_FETCH ||
           get_itbl(bqe)->type == CONSTR);
    /* only BQs of an RBH end with an RBH_Save closure */
    //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

    switch (get_itbl(bqe)->type) {
    case TSO:
      debugBelch(" TSO %u (%x),",
                 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
      break;
    case BLOCKED_FETCH:
      debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
                 ((StgBlockedFetch *)bqe)->node,
                 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
                 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
                 ((StgBlockedFetch *)bqe)->ga.weight);
      break;
    case CONSTR:
      debugBelch(" %s (IP %p),",
                 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
                  get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
                  get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
                  "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
           info_type((StgClosure *)bqe)); // , node, info_type(node));
    }
  }
  debugBelch("\n");
}
# elif defined(GRAN)
void
print_bq (StgClosure *node)
{
  StgBlockingQueueElement *bqe;
  PEs node_loc, tso_loc;
  rtsBool end;

  /* should cover all closures that may have a blocking queue */
  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
         get_itbl(node)->type == FETCH_ME_BQ ||
         get_itbl(node)->type == RBH);

  ASSERT(node!=(StgClosure*)NULL); // sanity check
  node_loc = where_is(node);

  debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
             node, info_type(node), node_loc);

  /*
    NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
  */
  for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
       !end; // iterate until bqe points to a CONSTR
       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
    ASSERT(bqe != END_BQ_QUEUE);                    // sanity check
    ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
    /* types of closures that may appear in a blocking queue */
    ASSERT(get_itbl(bqe)->type == TSO ||
           get_itbl(bqe)->type == CONSTR);
    /* only BQs of an RBH end with an RBH_Save closure */
    ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);

    tso_loc = where_is((StgClosure *)bqe);
    switch (get_itbl(bqe)->type) {
    case TSO:
      debugBelch(" TSO %d (%p) on [PE %d],",
                 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
      break;
    case CONSTR:
      debugBelch(" %s (IP %p),",
                 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
                  get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
                  get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
                  "RBH_Save_?"), get_itbl(bqe));
      break;
    default:
      barf("Unexpected closure type %s in blocking queue of %p (%s)",
           info_type((StgClosure *)bqe), node, info_type(node));
    }
  }
  debugBelch("\n");
}
# else   /* !GRAN && !PAR */
/*
   Nice and easy: only TSOs on the blocking queue
*/
void
print_bq (StgClosure *node)
{
  StgTSO *tso;

  ASSERT(node!=(StgClosure*)NULL); // sanity check
  for (tso = ((StgBlockingQueue*)node)->blocking_queue;
       tso != END_TSO_QUEUE;
       tso = tso->link) {
    ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
    ASSERT(get_itbl(tso)->type == TSO);      // guess what, sanity check
    debugBelch(" TSO %d (%p),", tso->id, tso);
  }
  debugBelch("\n");
}
# endif
/*
   Return the current length of the run queue.
*/
nat
run_queue_len(void)
{
  nat i;
  StgTSO *tso;

  for (i=0, tso=run_queue_hd;
       tso != END_TSO_QUEUE;
       i++, tso=tso->link) {
    /* nothing */
  }

  return i;
}
void
sched_belch(char *s, ...)
{
  va_list ap;
  va_start(ap,s);
#ifdef RTS_SUPPORTS_THREADS
  debugBelch("sched (task %p): ", osThreadId());
#elif defined(PAR)
  debugBelch("== ");
#else
  debugBelch("sched: ");
#endif
  vdebugBelch(s, ap);
  debugBelch("\n");
  va_end(ap);
}