/* ---------------------------------------------------------------------------
 * (c) The GHC Team, 1998-2004
 * Different GHC ways use this scheduler quite differently (see comments below)
 * Here is the global picture:
 *
 * WAY  Name     CPP flag  What's it for
 * --------------------------------------
 * mp   GUM      PAR       Parallel execution on a distrib. memory machine
 * s    SMP      SMP       Parallel execution on a shared memory machine
 * mg   GranSim  GRAN      Simulation of parallel execution
 * md   GUM/GdH  DIST      Distributed execution (based on GUM)
 * --------------------------------------------------------------------------*/

 * Version with support for distributed memory parallelism aka GUM (WAY=mp):

The main scheduling loop in GUM iterates until a finish message is received.
In that case a global flag @receivedFinish@ is set and this instance of
the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
for the handling of incoming messages, such as PP_FINISH.
Note that in the parallel case we have a system manager that coordinates
different PEs, each of which is running one instance of the RTS.
See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
From this routine processes executing ghc/rts/Main.c are spawned. -- HWL

 * Version with support for simulating parallel execution aka GranSim (WAY=mg):

The main scheduling code in GranSim is quite different from that in std
(concurrent) Haskell: while concurrent Haskell just iterates over the
threads in the runnable queue, GranSim is event driven, i.e. it iterates
over the events in the global event queue. -- HWL

#include "PosixSource.h"
#include "BlockAlloc.h"
#define COMPILING_SCHEDULER
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "Exception.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Proftimer.h"

#if defined(GRAN) || defined(PAR)
# include "GranSimRts.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"

#include "Capability.h"
#include "OSThreads.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#ifdef THREADED_RTS
#define USED_IN_THREADED_RTS
#else
#define USED_IN_THREADED_RTS STG_UNUSED
#endif

#ifdef RTS_SUPPORTS_THREADS
#define USED_WHEN_RTS_SUPPORTS_THREADS
#else
#define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
#endif
/* Main thread queue.
 * Locks required: sched_mutex.
StgMainThread *main_threads = NULL;

 * Locks required: sched_mutex.
StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */

In GranSim we have a runnable and a blocked queue for each processor.
In order to minimise code changes new arrays run_queue_hds/tls
are created. run_queue_hd is then a short cut (macro) for
run_queue_hds[CurrentProc] (see GranSim.h).
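/* [Illustrative sketch, not part of the original file.] The shortcut
 * described above presumably amounts to macros along these lines; the
 * real definitions live in GranSim.h:
 *
 *   #define run_queue_hd  run_queue_hds[CurrentProc]
 *   #define run_queue_tl  run_queue_tls[CurrentProc]
 */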
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating). */
StgTSO *run_queue_hd = NULL;
StgTSO *run_queue_tl = NULL;
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL; /* perhaps replace with a hash table? */

/* Linked list of all threads.
 * Used for detecting garbage collected threads.
StgTSO *all_threads = NULL;

/* When a thread performs a safe C call (_ccall_GC, using old
 * terminology), it gets put on the suspended_ccalling_threads
 * list. Used by the garbage collector.
static StgTSO *suspended_ccalling_threads;

static StgTSO *threadStackOverflow(StgTSO *tso);
/* KH: The following two flags are shared memory locations. There is no need
   to lock them, since they are only unset at the end of a scheduler
   operation. */
/* flag set by signal handler to precipitate a context switch */
int context_switch = 0;

/* if this flag is set as well, give up execution */
rtsBool interrupted = rtsFalse;

/* Next thread ID to allocate.
 * Locks required: thread_id_mutex
static StgThreadID next_thread_id = 1;

 * Pointers to the state of the current thread.
 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the closure to enter)
 *  + 1                       (stg_ap_v_ret)
 *  + 1                       (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
/* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 * exists - earlier gccs apparently didn't.
static rtsBool ready_to_gc;

 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
static rtsBool shutting_down_scheduler = rtsFalse;

void addToBlockedQueue ( StgTSO *tso );
static void schedule ( StgMainThread *mainThread, Capability *initialCapability );
void interruptStgRts ( void );

#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
static void detectBlackHoles ( void );
static void raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically);

#if defined(RTS_SUPPORTS_THREADS)
/* ToDo: carefully document the invariants that go together
 * with these synchronisation objects.
Mutex sched_mutex = INIT_MUTEX_VAR;
Mutex term_mutex = INIT_MUTEX_VAR;
#endif /* RTS_SUPPORTS_THREADS */

rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;

static char *whatNext_strs[] = {

StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);

/* ----------------------------------------------------------------------------
 * ------------------------------------------------------------------------- */

#if defined(RTS_SUPPORTS_THREADS)
static rtsBool startingWorkerThread = rtsFalse;

static void taskStart(void);
ACQUIRE_LOCK(&sched_mutex);
startingWorkerThread = rtsFalse;
RELEASE_LOCK(&sched_mutex);

startSchedulerTaskIfNecessary(void)
if(run_queue_hd != END_TSO_QUEUE
   || blocked_queue_hd != END_TSO_QUEUE
   || sleeping_queue != END_TSO_QUEUE)
if(!startingWorkerThread)
{ // we don't want to start another worker thread
  // just because the last one hasn't yet reached the
  // "waiting for capability" state
startingWorkerThread = rtsTrue;
if(!startTask(taskStart))
startingWorkerThread = rtsFalse;

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * timer expires (thread yields)

   Locking notes: we acquire the scheduler lock once at the beginning
   of the scheduler loop, and release it when

      * running a thread, or
      * waiting for work, or
      * waiting for a GC to complete.
   In a GranSim setup this loop iterates over the global event queue,
   which determines what to do next. Therefore, it's more complicated
   than either the concurrent or the parallel (GUM) setup.
   GUM iterates over incoming messages.
   It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
   and sends out a fish whenever it has nothing to do; in-between
   doing the actual reductions (shared code below) it processes the
   incoming messages and deals with delayed operations
   (see PendingFetches).
   This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */
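/* [Illustrative sketch, not part of the original file.] Ignoring the
 * GRAN/PAR variants, one iteration of the standard loop follows the lock
 * discipline described above, roughly (all names as used below):
 */
#if 0
    ACQUIRE_LOCK(&sched_mutex);
    while (1) {
	// ... wake up blocked threads, check signals, detect deadlock ...
	POP_RUN_QUEUE(t);		// round-robin: next runnable thread
	RELEASE_LOCK(&sched_mutex);	// released while running a thread
	ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
	ACQUIRE_LOCK(&sched_mutex);	// re-taken to dispatch on ret:
	// ... HeapOverflow, StackOverflow, ThreadYielding, ThreadBlocked,
	// ThreadFinished ...
    }
#endif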
schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
          Capability *initialCapability )
StgThreadReturnCode ret;
rtsBool receivedFinish = rtsFalse;
nat tp_size, sp_size; // stats only
rtsBool was_interrupted = rtsFalse;

// Pre-condition: sched_mutex is held.
// We might have a capability, passed in as initialCapability.
cap = initialCapability;

#if defined(RTS_SUPPORTS_THREADS)
// in the threaded case, the capability is either passed in via the
// initialCapability parameter, or initialized inside the scheduler
sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
            mainThread, initialCapability);
// simply initialise it in the non-threaded case
grabCapability(&cap);

/* set up first event to get things going */
/* ToDo: assign costs for system setup and init MainTSO ! */
new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
          CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
G_TSO(CurrentTSO, 5));

if (RtsFlags.GranFlags.Light) {
/* Save current time; GranSim Light only */
CurrentTSO->gran.clock = CurrentTime[CurrentProc];

event = get_next_event();
while (event!=(rtsEvent*)NULL) {
/* Choose the processor with the next event */
CurrentProc = event->proc;
CurrentTSO = event->tso;

while (!receivedFinish) { /* set by processMessages */
                          /* when receiving PP_FINISH message */

#else // everything except GRAN and PAR

IF_DEBUG(scheduler, printAllThreads());

#if defined(RTS_SUPPORTS_THREADS)
// Yield the capability to higher-priority tasks if necessary.
yieldCapability(&cap);

// If we do not currently hold a capability, we wait for one
waitForCapability(&sched_mutex, &cap,
                  mainThread ? &mainThread->bound_thread_cond : NULL);

// We now have a capability...

// If we're interrupted (the user pressed ^C, or some other
// termination condition occurred), kill all the currently running
IF_DEBUG(scheduler, sched_belch("interrupted"));
interrupted = rtsFalse;
was_interrupted = rtsTrue;
#if defined(RTS_SUPPORTS_THREADS)
// In the threaded RTS, deadlock detection doesn't work,
// so just exit right away.
errorBelch("interrupted");
releaseCapability(cap);
RELEASE_LOCK(&sched_mutex);
shutdownHaskellAndExit(EXIT_SUCCESS);

#if defined(RTS_USER_SIGNALS)
// check for signals each time around the scheduler
if (signals_pending()) {
RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
startSignalHandlers();
ACQUIRE_LOCK(&sched_mutex);

// Check whether any waiting threads need to be woken up. If the
// run queue is empty, and there are no other tasks running, we
// can wait indefinitely for something to happen.
if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
#if defined(RTS_SUPPORTS_THREADS)
// We shouldn't be here...
barf("schedule: awaitEvent() in threaded RTS");
awaitEvent( EMPTY_RUN_QUEUE() );
// we can be interrupted while waiting for I/O...
if (interrupted) continue;

 * Detect deadlock: when we have no threads to run, there are no
 * threads waiting on I/O or sleeping, and all the other tasks are
 * waiting for work, we must have a deadlock of some description.
 * We first try to find threads blocked on themselves (ie. black
 * holes), and generate NonTermination exceptions where necessary.
 * If no threads are black holed, we have a deadlock situation, so
 * inform all the main threads.
#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
if ( EMPTY_THREAD_QUEUES() )
IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));

// Garbage collection can release some new threads due to
// either (a) finalizers or (b) threads resurrected because
// they are unreachable and will therefore be sent an
// exception. Any threads thus released will be immediately
GarbageCollect(GetRoots,rtsTrue);
if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }

#if defined(RTS_USER_SIGNALS)
/* If we have user-installed signal handlers, then wait
 * for signals to arrive rather than bombing out with a
if ( anyUserHandlers() ) {
sched_belch("still deadlocked, waiting for signals..."));

// we might be interrupted...
if (interrupted) { continue; }

if (signals_pending()) {
RELEASE_LOCK(&sched_mutex);
startSignalHandlers();
ACQUIRE_LOCK(&sched_mutex);
ASSERT(!EMPTY_RUN_QUEUE());

/* Probably a real deadlock. Send the current main thread the
 * Deadlock exception (or in the SMP build, send *all* main
 * threads the deadlock exception, since none of them can make
switch (m->tso->why_blocked) {
case BlockedOnBlackHole:
case BlockedOnException:
raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
barf("deadlock: main thread blocked in a strange way");

#elif defined(RTS_SUPPORTS_THREADS)
// ToDo: add deadlock detection in threaded RTS
// ToDo: add deadlock detection in GUM (similar to SMP) -- HWL

#if defined(RTS_SUPPORTS_THREADS)
if ( EMPTY_RUN_QUEUE() ) {
continue; // nothing to do

if (RtsFlags.GranFlags.Light)
GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

/* adjust time based on time-stamp */
if (event->time > CurrentTime[CurrentProc] &&
    event->evttype != ContinueThread)
CurrentTime[CurrentProc] = event->time;

/* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
if (!RtsFlags.GranFlags.Light)

IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));

/* main event dispatcher in GranSim */
switch (event->evttype) {
/* Should just be continuing execution */
IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
/* ToDo: check assertion
ASSERT(run_queue_hd != (StgTSO*)NULL &&
       run_queue_hd != END_TSO_QUEUE);

/* Ignore ContinueThreads for fetching threads (if synchr comm) */
if (!RtsFlags.GranFlags.DoAsyncFetch &&
    procStatus[CurrentProc]==Fetching) {
debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
CurrentTSO->id, CurrentTSO, CurrentProc);

/* Ignore ContinueThreads for completed threads */
if (CurrentTSO->what_next == ThreadComplete) {
debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
CurrentTSO->id, CurrentTSO, CurrentProc);

/* Ignore ContinueThreads for threads that are being migrated */
if (PROCS(CurrentTSO)==Nowhere) {
debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
CurrentTSO->id, CurrentTSO, CurrentProc);

/* The thread should be at the beginning of the run queue */
if (CurrentTSO!=run_queue_hds[CurrentProc]) {
debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
CurrentTSO->id, CurrentTSO, CurrentProc);
break; // run the thread anyway

new_event(proc, proc, CurrentTime[proc],
          (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
*/ /* Catches superfluous CONTINUEs -- should be unnecessary */
break; // now actually run the thread; DaH Qu'vam yImuHbej

do_the_fetchnode(event);
goto next_thread; /* handle next event in event queue */

do_the_globalblock(event);
goto next_thread; /* handle next event in event queue */

do_the_fetchreply(event);
goto next_thread; /* handle next event in event queue */

case UnblockThread: /* Move from the blocked queue to the tail of */
do_the_unblock(event);
goto next_thread; /* handle next event in event queue */

case ResumeThread: /* Move from the blocked queue to the tail of */
/* the runnable queue ( i.e. Qu' SImqa'lu') */
event->tso->gran.blocktime +=
    CurrentTime[CurrentProc] - event->tso->gran.blockedat;
do_the_startthread(event);
goto next_thread; /* handle next event in event queue */

do_the_startthread(event);
goto next_thread; /* handle next event in event queue */

do_the_movethread(event);
goto next_thread; /* handle next event in event queue */

do_the_movespark(event);
goto next_thread; /* handle next event in event queue */

do_the_findwork(event);
goto next_thread; /* handle next event in event queue */

barf("Illegal event type %u\n", event->evttype);

/* This point was scheduler_loop in the old RTS */
IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));

TimeOfLastEvent = CurrentTime[CurrentProc];
TimeOfNextEvent = get_time_of_next_event();
IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
// CurrentTSO = ThreadQueueHd;

IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",

if (RtsFlags.GranFlags.Light)
GranSimLight_leave_system(event, &ActiveTSO);

EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));

/* in a GranSim setup the TSO stays on the run queue */

/* Take a thread from the run queue. */
POP_RUN_QUEUE(t); // take_off_run_queue(t);

debugBelch("GRAN: About to run current thread, which is\n");

context_switch = 0; // turned on via GranYield, checking events and time slice

DumpGranEvent(GR_SCHEDULE, t));

procStatus[CurrentProc] = Busy;

if (PendingFetches != END_BF_QUEUE) {
/* ToDo: perhaps merge with spark activation above */
/* check whether we have local work and send requests if we have none */
if (EMPTY_RUN_QUEUE()) { /* no runnable threads */
/* :-[ no local threads => look out for local sparks */
/* the spark pool for the current PE */
pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
    pool->hd < pool->tl) {
 * ToDo: add GC code check that we really have enough heap afterwards!!
 * If we're here (no runnable threads) and we have pending
 * sparks, we must have a space problem. Get enough space
 * to turn one of those pending sparks into a

spark = findSpark(rtsFalse); /* get a spark */
if (spark != (rtsSpark) NULL) {
tso = activateSpark(spark); /* turn the spark into a thread */
IF_PAR_DEBUG(schedule,
debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
tso->id, tso, advisory_thread_count));

if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
debugBelch("==^^ failed to activate spark\n");
} /* otherwise fall through & pick-up new tso */

IF_PAR_DEBUG(verbose,
debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
spark_queue_len(pool)));

/* If we still have no work we need to send a FISH to get a spark
if (EMPTY_RUN_QUEUE()) {
/* =8-[ no local sparks => look for work on other PEs */
 * We really have absolutely no work. Send out a fish
 * (there may be some out there already), and wait for
 * something to arrive. We clearly can't run any threads
 * until a SCHEDULE or RESUME arrives, and so that's what
 * we're hoping to see. (Of course, we still have to
 * respond to other types of messages.)
TIME now = msTime() /*CURRENT_TIME*/;
IF_PAR_DEBUG(verbose,
debugBelch("-- now=%ld\n", now));
IF_PAR_DEBUG(verbose,
if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
    (last_fish_arrived_at!=0 &&
     last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
debugBelch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)\n",
last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
last_fish_arrived_at,
RtsFlags.ParFlags.fishDelay, now);

if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
    (last_fish_arrived_at==0 ||
     (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
/* outstandingFishes is set in sendFish, processFish;
   avoid flooding system with fishes via delay */
sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,

// Global statistics: count no. of fishes
if (RtsFlags.ParFlags.ParStats.Global &&
    RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
globalParStats.tot_fish_mess++;

receivedFinish = processMessages();
} else if (PacketsWaiting()) { /* Look for incoming messages */
receivedFinish = processMessages();

/* Now we are sure that we have some work available */
ASSERT(run_queue_hd != END_TSO_QUEUE);

/* Take a thread from the run queue, if we have work */
POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
IF_DEBUG(sanity,checkTSO(t));

/* ToDo: write something to the log-file
if (RTSflags.ParFlags.granSimStats && !sameThread)
DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);

/* the spark pool for the current PE */
pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable

debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

if (0 && RtsFlags.ParFlags.ParStats.Full &&
    t && LastTSO && t->id != LastTSO->id &&
    LastTSO->why_blocked == NotBlocked &&
    LastTSO->what_next != ThreadComplete) {
// if previously scheduled TSO not blocked we have to record the context switch
DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
                     GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);

if (RtsFlags.ParFlags.ParStats.Full &&
    (emitSchedule /* forced emit */ ||
     (t && LastTSO && t->id != LastTSO->id))) {
we are running a different TSO, so write a schedule event to log file
NB: If we use fair scheduling we also have to write a deschedule
event for LastTSO; with unfair scheduling we know that the
previous tso has blocked whenever we switch to another tso, so
we don't need it in GUM for now
DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
emitSchedule = rtsFalse;

#else /* !GRAN && !PAR */

// grab a thread from the run queue
ASSERT(run_queue_hd != END_TSO_QUEUE);

// Sanity check the thread we're about to run. This can be
// expensive if there is lots of thread switching going on...
IF_DEBUG(sanity,checkTSO(t));

StgMainThread *m = t->main;

sched_belch("### Running thread %d in bound thread", t->id));
// yes, the Haskell thread is bound to the current native thread

sched_belch("### thread %d bound to another OS thread", t->id));
// no, bound to a different Haskell thread: pass to that thread
PUSH_ON_RUN_QUEUE(t);
passCapability(&m->bound_thread_cond);

if(mainThread != NULL)
// The thread we want to run is bound.
sched_belch("### this OS thread cannot run thread %d", t->id));
// no, the current native thread is bound to a different
// Haskell thread, so pass it to any worker thread
PUSH_ON_RUN_QUEUE(t);
passCapabilityToWorker();

cap->r.rCurrentTSO = t;

/* context switches are now initiated by the timer signal, unless
 * the user specified "context switch as often as possible", with
if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
     && (run_queue_hd != END_TSO_QUEUE
         || blocked_queue_hd != END_TSO_QUEUE
         || sleeping_queue != END_TSO_QUEUE)))

RELEASE_LOCK(&sched_mutex);

IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
                                (long)t->id, whatNext_strs[t->what_next]));

startHeapProfTimer();

/* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
/* Run the current thread

prev_what_next = t->what_next;
errno = t->saved_errno;

switch (prev_what_next) {
/* Thread already finished, return to scheduler. */
ret = ThreadFinished;

ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);

case ThreadInterpret:
ret = interpretBCO(cap);

barf("schedule: invalid what_next field");

// The TSO might have moved, so find the new location:
t = cap->r.rCurrentTSO;

// And save the current errno in this thread.
t->saved_errno = errno;

/* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */

/* Costs for the scheduler are assigned to CCS_SYSTEM */
ACQUIRE_LOCK(&sched_mutex);

#ifdef RTS_SUPPORTS_THREADS
IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
#elif !defined(GRAN) && !defined(PAR)
IF_DEBUG(scheduler,debugBelch("sched: "););

/* HACK 675: if the last thread didn't yield, make sure to print a
   SCHEDULE event to the log file when StgRunning the next thread, even
   if it is the same one as before */
TimeOfLastYield = CURRENT_TIME;

IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
globalGranStats.tot_heapover++;
globalParStats.tot_heapover++;

// did the task ask for a large block?
if (cap->r.rHpAlloc > BLOCK_SIZE) {
// if so, get one and push it on the front of the nursery.
blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;

IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n",
                              (long)t->id, whatNext_strs[t->what_next], blocks));

// don't do this if it would push us over the
// alloc_blocks_lim limit; we'll GC first.
if (alloc_blocks + blocks < alloc_blocks_lim) {
alloc_blocks += blocks;
bd = allocGroup( blocks );

// link the new group into the list
bd->link = cap->r.rCurrentNursery;
bd->u.back = cap->r.rCurrentNursery->u.back;
if (cap->r.rCurrentNursery->u.back != NULL) {
cap->r.rCurrentNursery->u.back->link = bd;
ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
       g0s0->blocks == cap->r.rNursery);
cap->r.rNursery = g0s0->blocks = bd;
cap->r.rCurrentNursery->u.back = bd;

// initialise it as a nursery block. We initialise the
// step, gen_no, and flags field of *every* sub-block in
// this large block, because this is easier than making
// sure that we always find the block head of a large
// block whenever we call Bdescr() (eg. evacuate() and
// isAlive() in the GC would both have to do this, at
for (x = bd; x < bd + blocks; x++) {

// don't forget to update the block count in g0s0.
g0s0->n_blocks += blocks;
// This assert can be a killer if the app is doing lots
// of large block allocations.
ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);

// now update the nursery to point to the new block
cap->r.rCurrentNursery = bd;

// we might be unlucky and have another thread get on the
// run queue before us and steal the large block, but in that
// case the thread will just end up requesting another large
PUSH_ON_RUN_QUEUE(t);

/* make all the running tasks block on a condition variable,
 * maybe set context_switch and wait till they all pile in,
 * then have them wait on a GC condition variable.
IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n",
                              (long)t->id, whatNext_strs[t->what_next]));

ASSERT(!is_on_queue(t,CurrentProc));

/* Currently we emit a DESCHEDULE event before GC in GUM.
   ToDo: either add separate event to distinguish SYSTEM time from rest
   or just nuke this DESCHEDULE (and the following SCHEDULE) */
if (0 && RtsFlags.ParFlags.ParStats.Full) {
DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
emitSchedule = rtsTrue;

ready_to_gc = rtsTrue;
context_switch = 1; /* stop other threads ASAP */
PUSH_ON_RUN_QUEUE(t);
/* actual GC is done at the end of the while loop */

DumpGranEvent(GR_DESCHEDULE, t));
globalGranStats.tot_stackover++;
// DumpGranEvent(GR_DESCHEDULE, t);
globalParStats.tot_stackover++;

IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n",
                              (long)t->id, whatNext_strs[t->what_next]));

/* just adjust the stack for this thread, then pop it back
/* enlarge the stack */
StgTSO *new_t = threadStackOverflow(t);

/* This TSO has moved, so update any pointers to it from the
 * main thread stack. It better not be on any other queues...
 * (it shouldn't be).
if (t->main != NULL) {
t->main->tso = new_t;
PUSH_ON_RUN_QUEUE(new_t);

case ThreadYielding:
// Reset the context switch flag. We don't do this just before
// running the thread, because that would mean we would lose ticks
// during GC, which can lead to unfair scheduling (a thread hogs
// the CPU because the tick always arrives during GC). This way
// penalises threads that do a lot of allocation, but that seems
// better than the alternative.

DumpGranEvent(GR_DESCHEDULE, t));
globalGranStats.tot_yields++;
// DumpGranEvent(GR_DESCHEDULE, t);
globalParStats.tot_yields++;

/* put the thread back on the run queue. Then, if we're ready to
 * GC, check whether this is the last task to stop. If so, wake
 * up the GC thread. getThread will block during a GC until the
if (t->what_next != prev_what_next) {
debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n",
           (long)t->id, whatNext_strs[t->what_next]);
debugBelch("--<< thread %ld (%s) stopped, yielding\n",
           (long)t->id, whatNext_strs[t->what_next]);

//debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
ASSERT(t->link == END_TSO_QUEUE);

// Shortcut if we're just switching evaluators: don't bother
// doing stack squeezing (which can be expensive), just run the
if (t->what_next != prev_what_next) {

ASSERT(!is_on_queue(t,CurrentProc));

//debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
checkThreadQsSanity(rtsTrue));

if (RtsFlags.ParFlags.doFairScheduling) {
/* this does round-robin scheduling; good for concurrency */
APPEND_TO_RUN_QUEUE(t);
/* this does unfair scheduling; good for parallelism */
PUSH_ON_RUN_QUEUE(t);
// this does round-robin scheduling; good for concurrency
APPEND_TO_RUN_QUEUE(t);

/* add a ContinueThread event to actually process the thread */
new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
          t, (StgClosure*)NULL, (rtsSpark*)NULL);
debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");

debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));

// ??? needed; should emit block before
DumpGranEvent(GR_DESCHEDULE, t));
prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t

ASSERT(procStatus[CurrentProc]==Busy ||
       ((procStatus[CurrentProc]==Fetching) &&
        (t->block_info.closure!=(StgClosure*)NULL)));
if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
    !(!RtsFlags.GranFlags.DoAsyncFetch &&
      procStatus[CurrentProc]==Fetching))
procStatus[CurrentProc] = Idle;

debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
t->id, t, whatNext_strs[t->what_next], t->block_info.closure));

if (t->block_info.closure!=(StgClosure*)NULL)
print_bq(t->block_info.closure));

/* Send a fetch (if BlockedOnGA) and dump event to log file */

/* whatever we schedule next, we must log that schedule */
emitSchedule = rtsTrue;

/* don't need to do anything. Either the thread is blocked on
 * I/O, in which case we'll have called addToBlockedQueue
 * previously, or it's blocked on an MVar or Blackhole, in which
 * case it'll be on the relevant queue already.
ASSERT(t->why_blocked != NotBlocked);
debugBelch("--<< thread %d (%s) stopped: ",
           t->id, whatNext_strs[t->what_next]);
printThreadBlockage(t);

/* Only for dumping event to log file
   ToDo: do I need this in GranSim, too?

case ThreadFinished:
/* Need to check whether this was a main thread, and if so, signal
 * the task that started it with the return value. If we have no
 * more main threads, we probably need to stop all the tasks until
/* We also end up here if the thread kills itself with an
 * uncaught exception, see Exception.hc.
IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n",
                              t->id, whatNext_strs[t->what_next]));

endThread(t, CurrentProc); // clean-up the thread
/* For now all are advisory -- HWL */
//if(t->priority==AdvisoryPriority) ??
advisory_thread_count--;

if(t->dist.priority==RevalPriority)

if (RtsFlags.ParFlags.ParStats.Full &&
    !RtsFlags.ParFlags.ParStats.Suppressed)
DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);

// Check whether the thread that just completed was a main
// thread, and if so return with the result.
//
// There is an assumption here that all thread completion goes
// through this point; we need to make sure that if a thread
// ends up in the ThreadKilled state, that it stays on the run
// queue so it can be dealt with here.

#if defined(RTS_SUPPORTS_THREADS)
mainThread->tso == t
// We are a bound thread: this must be our thread that just
ASSERT(mainThread->tso == t);

if (t->what_next == ThreadComplete) {
if (mainThread->ret) {
// NOTE: return val is tso->sp[1] (see StgStartup.hc)
*(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1];
mainThread->stat = Success;
if (mainThread->ret) {
*(mainThread->ret) = NULL;
if (was_interrupted) {
mainThread->stat = Interrupted;
mainThread->stat = Killed;

removeThreadLabel((StgWord)mainThread->tso->id);

if (mainThread->prev == NULL) {
main_threads = mainThread->link;
mainThread->prev->link = mainThread->link;
if (mainThread->link != NULL) {
mainThread->link->prev = mainThread->prev;
releaseCapability(cap);

#ifdef RTS_SUPPORTS_THREADS
ASSERT(t->main == NULL);
if (t->main != NULL) {
// Must be a main thread that is not the topmost one. Leave
// it on the run queue until the stack has unwound to the
// point where we can deal with this. Leaving it on the run
// queue also ensures that the garbage collector knows about
// this thread and its return value (it gets dropped from the
// all_threads list so there's no other way to find it).
APPEND_TO_RUN_QUEUE(t);

barf("schedule: invalid thread return code %d", (int)ret);

// When we have +RTS -i0 and we're heap profiling, do a census at
// every GC. This lets us get repeatable runs for debugging.
if (performHeapProfile ||
    (RtsFlags.ProfFlags.profileInterval==0 &&
     RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
GarbageCollect(GetRoots, rtsTrue);
performHeapProfile = rtsFalse;
ready_to_gc = rtsFalse; // we already GC'd

/* Kick any transactions which are invalid back to their atomically frames.
 * When next scheduled they will try to commit, this commit will fail and
 * they will retry. */
for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
if (!stmValidateTransaction (t -> trec)) {
IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));

// strip the stack back to the ATOMICALLY_FRAME, aborting
// the (nested) transaction, and saving the stack of any
// partially-evaluated thunks on the heap.
raiseAsync_(t, NULL, rtsTrue);
ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);

/* everybody back, start the GC.
 * Could do it in this thread, or signal a condition var
 * to do it in another thread. Either way, we need to
 * broadcast on gc_pending_cond afterward.
#if defined(RTS_SUPPORTS_THREADS)
IF_DEBUG(scheduler,sched_belch("doing GC"));
GarbageCollect(GetRoots,rtsFalse);
ready_to_gc = rtsFalse;

/* add a ContinueThread event to continue execution of current thread */
new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
          t, (StgClosure*)NULL, (rtsSpark*)NULL);
debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");

IF_GRAN_DEBUG(unused,
print_eventq(EventHd));

event = get_next_event();

/* ToDo: wait for next message to arrive rather than busy wait */

} /* end of while(1) */

IF_PAR_DEBUG(verbose,
debugBelch("== Leaving schedule() after having received Finish\n"));

/* ---------------------------------------------------------------------------
 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
 * used by Control.Concurrent for error checking.
 * ------------------------------------------------------------------------- */
rtsSupportsBoundThreads(void)

/* ---------------------------------------------------------------------------
 * isThreadBound(tso): check whether tso is bound to an OS thread.
 * ------------------------------------------------------------------------- */
isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
return (tso->main != NULL);

/* ---------------------------------------------------------------------------
 * Singleton fork(). Do not copy any running threads.
 * ------------------------------------------------------------------------- */

#ifndef mingw32_TARGET_OS
#define FORKPROCESS_PRIMOP_SUPPORTED

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
deleteThreadImmediately(StgTSO *tso);

forkProcess(HsStablePtr *entry
#ifndef FORKPROCESS_PRIMOP_SUPPORTED

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
IF_DEBUG(scheduler,sched_belch("forking!"));
rts_lock(); // This not only acquires sched_mutex, it also
            // makes sure that no other threads are running

if (pid) { /* parent */
/* just return the pid */
} else { /* child */

// delete all threads
run_queue_hd = run_queue_tl = END_TSO_QUEUE;
for (t = all_threads; t != END_TSO_QUEUE; t = next) {
// don't allow threads to catch the ThreadKilled exception
deleteThreadImmediately(t);

// wipe the main thread list
while((m = main_threads) != NULL) {
main_threads = m->link;
# ifdef THREADED_RTS
closeCondition(&m->bound_thread_cond);

rc = rts_evalStableIO(entry, NULL); // run the action
rts_checkSchedStatus("forkProcess",rc);

hs_exit(); // clean up and exit

#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
barf("forkProcess#: primop not supported, sorry!\n");

/* ---------------------------------------------------------------------------
 * deleteAllThreads(): kill all the live threads.
 *
 * This is used when we catch a user interrupt (^C), before performing
 * any necessary cleanups and running finalizers.
 *
 * Locks: sched_mutex held.
 * ------------------------------------------------------------------------- */
deleteAllThreads ( void )
IF_DEBUG(scheduler,sched_belch("deleting all threads"));
for (t = all_threads; t != END_TSO_QUEUE; t = next) {
next = t->global_link;

// The run queue now contains a bunch of ThreadKilled threads. We
// must not throw these away: the main thread(s) will be in there
// somewhere, and the main scheduler loop has to deal with it.
// Also, the run queue is the only thing keeping these threads from
// being GC'd, and we don't want the "main thread has been GC'd" panic.

ASSERT(blocked_queue_hd == END_TSO_QUEUE);
ASSERT(sleeping_queue == END_TSO_QUEUE);

/* startThread and insertThread are now in GranSim.c -- HWL */

/* ---------------------------------------------------------------------------
 * Suspending & resuming Haskell threads.
 *
 * When making a "safe" call to C (aka _ccall_GC), the task gives back
 * its capability before calling the C function. This allows another
 * task to pick up the capability and carry on running Haskell
 * threads. It also means that if the C call blocks, it won't lock
 *
 * The Haskell thread making the C call is put to sleep for the
 * duration of the call, on the suspended_ccalling_threads queue. We
 * give out a token to the task, which it can use to resume the thread
 * on return from the C function.
 * ------------------------------------------------------------------------- */
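/* [Illustrative sketch, not part of the original file.] A "safe" foreign
 * call therefore brackets the C call roughly as follows; the blocking
 * function and variable names here are hypothetical, the real wrapper is
 * generated by the compiler:
 */
#if 0
    StgInt tok;
    tok = suspendThread(reg);          // release the capability, get a token
    result = blocking_c_function(arg); // other tasks may run Haskell meanwhile
    reg = resumeThread(tok);           // wait for a capability, resume the TSO
#endif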
suspendThread( StgRegTable *reg )
int saved_errno = errno;

/* assume that *reg is a pointer to the StgRegTable part
cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));

ACQUIRE_LOCK(&sched_mutex);

sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));

// XXX this might not be necessary --SDM
cap->r.rCurrentTSO->what_next = ThreadRunGHC;

threadPaused(cap->r.rCurrentTSO);
cap->r.rCurrentTSO->link = suspended_ccalling_threads;
suspended_ccalling_threads = cap->r.rCurrentTSO;

if(cap->r.rCurrentTSO->blocked_exceptions == NULL) {
cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;

/* Use the thread ID as the token; it should be unique */
tok = cap->r.rCurrentTSO->id;

/* Hand back capability */
releaseCapability(cap);

#if defined(RTS_SUPPORTS_THREADS)
/* Preparing to leave the RTS, so ensure there's a native thread/task
   waiting to take over.
IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));

RELEASE_LOCK(&sched_mutex);

errno = saved_errno;

resumeThread( StgInt tok )
StgTSO *tso, **prev;
int saved_errno = errno;

#if defined(RTS_SUPPORTS_THREADS)
/* Wait for permission to re-enter the RTS with the result. */
ACQUIRE_LOCK(&sched_mutex);
waitForReturnCapability(&sched_mutex, &cap);

IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));

grabCapability(&cap);

/* Remove the thread off of the suspended list */
prev = &suspended_ccalling_threads;
for (tso = suspended_ccalling_threads;
     tso != END_TSO_QUEUE;
     prev = &tso->link, tso = tso->link) {
if (tso->id == (StgThreadID)tok) {

if (tso == END_TSO_QUEUE) {
barf("resumeThread: thread not found");

tso->link = END_TSO_QUEUE;

if(tso->why_blocked == BlockedOnCCall) {
awakenBlockedQueueNoLock(tso->blocked_exceptions);
tso->blocked_exceptions = NULL;

/* Reset blocking status */
tso->why_blocked = NotBlocked;

cap->r.rCurrentTSO = tso;
RELEASE_LOCK(&sched_mutex);
errno = saved_errno;

/* ---------------------------------------------------------------------------
 * ------------------------------------------------------------------------ */
static void unblockThread(StgTSO *tso);

/* ---------------------------------------------------------------------------
 * Comparing Thread ids.
 *
 * This is used from STG land in the implementation of the
 * instances of Eq/Ord for ThreadIds.
 * ------------------------------------------------------------------------ */
cmp_thread(StgPtr tso1, StgPtr tso2)
StgThreadID id1 = ((StgTSO *)tso1)->id;
StgThreadID id2 = ((StgTSO *)tso2)->id;

if (id1 < id2) return (-1);
if (id1 > id2) return 1;

/* ---------------------------------------------------------------------------
 * Fetching the ThreadID from an StgTSO.
 *
 * This is used in the implementation of Show for ThreadIds.
 * ------------------------------------------------------------------------ */
rts_getThreadId(StgPtr tso)
return ((StgTSO *)tso)->id;

labelThread(StgPtr tso, char *label)
/* Caveat: Once set, you can only set the thread name to "" */
len = strlen(label)+1;
buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
strncpy(buf,label,len);
/* Update will free the old memory for us */
updateThreadLabel(((StgTSO *)tso)->id,buf);

/* ---------------------------------------------------------------------------
   Create a new thread.

   The new thread starts with the given stack size. Before the
   scheduler can run, however, this thread needs to have a closure
   (and possibly some arguments) pushed on its stack. See
   pushClosure() in Schedule.h.

   createGenThread() and createIOThread() (in SchedAPI.h) are
   convenient packaged versions of this function.

   currently pri (priority) is only used in a GRAN setup -- HWL
   ------------------------------------------------------------------------ */

/* currently pri (priority) is only used in a GRAN setup -- HWL */
createThread(nat size, StgInt pri)
createThread(nat size)

/* First check whether we should create a thread at all */
/* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
RtsFlags.ParFlags.maxThreads, advisory_thread_count);
return END_TSO_QUEUE;

ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);

// ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW

/* catch ridiculously small stack sizes */
if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;

stack_size = size - TSO_STRUCT_SIZEW;

tso = (StgTSO *)allocate(size);
TICK_ALLOC_TSO(stack_size, 0);

SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
SET_GRAN_HDR(tso, ThisPE);

// Always start with the compiled code evaluator
tso->what_next = ThreadRunGHC;

tso->id = next_thread_id++;
tso->why_blocked = NotBlocked;
tso->blocked_exceptions = NULL;

tso->saved_errno = 0;

tso->stack_size = stack_size;
tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
tso->sp = (P_)&(tso->stack) + stack_size;

tso->trec = NO_TREC;

tso->prof.CCCS = CCS_MAIN;

/* put a stop frame on the stack */
tso->sp -= sizeofW(StgStopFrame);
SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
tso->link = END_TSO_QUEUE;

/* uses more flexible routine in GranSim */
insertThread(tso, CurrentProc);

/* In a non-GranSim setup the pushing of a TSO onto the runq is separated

if (RtsFlags.GranFlags.GranSimStats.Full)
DumpGranEvent(GR_START,tso);
if (RtsFlags.ParFlags.ParStats.Full)
DumpGranEvent(GR_STARTQ,tso);
/* HACK to avoid SCHEDULE
/* Link the new thread on the global thread list.
tso->global_link = all_threads;

tso->dist.priority = MandatoryPriority; //by default that is...

tso->gran.pri = pri;
tso->gran.magic = TSO_MAGIC; // debugging only
tso->gran.sparkname = 0;
tso->gran.startedat = CURRENT_TIME;
tso->gran.exported = 0;
tso->gran.basicblocks = 0;
tso->gran.allocs = 0;
tso->gran.exectime = 0;
tso->gran.fetchtime = 0;
tso->gran.fetchcount = 0;
tso->gran.blocktime = 0;
tso->gran.blockcount = 0;
tso->gran.blockedat = 0;
tso->gran.globalsparks = 0;
tso->gran.localsparks = 0;
if (RtsFlags.GranFlags.Light)
tso->gran.clock = Now; /* local clock */
tso->gran.clock = 0;

IF_DEBUG(gran,printTSO(tso));

tso->par.magic = TSO_MAGIC; // debugging only
tso->par.sparkname = 0;
tso->par.startedat = CURRENT_TIME;
tso->par.exported = 0;
tso->par.basicblocks = 0;
tso->par.allocs = 0;
tso->par.exectime = 0;
tso->par.fetchtime = 0;
tso->par.fetchcount = 0;
tso->par.blocktime = 0;
tso->par.blockcount = 0;
tso->par.blockedat = 0;
tso->par.globalsparks = 0;
tso->par.localsparks = 0;

globalGranStats.tot_threads_created++;
globalGranStats.threads_created_on_PE[CurrentProc]++;
globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
globalGranStats.tot_sq_probes++;

// collect parallel global statistics (currently done together with GC stats)
if (RtsFlags.ParFlags.ParStats.Global &&
    RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
//debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
globalParStats.tot_threads_created++;
sched_belch("==__ schedule: Created TSO %d (%p) [PE %d];",
tso->id, tso, CurrentProc));
IF_PAR_DEBUG(verbose,
sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
(long)tso->id, tso, advisory_thread_count));

IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
(long)tso->id, (long)tso->stack_size));

all parallel thread creation calls should fall through the following routine.

createSparkThread(rtsSpark spark)
ASSERT(spark != (rtsSpark)NULL);
if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
RtsFlags.ParFlags.maxThreads, advisory_thread_count);
return END_TSO_QUEUE;

tso = createThread(RtsFlags.GcFlags.initialStkSize);
if (tso==END_TSO_QUEUE)
barf("createSparkThread: Cannot create TSO");

tso->priority = AdvisoryPriority;

pushClosure(tso,spark);
PUSH_ON_RUN_QUEUE(tso);
advisory_thread_count++;

Turn a spark into a thread.
ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)

activateSpark (rtsSpark spark)
tso = createSparkThread(spark);
if (RtsFlags.ParFlags.ParStats.Full) {
//ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
IF_PAR_DEBUG(verbose,
debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
(StgClosure *)spark, info_type((StgClosure *)spark)));

// ToDo: fwd info on local/global spark to thread -- HWL
// tso->gran.exported = spark->exported;
// tso->gran.locked = !spark->global;
// tso->gran.sparkname = spark->name;

static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
                                   Capability *initialCapability

/* ---------------------------------------------------------------------------
 * scheduleThread puts a thread at the end of the runnable queue.
 * This will usually be done immediately after a thread is created.
 * The caller of scheduleThread must create the thread using e.g.
 * createThread and push an appropriate closure
 * on this thread's stack before the scheduler is invoked.
 * ------------------------------------------------------------------------ */
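/* [Illustrative sketch, not part of the original file.] Putting the pieces
 * together, a client would start a new thread roughly like this (the
 * closure is a placeholder; createThread takes an extra priority argument
 * in a GRAN setup):
 */
#if 0
    StgTSO *tso = createThread(RtsFlags.GcFlags.initialStkSize);
    pushClosure(tso, (W_)closure);   // see pushClosure() in Schedule.h
    scheduleThread(tso);             // appends tso to the run queue
#endif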
static void scheduleThread_ (StgTSO* tso);

scheduleThread_(StgTSO *tso)
// The thread goes at the *end* of the run-queue, to avoid possible
// starvation of any threads already on the queue.
APPEND_TO_RUN_QUEUE(tso);

scheduleThread(StgTSO* tso)
ACQUIRE_LOCK(&sched_mutex);
scheduleThread_(tso);
RELEASE_LOCK(&sched_mutex);

#if defined(RTS_SUPPORTS_THREADS)
static Condition bound_cond_cache;
static int bound_cond_cache_full = 0;

scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
                   Capability *initialCapability)
// Precondition: sched_mutex must be held
m = stgMallocBytes(sizeof(StgMainThread), "waitThread");

m->link = main_threads;
if (main_threads != NULL) {
main_threads->prev = m;

#if defined(RTS_SUPPORTS_THREADS)
// Allocating a new condition for each thread is expensive, so we
// cache one. This is a pretty feeble hack, but it helps speed up
// consecutive call-ins quite a bit.
if (bound_cond_cache_full) {
m->bound_thread_cond = bound_cond_cache;
bound_cond_cache_full = 0;
initCondition(&m->bound_thread_cond);

/* Put the thread on the main-threads list prior to scheduling the TSO.
   Failure to do so introduces a race condition in the MT case (as
   identified by Wolfgang Thaller), whereby the new task/OS thread
   created by scheduleThread_() would complete prior to the thread
   that spawned it managed to put 'itself' on the main-threads list.
   The upshot of it all being that the worker thread wouldn't get to
   signal the completion of its work item for the main thread to
   see (==> it got stuck waiting.) -- sof 6/02.

IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));

APPEND_TO_RUN_QUEUE(tso);
// NB. Don't call threadRunnable() here, because the thread is
// bound and only runnable by *this* OS thread, so waking up other
// workers will just slow things down.

return waitThread_(m, initialCapability);

/* ---------------------------------------------------------------------------
 * Initialise the scheduler. This resets all the queues - if the
 * queues contained any threads, they'll be garbage collected at the
 * ------------------------------------------------------------------------ */
for (i=0; i<MAX_PROC; i++) {
run_queue_hds[i] = END_TSO_QUEUE;
run_queue_tls[i] = END_TSO_QUEUE;
blocked_queue_hds[i] = END_TSO_QUEUE;
blocked_queue_tls[i] = END_TSO_QUEUE;
ccalling_threadss[i] = END_TSO_QUEUE;
sleeping_queue = END_TSO_QUEUE;

run_queue_hd = END_TSO_QUEUE;
run_queue_tl = END_TSO_QUEUE;
blocked_queue_hd = END_TSO_QUEUE;
blocked_queue_tl = END_TSO_QUEUE;
sleeping_queue = END_TSO_QUEUE;

suspended_ccalling_threads = END_TSO_QUEUE;

main_threads = NULL;
all_threads = END_TSO_QUEUE;

RtsFlags.ConcFlags.ctxtSwitchTicks =
    RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;

#if defined(RTS_SUPPORTS_THREADS)
/* Initialise the mutex and condition variables used by
initMutex(&sched_mutex);
initMutex(&term_mutex);

ACQUIRE_LOCK(&sched_mutex);

/* A capability holds the state a native thread needs in
 * order to execute STG code. At least one capability is
 * floating around (only SMP builds have more than one).

#if defined(RTS_SUPPORTS_THREADS)
/* start our haskell execution tasks */
startTaskManager(0,taskStart);

#if /* defined(SMP) ||*/ defined(PAR)

RELEASE_LOCK(&sched_mutex);

exitScheduler( void )
#if defined(RTS_SUPPORTS_THREADS)
shutting_down_scheduler = rtsTrue;

/* ----------------------------------------------------------------------------
   Managing the per-task allocation areas.

   Each capability comes with an allocation area. These are
   fixed-length block lists into which allocation can be done.

   ToDo: no support for two-space collection at the moment???
   ------------------------------------------------------------------------- */
waitThread_(StgMainThread* m, Capability *initialCapability)
SchedulerStatus stat;

// Precondition: sched_mutex must be held.
IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));

/* GranSim specific init */
CurrentTSO = m->tso;           // the TSO to run
procStatus[MainProc] = Busy;   // status of main PE
CurrentProc = MainProc;        // PE to run it on
schedule(m,initialCapability);
schedule(m,initialCapability);
ASSERT(m->stat != NoStatus);

#if defined(RTS_SUPPORTS_THREADS)
// Free the condition variable, returning it to the cache if possible.
if (!bound_cond_cache_full) {
bound_cond_cache = m->bound_thread_cond;
bound_cond_cache_full = 1;
closeCondition(&m->bound_thread_cond);

IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));

// Postcondition: sched_mutex still held

/* ---------------------------------------------------------------------------
   Where are the roots that we know about?

      - all the threads on the runnable queue
      - all the threads on the blocked queue
      - all the threads on the sleeping queue
      - all the threads currently executing a _ccall_GC
      - all the "main threads"
   ------------------------------------------------------------------------ */

/* This has to be protected either by the scheduler monitor, or by the
   garbage collection monitor (probably the latter).
GetRoots( evac_fn evac )
for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
evac((StgClosure **)&run_queue_hds[i]);
if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
evac((StgClosure **)&run_queue_tls[i]);
if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
evac((StgClosure **)&blocked_queue_hds[i]);
if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
evac((StgClosure **)&blocked_queue_tls[i]);
if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2176 evac((StgClosure **)&ccalling_threads[i]);
2183 if (run_queue_hd != END_TSO_QUEUE) {
2184 ASSERT(run_queue_tl != END_TSO_QUEUE);
2185 evac((StgClosure **)&run_queue_hd);
2186 evac((StgClosure **)&run_queue_tl);
2189 if (blocked_queue_hd != END_TSO_QUEUE) {
2190 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2191 evac((StgClosure **)&blocked_queue_hd);
2192 evac((StgClosure **)&blocked_queue_tl);
2195 if (sleeping_queue != END_TSO_QUEUE) {
2196 evac((StgClosure **)&sleeping_queue);
2200 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2201 evac((StgClosure **)&suspended_ccalling_threads);
2204 #if defined(PAR) || defined(GRAN)
2205 markSparkQueue(evac);
2208 #if defined(RTS_USER_SIGNALS)
2209 // mark the signal handlers (signals should be already blocked)
2210 markSignalHandlers(evac);
2214 /* -----------------------------------------------------------------------------
2217 This is the interface to the garbage collector from Haskell land.
2218 We provide this so that external C code can allocate and garbage
2219 collect when called from Haskell via _ccall_GC.
2221 It might be useful to provide an interface whereby the programmer
2222 can specify more roots (ToDo).
2224 This needs to be protected by the GC condition variable above. KH.
2225 -------------------------------------------------------------------------- */
2227 static void (*extra_roots)(evac_fn);
2232 /* Obligated to hold this lock upon entry */
2233 ACQUIRE_LOCK(&sched_mutex);
2234 GarbageCollect(GetRoots,rtsFalse);
2235 RELEASE_LOCK(&sched_mutex);
2239 performMajorGC(void)
2241 ACQUIRE_LOCK(&sched_mutex);
2242 GarbageCollect(GetRoots,rtsTrue);
2243 RELEASE_LOCK(&sched_mutex);
2247 AllRoots(evac_fn evac)
2249 GetRoots(evac); // the scheduler's roots
2250 extra_roots(evac); // the user's roots
2254 performGCWithRoots(void (*get_roots)(evac_fn))
2256 ACQUIRE_LOCK(&sched_mutex);
2257 extra_roots = get_roots;
2258 GarbageCollect(AllRoots,rtsFalse);
2259 RELEASE_LOCK(&sched_mutex);
2262 /* -----------------------------------------------------------------------------
2265 If the thread has reached its maximum stack size, then raise the
2266 StackOverflow exception in the offending thread. Otherwise
2267 relocate the TSO into a larger chunk of memory and adjust its stack
2269 -------------------------------------------------------------------------- */
2272 threadStackOverflow(StgTSO *tso)
2274 nat new_stack_size, new_tso_size, stack_words;
2278 IF_DEBUG(sanity,checkTSO(tso));
2279 if (tso->stack_size >= tso->max_stack_size) {
2282 debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2283 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2284 /* If we're debugging, just print out the top of the stack */
2285 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2288 /* Send this thread the StackOverflow exception */
2289 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2293 /* Try to double the current stack size. If that takes us over the
2294 * maximum stack size for this thread, then use the maximum instead.
2295 * Finally round up so the TSO ends up as a whole number of blocks.
2297 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2298 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2299 TSO_STRUCT_SIZE)/sizeof(W_);
2300 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2301 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2303 IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2305 dest = (StgTSO *)allocate(new_tso_size);
2306 TICK_ALLOC_TSO(new_stack_size,0);
2308 /* copy the TSO block and the old stack into the new area */
2309 memcpy(dest,tso,TSO_STRUCT_SIZE);
2310 stack_words = tso->stack + tso->stack_size - tso->sp;
2311 new_sp = (P_)dest + new_tso_size - stack_words;
2312 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2314 /* relocate the stack pointers... */
2316 dest->stack_size = new_stack_size;
2318 /* Mark the old TSO as relocated. We have to check for relocated
2319 * TSOs in the garbage collector and any primops that deal with TSOs.
2321 * It's important to set the sp value to just beyond the end
2322 * of the stack, so we don't attempt to scavenge any part of the
2325 tso->what_next = ThreadRelocated;
2327 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2328 tso->why_blocked = NotBlocked;
2329 dest->mut_link = NULL;
2331 IF_PAR_DEBUG(verbose,
2332 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2333 tso->id, tso, tso->stack_size);
2334 /* If we're debugging, just print out the top of the stack */
2335 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2338 IF_DEBUG(sanity,checkTSO(tso));
2340 IF_DEBUG(scheduler,printTSO(dest));
2346 /* ---------------------------------------------------------------------------
2347 Wake up a queue that was blocked on some resource.
2348 ------------------------------------------------------------------------ */
2352 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2357 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2359 /* write RESUME events to log file and
2360 update blocked and fetch time (depending on type of the orig closure) */
2361 if (RtsFlags.ParFlags.ParStats.Full) {
2362 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2363 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2364 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2365 if (EMPTY_RUN_QUEUE())
2366 emitSchedule = rtsTrue;
2368 switch (get_itbl(node)->type) {
2370 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2375 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2382 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2389 static StgBlockingQueueElement *
2390 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2393 PEs node_loc, tso_loc;
2395 node_loc = where_is(node); // should be lifted out of loop
2396 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2397 tso_loc = where_is((StgClosure *)tso);
2398 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2399 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2400 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2401 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2402 // insertThread(tso, node_loc);
2403 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2405 tso, node, (rtsSpark*)NULL);
2406 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2409 } else { // TSO is remote (actually should be FMBQ)
2410 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2411 RtsFlags.GranFlags.Costs.gunblocktime +
2412 RtsFlags.GranFlags.Costs.latency;
2413 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2415 tso, node, (rtsSpark*)NULL);
2416 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2419 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2421 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2422 (node_loc==tso_loc ? "Local" : "Global"),
2423 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2424 tso->block_info.closure = NULL;
2425 IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n",
2429 static StgBlockingQueueElement *
2430 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2432 StgBlockingQueueElement *next;
2434 switch (get_itbl(bqe)->type) {
2436 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2437 /* if it's a TSO just push it onto the run_queue */
2439 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2440 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
2442 unblockCount(bqe, node);
2443 /* reset blocking status after dumping event */
2444 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2448 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2450 bqe->link = (StgBlockingQueueElement *)PendingFetches;
2451 PendingFetches = (StgBlockedFetch *)bqe;
2455 /* can ignore this case in a non-debugging setup;
2456 see comments on RBHSave closures above */
2458 /* check that the closure is an RBHSave closure */
2459 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2460 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2461 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2465 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2466 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2470 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
2474 #else /* !GRAN && !PAR */
2476 unblockOneLocked(StgTSO *tso)
2480 ASSERT(get_itbl(tso)->type == TSO);
2481 ASSERT(tso->why_blocked != NotBlocked);
2482 tso->why_blocked = NotBlocked;
2484 tso->link = END_TSO_QUEUE;
2485 APPEND_TO_RUN_QUEUE(tso);
2487 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
2492 #if defined(GRAN) || defined(PAR)
2493 INLINE_ME StgBlockingQueueElement *
2494 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2496 ACQUIRE_LOCK(&sched_mutex);
2497 bqe = unblockOneLocked(bqe, node);
2498 RELEASE_LOCK(&sched_mutex);
2503 unblockOne(StgTSO *tso)
2505 ACQUIRE_LOCK(&sched_mutex);
2506 tso = unblockOneLocked(tso);
2507 RELEASE_LOCK(&sched_mutex);
2514 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2516 StgBlockingQueueElement *bqe;
2521 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
2522 node, CurrentProc, CurrentTime[CurrentProc],
2523 CurrentTSO->id, CurrentTSO));
2525 node_loc = where_is(node);
2527 ASSERT(q == END_BQ_QUEUE ||
2528 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2529 get_itbl(q)->type == CONSTR); // closure (type constructor)
2530 ASSERT(is_unique(node));
2532 /* FAKE FETCH: magically copy the node to the tso's proc;
2533 no Fetch necessary because in reality the node should not have been
2534 moved to the other PE in the first place
2536 if (CurrentProc!=node_loc) {
2538 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
2539 node, node_loc, CurrentProc, CurrentTSO->id,
2540 // CurrentTSO, where_is(CurrentTSO),
2541 node->header.gran.procs));
2542 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2544 debugBelch("## new bitmask of node %p is %#x\n",
2545 node, node->header.gran.procs));
2546 if (RtsFlags.GranFlags.GranSimStats.Global) {
2547 globalGranStats.tot_fake_fetches++;
2552 // ToDo: check: ASSERT(CurrentProc==node_loc);
2553 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2556 bqe points to the current element in the queue
2557 next points to the next element in the queue
2559 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2560 //tso_loc = where_is(tso);
2562 bqe = unblockOneLocked(bqe, node);
2565 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2566 the closure to make room for the anchor of the BQ */
2567 if (bqe!=END_BQ_QUEUE) {
2568 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2570 ASSERT((info_ptr==&RBH_Save_0_info) ||
2571 (info_ptr==&RBH_Save_1_info) ||
2572 (info_ptr==&RBH_Save_2_info));
2574 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2575 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2576 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2579 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
2580 node, info_type(node)));
2583 /* statistics gathering */
2584 if (RtsFlags.GranFlags.GranSimStats.Global) {
2585 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2586 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2587 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2588 globalGranStats.tot_awbq++; // total no. of bqs awakened
2591 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
2592 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2596 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2598 StgBlockingQueueElement *bqe;
2600 ACQUIRE_LOCK(&sched_mutex);
2602 IF_PAR_DEBUG(verbose,
2603 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
2607 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2608 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
2613 ASSERT(q == END_BQ_QUEUE ||
2614 get_itbl(q)->type == TSO ||
2615 get_itbl(q)->type == BLOCKED_FETCH ||
2616 get_itbl(q)->type == CONSTR);
2619 while (get_itbl(bqe)->type==TSO ||
2620 get_itbl(bqe)->type==BLOCKED_FETCH) {
2621 bqe = unblockOneLocked(bqe, node);
2623 RELEASE_LOCK(&sched_mutex);
2626 #else /* !GRAN && !PAR */
2629 awakenBlockedQueueNoLock(StgTSO *tso)
2631 while (tso != END_TSO_QUEUE) {
2632 tso = unblockOneLocked(tso);
2637 awakenBlockedQueue(StgTSO *tso)
2639 ACQUIRE_LOCK(&sched_mutex);
2640 while (tso != END_TSO_QUEUE) {
2641 tso = unblockOneLocked(tso);
2643 RELEASE_LOCK(&sched_mutex);
2647 /* ---------------------------------------------------------------------------
2649 - usually called inside a signal handler so it mustn't do anything fancy.
2650 ------------------------------------------------------------------------ */
2653 interruptStgRts(void)
2659 /* -----------------------------------------------------------------------------
2662 This is for use when we raise an exception in another thread, which
2664 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2665 -------------------------------------------------------------------------- */
2667 #if defined(GRAN) || defined(PAR)
2669 NB: only the type of the blocking queue is different in GranSim and GUM
2670 the operations on the queue-elements are the same
2671 long live polymorphism!
2673 Locks: sched_mutex is held upon entry and exit.
2677 unblockThread(StgTSO *tso)
2679 StgBlockingQueueElement *t, **last;
2681 switch (tso->why_blocked) {
2684 return; /* not blocked */
2687 // Be careful: nothing to do here! We tell the scheduler that the thread
2688 // is runnable and we leave it to the stack-walking code to abort the
2689 // transaction while unwinding the stack. We should perhaps have a debugging
2690 // test to make sure that this really happens and that the 'zombie' transaction
2691 // does not get committed.
2695 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2697 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2698 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2700 last = (StgBlockingQueueElement **)&mvar->head;
2701 for (t = (StgBlockingQueueElement *)mvar->head;
2703 last = &t->link, last_tso = t, t = t->link) {
2704 if (t == (StgBlockingQueueElement *)tso) {
2705 *last = (StgBlockingQueueElement *)tso->link;
2706 if (mvar->tail == tso) {
2707 mvar->tail = (StgTSO *)last_tso;
2712 barf("unblockThread (MVAR): TSO not found");
2715 case BlockedOnBlackHole:
2716 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2718 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2720 last = &bq->blocking_queue;
2721 for (t = bq->blocking_queue;
2723 last = &t->link, t = t->link) {
2724 if (t == (StgBlockingQueueElement *)tso) {
2725 *last = (StgBlockingQueueElement *)tso->link;
2729 barf("unblockThread (BLACKHOLE): TSO not found");
2732 case BlockedOnException:
2734 StgTSO *target = tso->block_info.tso;
2736 ASSERT(get_itbl(target)->type == TSO);
2738 if (target->what_next == ThreadRelocated) {
2739 target = target->link;
2740 ASSERT(get_itbl(target)->type == TSO);
2743 ASSERT(target->blocked_exceptions != NULL);
2745 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2746 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2748 last = &t->link, t = t->link) {
2749 ASSERT(get_itbl(t)->type == TSO);
2750 if (t == (StgBlockingQueueElement *)tso) {
2751 *last = (StgBlockingQueueElement *)tso->link;
2755 barf("unblockThread (Exception): TSO not found");
2759 case BlockedOnWrite:
2760 #if defined(mingw32_TARGET_OS)
2761 case BlockedOnDoProc:
2764 /* take TSO off blocked_queue */
2765 StgBlockingQueueElement *prev = NULL;
2766 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2767 prev = t, t = t->link) {
2768 if (t == (StgBlockingQueueElement *)tso) {
2770 blocked_queue_hd = (StgTSO *)t->link;
2771 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2772 blocked_queue_tl = END_TSO_QUEUE;
2775 prev->link = t->link;
2776 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2777 blocked_queue_tl = (StgTSO *)prev;
2783 barf("unblockThread (I/O): TSO not found");
2786 case BlockedOnDelay:
2788 /* take TSO off sleeping_queue */
2789 StgBlockingQueueElement *prev = NULL;
2790 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
2791 prev = t, t = t->link) {
2792 if (t == (StgBlockingQueueElement *)tso) {
2794 sleeping_queue = (StgTSO *)t->link;
2796 prev->link = t->link;
2801 barf("unblockThread (delay): TSO not found");
2805 barf("unblockThread");
2809 tso->link = END_TSO_QUEUE;
2810 tso->why_blocked = NotBlocked;
2811 tso->block_info.closure = NULL;
2812 PUSH_ON_RUN_QUEUE(tso);
2816 unblockThread(StgTSO *tso)
2820 /* To avoid locking unnecessarily. */
2821 if (tso->why_blocked == NotBlocked) {
2825 switch (tso->why_blocked) {
2828 // Be careful: nothing to do here! We tell the scheduler that the thread
2829 // is runnable and we leave it to the stack-walking code to abort the
2830 // transaction while unwinding the stack. We should perhaps have a debugging
2831 // test to make sure that this really happens and that the 'zombie' transaction
2832 // does not get committed.
2836 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2838 StgTSO *last_tso = END_TSO_QUEUE;
2839 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2842 for (t = mvar->head; t != END_TSO_QUEUE;
2843 last = &t->link, last_tso = t, t = t->link) {
2846 if (mvar->tail == tso) {
2847 mvar->tail = last_tso;
2852 barf("unblockThread (MVAR): TSO not found");
2855 case BlockedOnBlackHole:
2856 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2858 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2860 last = &bq->blocking_queue;
2861 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2862 last = &t->link, t = t->link) {
2868 barf("unblockThread (BLACKHOLE): TSO not found");
2871 case BlockedOnException:
2873 StgTSO *target = tso->block_info.tso;
2875 ASSERT(get_itbl(target)->type == TSO);
2877 while (target->what_next == ThreadRelocated) {
2878 target = target->link;
2879 ASSERT(get_itbl(target)->type == TSO);
2882 ASSERT(target->blocked_exceptions != NULL);
2884 last = &target->blocked_exceptions;
2885 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2886 last = &t->link, t = t->link) {
2887 ASSERT(get_itbl(t)->type == TSO);
2893 barf("unblockThread (Exception): TSO not found");
2897 case BlockedOnWrite:
2898 #if defined(mingw32_TARGET_OS)
2899 case BlockedOnDoProc:
2902 StgTSO *prev = NULL;
2903 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2904 prev = t, t = t->link) {
2907 blocked_queue_hd = t->link;
2908 if (blocked_queue_tl == t) {
2909 blocked_queue_tl = END_TSO_QUEUE;
2912 prev->link = t->link;
2913 if (blocked_queue_tl == t) {
2914 blocked_queue_tl = prev;
2920 barf("unblockThread (I/O): TSO not found");
2923 case BlockedOnDelay:
2925 StgTSO *prev = NULL;
2926 for (t = sleeping_queue; t != END_TSO_QUEUE;
2927 prev = t, t = t->link) {
2930 sleeping_queue = t->link;
2932 prev->link = t->link;
2937 barf("unblockThread (delay): TSO not found");
2941 barf("unblockThread");
2945 tso->link = END_TSO_QUEUE;
2946 tso->why_blocked = NotBlocked;
2947 tso->block_info.closure = NULL;
2948 APPEND_TO_RUN_QUEUE(tso);
2952 /* -----------------------------------------------------------------------------
2955 * The following function implements the magic for raising an
2956 * asynchronous exception in an existing thread.
2958 * We first remove the thread from any queue on which it might be
2959 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
2961 * We strip the stack down to the innermost CATCH_FRAME, building
2962 * thunks in the heap for all the active computations, so they can
2963 * be restarted if necessary. When we reach a CATCH_FRAME, we build
2964 * an application of the handler to the exception, and push it on
2965 * the top of the stack.
2967 * How exactly do we save all the active computations? We create an
2968 * AP_STACK for every UpdateFrame on the stack. Entering one of these
2969 * AP_STACKs pushes everything from the corresponding update frame
2970 * upwards onto the stack. (Actually, it pushes everything up to the
2971 * next update frame plus a pointer to the next AP_STACK object.
2972 * Entering the next AP_STACK object pushes more onto the stack until we
2973 * reach the last AP_STACK object - at which point the stack should look
2974 * exactly as it did when we killed the TSO and we can continue
2975 * execution by entering the closure on top of the stack.
2977 * We can also kill a thread entirely - this happens if either (a) the
2978 * exception passed to raiseAsync is NULL, or (b) there's no
2979 * CATCH_FRAME on the stack. In either case, we strip the entire
2980 * stack and replace the thread with a zombie.
2982 * Locks: sched_mutex held upon entry nor exit.
2984 * -------------------------------------------------------------------------- */
2987 deleteThread(StgTSO *tso)
2989 if (tso->why_blocked != BlockedOnCCall &&
2990 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2991 raiseAsync(tso,NULL);
2995 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2997 deleteThreadImmediately(StgTSO *tso)
2998 { // for forkProcess only:
2999 // delete thread without giving it a chance to catch the KillThread exception
3001 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3005 if (tso->why_blocked != BlockedOnCCall &&
3006 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3010 tso->what_next = ThreadKilled;
3015 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3017 /* When raising async exs from contexts where sched_mutex isn't held;
3018 use raiseAsyncWithLock(). */
3019 ACQUIRE_LOCK(&sched_mutex);
3020 raiseAsync(tso,exception);
3021 RELEASE_LOCK(&sched_mutex);
3025 raiseAsync(StgTSO *tso, StgClosure *exception)
3027 raiseAsync_(tso, exception, rtsFalse);
3031 raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
3033 StgRetInfoTable *info;
3036 // Thread already dead?
3037 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3042 sched_belch("raising exception in thread %ld.", (long)tso->id));
3044 // Remove it from any blocking queues
3049 // The stack freezing code assumes there's a closure pointer on
3050 // the top of the stack, so we have to arrange that this is the case...
3052 if (sp[0] == (W_)&stg_enter_info) {
3056 sp[0] = (W_)&stg_dummy_ret_closure;
3062 // 1. Let the top of the stack be the "current closure"
3064 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3067 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3068 // current closure applied to the chunk of stack up to (but not
3069 // including) the update frame. This closure becomes the "current
3070 // closure". Go back to step 2.
3072 // 4. If it's a CATCH_FRAME, then leave the exception handler on
3073 // top of the stack applied to the exception.
3075 // 5. If it's a STOP_FRAME, then kill the thread.
3077 // NB: if we pass an ATOMICALLY_FRAME then abort the associated
3084 info = get_ret_itbl((StgClosure *)frame);
3086 while (info->i.type != UPDATE_FRAME
3087 && (info->i.type != CATCH_FRAME || exception == NULL)
3088 && info->i.type != STOP_FRAME
3089 && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3091 if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3092 // IF we find an ATOMICALLY_FRAME then we abort the
3093 // current transaction and propagate the exception. In
3094 // this case (unlike ordinary exceptions) we do not care
3095 // whether the transaction is valid or not because its
3096 // possible validity cannot have caused the exception
3097 // and will not be visible after the abort.
3099 debugBelch("Found atomically block delivering async exception\n"));
3100 stmAbortTransaction(tso -> trec);
3101 tso -> trec = stmGetEnclosingTRec(tso -> trec);
3103 frame += stack_frame_sizeW((StgClosure *)frame);
3104 info = get_ret_itbl((StgClosure *)frame);
3107 switch (info->i.type) {
3109 case ATOMICALLY_FRAME:
3110 ASSERT(stop_at_atomically);
3111 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3112 stmCondemnTransaction(tso -> trec);
3114 tso->what_next = ThreadRunGHC;
3118 // If we find a CATCH_FRAME, and we've got an exception to raise,
3119 // then build the THUNK raise(exception), and leave it on
3120 // top of the CATCH_FRAME ready to enter.
3124 StgCatchFrame *cf = (StgCatchFrame *)frame;
3128 // we've got an exception to raise, so let's pass it to the
3129 // handler in this frame.
3131 raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3132 TICK_ALLOC_SE_THK(1,0);
3133 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3134 raise->payload[0] = exception;
3136 // throw away the stack from Sp up to the CATCH_FRAME.
3140 /* Ensure that async excpetions are blocked now, so we don't get
3141 * a surprise exception before we get around to executing the
3144 if (tso->blocked_exceptions == NULL) {
3145 tso->blocked_exceptions = END_TSO_QUEUE;
3148 /* Put the newly-built THUNK on top of the stack, ready to execute
3149 * when the thread restarts.
3152 sp[-1] = (W_)&stg_enter_info;
3154 tso->what_next = ThreadRunGHC;
3155 IF_DEBUG(sanity, checkTSO(tso));
3164 // First build an AP_STACK consisting of the stack chunk above the
3165 // current update frame, with the top word on the stack as the
3168 words = frame - sp - 1;
3169 ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3172 ap->fun = (StgClosure *)sp[0];
3174 for(i=0; i < (nat)words; ++i) {
3175 ap->payload[i] = (StgClosure *)*sp++;
3178 SET_HDR(ap,&stg_AP_STACK_info,
3179 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
3180 TICK_ALLOC_UP_THK(words+1,0);
3183 debugBelch("sched: Updating ");
3184 printPtr((P_)((StgUpdateFrame *)frame)->updatee);
3185 debugBelch(" with ");
3186 printObj((StgClosure *)ap);
3189 // Replace the updatee with an indirection - happily
3190 // this will also wake up any threads currently
3191 // waiting on the result.
3193 // Warning: if we're in a loop, more than one update frame on
3194 // the stack may point to the same object. Be careful not to
3195 // overwrite an IND_OLDGEN in this case, because we'll screw
3196 // up the mutable lists. To be on the safe side, don't
3197 // overwrite any kind of indirection at all. See also
3198 // threadSqueezeStack in GC.c, where we have to make a similar
3201 if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3202 // revert the black hole
3203 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3206 sp += sizeofW(StgUpdateFrame) - 1;
3207 sp[0] = (W_)ap; // push onto stack
3212 // We've stripped the entire stack, the thread is now dead.
3213 sp += sizeofW(StgStopFrame);
3214 tso->what_next = ThreadKilled;
3225 /* -----------------------------------------------------------------------------
3226 raiseExceptionHelper
3228 This function is called by the raise# primitve, just so that we can
3229 move some of the tricky bits of raising an exception from C-- into
3230 C. Who knows, it might be a useful re-useable thing here too.
3231 -------------------------------------------------------------------------- */
3234 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3236 StgClosure *raise_closure = NULL;
3238 StgRetInfoTable *info;
3240 // This closure represents the expression 'raise# E' where E
3241 // is the exception raise. It is used to overwrite all the
3242 // thunks which are currently under evaluataion.
3246 // LDV profiling: stg_raise_info has THUNK as its closure
3247 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3248 // payload, MIN_UPD_SIZE is more approprate than 1. It seems that
3249 // 1 does not cause any problem unless profiling is performed.
3250 // However, when LDV profiling goes on, we need to linearly scan
3251 // small object pool, where raise_closure is stored, so we should
3252 // use MIN_UPD_SIZE.
3254 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3255 // sizeofW(StgClosure)+1);
3259 // Walk up the stack, looking for the catch frame. On the way,
3260 // we update any closures pointed to from update frames with the
3261 // raise closure that we just built.
3265 info = get_ret_itbl((StgClosure *)p);
3266 next = p + stack_frame_sizeW((StgClosure *)p);
3267 switch (info->i.type) {
3270 // Only create raise_closure if we need to.
3271 if (raise_closure == NULL) {
3273 (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3274 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3275 raise_closure->payload[0] = exception;
3277 UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3281 case ATOMICALLY_FRAME:
3282 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3284 return ATOMICALLY_FRAME;
3290 case CATCH_STM_FRAME:
3291 IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3293 return CATCH_STM_FRAME;
3299 case CATCH_RETRY_FRAME:
3308 /* -----------------------------------------------------------------------------
3309 findRetryFrameHelper
3311 This function is called by the retry# primitive. It traverses the stack
3312 leaving tso->sp referring to the frame which should handle the retry.
3314 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3315 or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).
3317 We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3318 despite the similar implementation.
3320 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3321 not be created within memory transactions.
3322 -------------------------------------------------------------------------- */
3325 findRetryFrameHelper (StgTSO *tso)
3328 StgRetInfoTable *info;
3332 info = get_ret_itbl((StgClosure *)p);
3333 next = p + stack_frame_sizeW((StgClosure *)p);
3334 switch (info->i.type) {
3336 case ATOMICALLY_FRAME:
3337 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3339 return ATOMICALLY_FRAME;
3341 case CATCH_RETRY_FRAME:
3342 IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3344 return CATCH_RETRY_FRAME;
3346 case CATCH_STM_FRAME:
3348 ASSERT(info->i.type != CATCH_FRAME);
3349 ASSERT(info->i.type != STOP_FRAME);
3356 /* -----------------------------------------------------------------------------
3357 resurrectThreads is called after garbage collection on the list of
3358 threads found to be garbage. Each of these threads will be woken
3359 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3360 on an MVar, or NonTermination if the thread was blocked on a Black
3363 Locks: sched_mutex isn't held upon entry nor exit.
3364 -------------------------------------------------------------------------- */
3367 resurrectThreads( StgTSO *threads )
3371 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3372 next = tso->global_link;
3373 tso->global_link = all_threads;
3375 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3377 switch (tso->why_blocked) {
3379 case BlockedOnException:
3380 /* Called by GC - sched_mutex lock is currently held. */
3381 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3383 case BlockedOnBlackHole:
3384 raiseAsync(tso,(StgClosure *)NonTermination_closure);
3387 raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
3390 /* This might happen if the thread was blocked on a black hole
3391 * belonging to a thread that we've just woken up (raiseAsync
3392 * can wake up threads, remember...).
3396 barf("resurrectThreads: thread blocked in a strange way");
3401 /* ----------------------------------------------------------------------------
3402 * Debugging: why is a thread blocked
3403 * [Also provides useful information when debugging threaded programs
3404 * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3405 ------------------------------------------------------------------------- */
3409 printThreadBlockage(StgTSO *tso)
3411 switch (tso->why_blocked) {
3413 debugBelch("is blocked on read from fd %d", tso->block_info.fd);
3415 case BlockedOnWrite:
3416 debugBelch("is blocked on write to fd %d", tso->block_info.fd);
3418 #if defined(mingw32_TARGET_OS)
3419 case BlockedOnDoProc:
3420 debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3423 case BlockedOnDelay:
3424 debugBelch("is blocked until %d", tso->block_info.target);
3427 debugBelch("is blocked on an MVar");
3429 case BlockedOnException:
3430 debugBelch("is blocked on delivering an exception to thread %d",
3431 tso->block_info.tso->id);
3433 case BlockedOnBlackHole:
3434 debugBelch("is blocked on a black hole");
3437 debugBelch("is not blocked");
3441 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
3442 tso->block_info.closure, info_type(tso->block_info.closure));
3444 case BlockedOnGA_NoSend:
3445 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
3446 tso->block_info.closure, info_type(tso->block_info.closure));
3449 case BlockedOnCCall:
3450 debugBelch("is blocked on an external call");
3452 case BlockedOnCCall_NoUnblockExc:
3453 debugBelch("is blocked on an external call (exceptions were already blocked)");
3456 debugBelch("is blocked on an STM operation");
3459 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3460 tso->why_blocked, tso->id, tso);
3466 printThreadStatus(StgTSO *tso)
3468 switch (tso->what_next) {
3470 debugBelch("has been killed");
3472 case ThreadComplete:
3473 debugBelch("has completed");
3476 printThreadBlockage(tso);
3481 printAllThreads(void)
3486 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3487 ullong_format_string(TIME_ON_PROC(CurrentProc),
3488 time_string, rtsFalse/*no commas!*/);
3490 debugBelch("all threads at [%s]:\n", time_string);
3492 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3493 ullong_format_string(CURRENT_TIME,
3494 time_string, rtsFalse/*no commas!*/);
3496 debugBelch("all threads at [%s]:\n", time_string);
3498 debugBelch("all threads:\n");
3501 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3502 debugBelch("\tthread %d @ %p ", t->id, (void *)t);
3505 void *label = lookupThreadLabel(t->id);
3506 if (label) debugBelch("[\"%s\"] ",(char *)label);
3509 printThreadStatus(t);
3517 Print a whole blocking queue attached to node (debugging only).
3521 print_bq (StgClosure *node)
3523 StgBlockingQueueElement *bqe;
3527 debugBelch("## BQ of closure %p (%s): ",
3528 node, info_type(node));
3530 /* should cover all closures that may have a blocking queue */
3531 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3532 get_itbl(node)->type == FETCH_ME_BQ ||
3533 get_itbl(node)->type == RBH ||
3534 get_itbl(node)->type == MVAR);
3536 ASSERT(node!=(StgClosure*)NULL); // sanity check
3538 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3542 Print a whole blocking queue starting with the element bqe.
3545 print_bqe (StgBlockingQueueElement *bqe)
3550 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3552 for (end = (bqe==END_BQ_QUEUE);
3553 !end; // iterate until bqe points to a CONSTR
3554 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
3555 bqe = end ? END_BQ_QUEUE : bqe->link) {
3556 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3557 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3558 /* types of closures that may appear in a blocking queue */
3559 ASSERT(get_itbl(bqe)->type == TSO ||
3560 get_itbl(bqe)->type == BLOCKED_FETCH ||
3561 get_itbl(bqe)->type == CONSTR);
3562 /* only BQs of an RBH end with an RBH_Save closure */
3563 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3565 switch (get_itbl(bqe)->type) {
3567 debugBelch(" TSO %u (%x),",
3568 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3571 debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
3572 ((StgBlockedFetch *)bqe)->node,
3573 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3574 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3575 ((StgBlockedFetch *)bqe)->ga.weight);
3578 debugBelch(" %s (IP %p),",
3579 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3580 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3581 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3582 "RBH_Save_?"), get_itbl(bqe));
3585 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3586 info_type((StgClosure *)bqe)); // , node, info_type(node));
3592 # elif defined(GRAN)
3594 print_bq (StgClosure *node)
3596 StgBlockingQueueElement *bqe;
3597 PEs node_loc, tso_loc;
3600 /* should cover all closures that may have a blocking queue */
3601 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3602 get_itbl(node)->type == FETCH_ME_BQ ||
3603 get_itbl(node)->type == RBH);
3605 ASSERT(node!=(StgClosure*)NULL); // sanity check
3606 node_loc = where_is(node);
3608 debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
3609 node, info_type(node), node_loc);
3612 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3614 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3615 !end; // iterate until bqe points to a CONSTR
3616 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3617 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3618 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3619 /* types of closures that may appear in a blocking queue */
3620 ASSERT(get_itbl(bqe)->type == TSO ||
3621 get_itbl(bqe)->type == CONSTR);
3622 /* only BQs of an RBH end with an RBH_Save closure */
3623 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3625 tso_loc = where_is((StgClosure *)bqe);
3626 switch (get_itbl(bqe)->type) {
3628 debugBelch(" TSO %d (%p) on [PE %d],",
3629 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3632 debugBelch(" %s (IP %p),",
3633 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3634 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3635 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3636 "RBH_Save_?"), get_itbl(bqe));
3639 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3640 info_type((StgClosure *)bqe), node, info_type(node));
3648 Nice and easy: only TSOs on the blocking queue
3651 print_bq (StgClosure *node)
3655 ASSERT(node!=(StgClosure*)NULL); // sanity check
3656 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3657 tso != END_TSO_QUEUE;
3659 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3660 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3661 debugBelch(" TSO %d (%p),", tso->id, tso);
3674 for (i=0, tso=run_queue_hd;
3675 tso != END_TSO_QUEUE;
3684 sched_belch(char *s, ...)
3688 #ifdef RTS_SUPPORTS_THREADS
3689 debugBelch("sched (task %p): ", osThreadId());
3693 debugBelch("sched: ");