/* ---------------------------------------------------------------------------
 * (c) The GHC Team, 1998-2004
 *
 * Different GHC ways use this scheduler quite differently (see comments below)
 * Here is the global picture:
 *
 * WAY  Name      CPP flag  What's it for
 * --------------------------------------
 * mp   GUM       PAR       Parallel execution on a distrib. memory machine
 * s    SMP       SMP       Parallel execution on a shared memory machine
 * mg   GranSim   GRAN      Simulation of parallel execution
 * md   GUM/GdH   DIST      Distributed execution (based on GUM)
 * --------------------------------------------------------------------------*/
/* ---------------------------------------------------------------------------
 * Version with support for distributed memory parallelism aka GUM (WAY=mp):

   The main scheduling loop in GUM iterates until a finish message is received.
   In that case a global flag @receivedFinish@ is set and this instance of
   the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
   for the handling of incoming messages, such as PP_FINISH.
   Note that in the parallel case we have a system manager that coordinates
   different PEs, each of which is running one instance of the RTS.
   See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
   From this routine processes executing ghc/rts/Main.c are spawned. -- HWL

 * Version with support for simulating parallel execution aka GranSim (WAY=mg):

   The main scheduling code in GranSim is quite different from that in std
   (concurrent) Haskell: while concurrent Haskell just iterates over the
   threads in the runnable queue, GranSim is event driven, i.e. it iterates
   over the events in the global event queue. -- HWL
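
/* To make the contrast concrete, here is a minimal sketch of the two loop
 * shapes (illustrative only; run() and dispatch() are hypothetical helpers,
 * and the real loops in schedule() below interleave many more cases):
 *
 *   // std (concurrent) Haskell: iterate over the threads in the run queue
 *   while (run_queue_hd != END_TSO_QUEUE) {
 *       StgTSO *t;
 *       POP_RUN_QUEUE(t);
 *       run(t);
 *   }
 *
 *   // GranSim: iterate over the events in the global event queue
 *   rtsEvent *event;
 *   while ((event = get_next_event()) != (rtsEvent*)NULL) {
 *       dispatch(event);
 *   }
 */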
#include "PosixSource.h"
#include "BlockAlloc.h"

#define COMPILING_SCHEDULER
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "Exception.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Proftimer.h"

#if defined(GRAN) || defined(PAR)
# include "GranSimRts.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"

#include "Capability.h"
#include "OSThreads.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#if defined(THREADED_RTS)
#define USED_IN_THREADED_RTS
#else
#define USED_IN_THREADED_RTS STG_UNUSED
#endif

#ifdef RTS_SUPPORTS_THREADS
#define USED_WHEN_RTS_SUPPORTS_THREADS
#else
#define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
#endif
/* Main thread queue.
 * Locks required: sched_mutex.
 */
StgMainThread *main_threads = NULL;

/* Locks required: sched_mutex.
 */
StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;  now in GranSim.c */
/*
  In GranSim we have a runnable and a blocked queue for each processor.
  In order to minimise code changes new arrays run_queue_hds/tls
  are created. run_queue_hd is then a short cut (macro) for
  run_queue_hds[CurrentProc] (see GranSim.h).
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
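
/* For illustration, the short-cut macros mentioned above presumably take a
 * form like the following (a sketch only; the authoritative definitions
 * live in GranSim.h):
 *
 *   #define run_queue_hd      run_queue_hds[CurrentProc]
 *   #define run_queue_tl      run_queue_tls[CurrentProc]
 *   #define blocked_queue_hd  blocked_queue_hds[CurrentProc]
 */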
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
StgTSO *run_queue_hd = NULL;
StgTSO *run_queue_tl = NULL;
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */

/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 */
StgTSO *all_threads = NULL;

/* When a thread performs a safe C call (_ccall_GC, using old
 * terminology), it gets put on the suspended_ccalling_threads
 * list. Used by the garbage collector.
 */
static StgTSO *suspended_ccalling_threads;

static StgTSO *threadStackOverflow(StgTSO *tso);
/* KH: The following two flags are shared memory locations. There is no need
   to lock them, since they are only unset at the end of a scheduler

/* flag set by signal handler to precipitate a context switch */
int context_switch = 0;

/* if this flag is set as well, give up execution */
rtsBool interrupted = rtsFalse;

/* If this flag is set, we are running Haskell code.  Used to detect
 * uses of 'foreign import unsafe' that should be 'safe'.
 */
rtsBool in_haskell = rtsFalse;

/* Next thread ID to allocate.
 * Locks required: thread_id_mutex
 */
static StgThreadID next_thread_id = 1;
/* Pointers to the state of the current thread.
 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
 * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
 */

/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the closure to enter)
 *  + 1                       (stg_ap_v_ret)
 *  + 1                       (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
/* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 * exists - earlier gccs apparently didn't.
 */
static rtsBool ready_to_gc;

/* Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
static rtsBool shutting_down_scheduler = rtsFalse;

void            addToBlockedQueue ( StgTSO *tso );

static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
void            interruptStgRts   ( void );

#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
static void     detectBlackHoles  ( void );
#endif

static void     raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically);
#if defined(RTS_SUPPORTS_THREADS)
/* ToDo: carefully document the invariants that go together
 * with these synchronisation objects.
 */
Mutex     sched_mutex       = INIT_MUTEX_VAR;
Mutex     term_mutex        = INIT_MUTEX_VAR;

#endif /* RTS_SUPPORTS_THREADS */

rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;

static char *whatNext_strs[] = {

StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);
/* ----------------------------------------------------------------------------
 * ------------------------------------------------------------------------- */

#if defined(RTS_SUPPORTS_THREADS)
static rtsBool startingWorkerThread = rtsFalse;

static void taskStart(void);

    ACQUIRE_LOCK(&sched_mutex);
    startingWorkerThread = rtsFalse;
    RELEASE_LOCK(&sched_mutex);

void
startSchedulerTaskIfNecessary(void)
{
    if(run_queue_hd != END_TSO_QUEUE
       || blocked_queue_hd != END_TSO_QUEUE
       || sleeping_queue != END_TSO_QUEUE)
    {
        if(!startingWorkerThread)
        { // we don't want to start another worker thread
          // just because the last one hasn't yet reached the
          // "waiting for capability" state
            startingWorkerThread = rtsTrue;
            if(!startTask(taskStart))
                startingWorkerThread = rtsFalse;
/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * timer expires (thread yields)

   Locking notes:  we acquire the scheduler lock once at the beginning
   of the scheduler loop, and release it when

      * running a thread, or
      * waiting for work, or
      * waiting for a GC to complete.

   In a GranSim setup this loop revolves around the global event queue,
   which determines what to do next.  Therefore, it's more complicated
   than either the concurrent or the parallel (GUM) setup.

   GUM iterates over incoming messages.
   It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
   and sends out a fish whenever it has nothing to do; in-between
   doing the actual reductions (shared code below) it processes the
   incoming messages and deals with delayed operations
   (see PendingFetches).
   This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */
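
/* The locking discipline described above, reduced to a sketch (illustrative
 * only; the real loop below interleaves many more cases and ways):
 *
 *   ACQUIRE_LOCK(&sched_mutex);
 *   for (;;) {
 *       ...pick a thread t from the run queue...
 *       RELEASE_LOCK(&sched_mutex);     // don't hold the lock while...
 *       ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r); // ...running
 *       ACQUIRE_LOCK(&sched_mutex);
 *       ...act on ret (heap/stack overflow, yield, block, finish)...
 *   }
 */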
static void
schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
          Capability *initialCapability )
{
  Capability *cap;
  StgThreadReturnCode ret;
  StgTSO *t;
  rtsBool receivedFinish = rtsFalse;
  nat tp_size, sp_size;  // stats only
  rtsBool was_interrupted = rtsFalse;

  // Pre-condition: sched_mutex is held.
  // We might have a capability, passed in as initialCapability.
  cap = initialCapability;

#if defined(RTS_SUPPORTS_THREADS)
  // in the threaded case, the capability is either passed in via the
  // initialCapability parameter, or initialized inside the scheduler
  sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
              mainThread, initialCapability);
#else
  // simply initialise it in the non-threaded case
  grabCapability(&cap);
#endif

#if defined(GRAN)
  /* set up first event to get things going */
  /* ToDo: assign costs for system setup and init MainTSO ! */
  new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
            CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);

  debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
           G_TSO(CurrentTSO, 5));

  if (RtsFlags.GranFlags.Light) {
      /* Save current time; GranSim Light only */
      CurrentTSO->gran.clock = CurrentTime[CurrentProc];
  }

  event = get_next_event();

  while (event!=(rtsEvent*)NULL) {
      /* Choose the processor with the next event */
      CurrentProc = event->proc;
      CurrentTSO  = event->tso;

#elif defined(PAR)
  while (!receivedFinish) {    /* set by processMessages */
                               /* when receiving PP_FINISH message */

#else // everything except GRAN and PAR
    IF_DEBUG(scheduler, printAllThreads());

#if defined(RTS_SUPPORTS_THREADS)
    // Yield the capability to higher-priority tasks if necessary.
    yieldCapability(&cap);

    // If we do not currently hold a capability, we wait for one
    waitForCapability(&sched_mutex, &cap,
                      mainThread ? &mainThread->bound_thread_cond : NULL);

    // We now have a capability...

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    errorBelch("schedule: re-entered unsafely.\n"
               "   Perhaps a 'foreign import unsafe' should be 'safe'?");

    // If we're interrupted (the user pressed ^C, or some other
    // termination condition occurred), kill all the currently running
    IF_DEBUG(scheduler, sched_belch("interrupted"));
    interrupted = rtsFalse;
    was_interrupted = rtsTrue;
#if defined(RTS_SUPPORTS_THREADS)
    // In the threaded RTS, deadlock detection doesn't work,
    // so just exit right away.
    errorBelch("interrupted");
    releaseCapability(cap);
    RELEASE_LOCK(&sched_mutex);
    shutdownHaskellAndExit(EXIT_SUCCESS);

#if defined(RTS_USER_SIGNALS)
    // check for signals each time around the scheduler
    if (signals_pending()) {
        RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
        startSignalHandlers();
        ACQUIRE_LOCK(&sched_mutex);
    }
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.

    if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
    {
#if defined(RTS_SUPPORTS_THREADS)
        // We shouldn't be here...
        barf("schedule: awaitEvent() in threaded RTS");
#endif
        awaitEvent( EMPTY_RUN_QUEUE() );

        // we can be interrupted while waiting for I/O...
        if (interrupted) continue;
    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads waiting on I/O or sleeping, and all the other tasks are
     * waiting for work, we must have a deadlock of some description.
     *
     * We first try to find threads blocked on themselves (ie. black
     * holes), and generate NonTermination exceptions where necessary.
     *
     * If no threads are black holed, we have a deadlock situation, so
     * inform all the main threads.
     */
#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
    if ( EMPTY_THREAD_QUEUES() )
    {
        IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        GarbageCollect(GetRoots,rtsTrue);
        if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
#if defined(RTS_USER_SIGNALS)
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather than bombing out with a
         * deadlock.
         */
        if ( anyUserHandlers() ) {
            IF_DEBUG(scheduler,
                     sched_belch("still deadlocked, waiting for signals..."));

            // we might be interrupted...
            if (interrupted) { continue; }

            if (signals_pending()) {
                RELEASE_LOCK(&sched_mutex);
                startSignalHandlers();
                ACQUIRE_LOCK(&sched_mutex);
            }
            ASSERT(!EMPTY_RUN_QUEUE());
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception (or in the SMP build, send *all* main
         * threads the deadlock exception, since none of them can make
         */
        switch (m->tso->why_blocked) {
        case BlockedOnBlackHole:
        case BlockedOnException:
            raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
            break;
        default:
            barf("deadlock: main thread blocked in a strange way");
        }

#elif defined(RTS_SUPPORTS_THREADS)
    // ToDo: add deadlock detection in threaded RTS
    // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL

#if defined(RTS_SUPPORTS_THREADS) || defined(mingw32_HOST_OS)
    /* win32: might be back here due to awaitEvent() being abandoned
     * as a result of a console event having been delivered.
     */
    if ( EMPTY_RUN_QUEUE() ) {
        continue; // nothing to do
    }
#if defined(GRAN)
    if (RtsFlags.GranFlags.Light)
        GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
        CurrentTime[CurrentProc] = event->time;

    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)

    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
        /* Should just be continuing execution */
    case ContinueThread:
        IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
        /* ToDo: check assertion
           ASSERT(run_queue_hd != (StgTSO*)NULL &&
                  run_queue_hd != END_TSO_QUEUE);
        */
        /* Ignore ContinueThreads for fetching threads (if synchronous communication) */
        if (!RtsFlags.GranFlags.DoAsyncFetch &&
            procStatus[CurrentProc]==Fetching) {
            debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);

        /* Ignore ContinueThreads for completed threads */
        if (CurrentTSO->what_next == ThreadComplete) {
            debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);

        /* Ignore ContinueThreads for threads that are being migrated */
        if (PROCS(CurrentTSO)==Nowhere) {
            debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);

        /* The thread should be at the beginning of the run queue */
        if (CurrentTSO!=run_queue_hds[CurrentProc]) {
            debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
                       CurrentTSO->id, CurrentTSO, CurrentProc);
            break; // run the thread anyway

        /*
          new_event(proc, proc, CurrentTime[proc],
                    (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
        */ /* Catches superfluous CONTINUEs -- should be unnecessary */
        break; // now actually run the thread; DaH Qu'vam yImuHbej
    case FetchNode:
        do_the_fetchnode(event);
        goto next_thread;             /* handle next event in event queue  */

    case GlobalBlock:
        do_the_globalblock(event);
        goto next_thread;             /* handle next event in event queue  */

    case FetchReply:
        do_the_fetchreply(event);
        goto next_thread;             /* handle next event in event queue  */

    case UnblockThread:   /* Move from the blocked queue to the tail of */
        do_the_unblock(event);
        goto next_thread;             /* handle next event in event queue  */

    case ResumeThread:    /* Move from the blocked queue to the tail of */
                          /* the runnable queue ( i.e. Qu' SImqa'lu') */
        event->tso->gran.blocktime +=
            CurrentTime[CurrentProc] - event->tso->gran.blockedat;
        do_the_startthread(event);
        goto next_thread;             /* handle next event in event queue  */

    case StartThread:
        do_the_startthread(event);
        goto next_thread;             /* handle next event in event queue  */

    case MoveThread:
        do_the_movethread(event);
        goto next_thread;             /* handle next event in event queue  */

    case MoveSpark:
        do_the_movespark(event);
        goto next_thread;             /* handle next event in event queue  */

    case FindWork:
        do_the_findwork(event);
        goto next_thread;             /* handle next event in event queue  */

    default:
        barf("Illegal event type %u\n", event->evttype);
    }
    /* This point was scheduler_loop in the old RTS */

    IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",

    if (RtsFlags.GranFlags.Light)
        GranSimLight_leave_system(event, &ActiveTSO);

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */

    /* Take a thread from the run queue. */
    POP_RUN_QUEUE(t); // take_off_run_queue(t);

    debugBelch("GRAN: About to run current thread, which is\n");

    context_switch = 0; // turned on via GranYield, checking events and time slice

    DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;
#elif defined(PAR)
    if (PendingFetches != END_BF_QUEUE) {

    /* ToDo: perhaps merge with spark activation above */
    /* check whether we have local work and send requests if we have none */
    if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
        /* :-[  no local threads => look out for local sparks */
        /* the spark pool for the current PE */
        pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
        if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
            pool->hd < pool->tl) {
            /*
             * ToDo: add GC code check that we really have enough heap afterwards!!
             *
             * If we're here (no runnable threads) and we have pending
             * sparks, we must have a space problem.  Get enough space
             * to turn one of those pending sparks into a
             */
            spark = findSpark(rtsFalse);             /* get a spark */
            if (spark != (rtsSpark) NULL) {
                tso = activateSpark(spark);          /* turn the spark into a thread */
                IF_PAR_DEBUG(schedule,
                             debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
                                        tso->id, tso, advisory_thread_count));

                if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
                    debugBelch("==^^ failed to activate spark\n");

            }                 /* otherwise fall through & pick-up new tso */

            IF_PAR_DEBUG(verbose,
                         debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
                                    spark_queue_len(pool)));
    /* If we still have no work we need to send a FISH to get a spark

    if (EMPTY_RUN_QUEUE()) {
        /* =8-[  no local sparks => look for work on other PEs */
        /*
         * We really have absolutely no work.  Send out a fish
         * (there may be some out there already), and wait for
         * something to arrive.  We clearly can't run any threads
         * until a SCHEDULE or RESUME arrives, and so that's what
         * we're hoping to see.  (Of course, we still have to
         * respond to other types of messages.)
         */
        TIME now = msTime() /*CURRENT_TIME*/;
        IF_PAR_DEBUG(verbose,
                     debugBelch("--  now=%ld\n", now));
        IF_PAR_DEBUG(verbose,
                     if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
                         (last_fish_arrived_at!=0 &&
                          last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
                         debugBelch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)\n",
                                    last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
                                    last_fish_arrived_at,
                                    RtsFlags.ParFlags.fishDelay, now);

        if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
            (last_fish_arrived_at==0 ||
             (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
            /* outstandingFishes is set in sendFish, processFish;
               avoid flooding system with fishes via delay */
            sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,

            // Global statistics: count no. of fishes
            if (RtsFlags.ParFlags.ParStats.Global &&
                RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
                globalParStats.tot_fish_mess++;

        receivedFinish = processMessages();

    } else if (PacketsWaiting()) {  /* Look for incoming messages */
        receivedFinish = processMessages();
    /* Now we are sure that we have some work available */
    ASSERT(run_queue_hd != END_TSO_QUEUE);

    /* Take a thread from the run queue, if we have work */
    POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
    IF_DEBUG(sanity,checkTSO(t));

    /* ToDo: write something to the log-file
       if (RTSflags.ParFlags.granSimStats && !sameThread)
           DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
    */

    /* the spark pool for the current PE */
    pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable

    debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
               run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

    if (0 && RtsFlags.ParFlags.ParStats.Full &&
        t && LastTSO && t->id != LastTSO->id &&
        LastTSO->why_blocked == NotBlocked &&
        LastTSO->what_next != ThreadComplete) {
        // if previously scheduled TSO not blocked we have to record the context switch
        DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
                             GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
    }

    if (RtsFlags.ParFlags.ParStats.Full &&
        (emitSchedule /* forced emit */ ||
         (t && LastTSO && t->id != LastTSO->id))) {
        /*
          we are running a different TSO, so write a schedule event to log file
          NB: If we use fair scheduling we also have to write a deschedule
              event for LastTSO; with unfair scheduling we know that the
              previous tso has blocked whenever we switch to another tso, so
              we don't need it in GUM for now
        */
        DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                         GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
        emitSchedule = rtsFalse;
#else /* !GRAN && !PAR */

    // grab a thread from the run queue
    ASSERT(run_queue_hd != END_TSO_QUEUE);

    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));

    StgMainThread *m = t->main;

    sched_belch("### Running thread %d in bound thread", t->id));
    // yes, the Haskell thread is bound to the current native thread

    sched_belch("### thread %d bound to another OS thread", t->id));
    // no, bound to a different Haskell thread: pass to that thread
    PUSH_ON_RUN_QUEUE(t);
    passCapability(&m->bound_thread_cond);

    if(mainThread != NULL)
        // The thread we want to run is bound.

    sched_belch("### this OS thread cannot run thread %d", t->id));
    // no, the current native thread is bound to a different
    // Haskell thread, so pass it to any worker thread
    PUSH_ON_RUN_QUEUE(t);
    passCapabilityToWorker();
    cap->r.rCurrentTSO = t;

    /* context switches are now initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
         && (run_queue_hd != END_TSO_QUEUE
             || blocked_queue_hd != END_TSO_QUEUE
             || sleeping_queue != END_TSO_QUEUE)))

    RELEASE_LOCK(&sched_mutex);

    IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
                                    (long)t->id, whatNext_strs[t->what_next]));

    startHeapProfTimer();
    /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
    /* Run the current thread
     */
    prev_what_next = t->what_next;

    errno = t->saved_errno;
    in_haskell = rtsTrue;

    switch (prev_what_next) {

    case ThreadKilled:
    case ThreadComplete:
        /* Thread already finished, return to scheduler. */
        ret = ThreadFinished;
        break;

    case ThreadRunGHC:
        ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
        break;

    case ThreadInterpret:
        ret = interpretBCO(cap);
        break;

    default:
        barf("schedule: invalid what_next field");
    }

    in_haskell = rtsFalse;

    // The TSO might have moved, so find the new location:
    t = cap->r.rCurrentTSO;

    // And save the current errno in this thread.
    t->saved_errno = errno;

    /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
    /* Costs for the scheduler are assigned to CCS_SYSTEM */

    ACQUIRE_LOCK(&sched_mutex);

#ifdef RTS_SUPPORTS_THREADS
    IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
#elif !defined(GRAN) && !defined(PAR)
    IF_DEBUG(scheduler,debugBelch("sched: "););
#endif

    /* HACK 675: if the last thread didn't yield, make sure to print a
       SCHEDULE event to the log file when StgRunning the next thread, even
       if it is the same one as before */
    TimeOfLastYield = CURRENT_TIME;

    IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
    globalGranStats.tot_heapover++;

    globalParStats.tot_heapover++;
        // did the task ask for a large block?
        if (cap->r.rHpAlloc > BLOCK_SIZE) {
            // if so, get one and push it on the front of the nursery.

            blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;

            IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n",
                                          (long)t->id, whatNext_strs[t->what_next], blocks));

            // don't do this if it would push us over the
            // alloc_blocks_lim limit; we'll GC first.
            if (alloc_blocks + blocks < alloc_blocks_lim) {

                alloc_blocks += blocks;
                bd = allocGroup( blocks );

                // link the new group into the list
                bd->link = cap->r.rCurrentNursery;
                bd->u.back = cap->r.rCurrentNursery->u.back;
                if (cap->r.rCurrentNursery->u.back != NULL) {
                    cap->r.rCurrentNursery->u.back->link = bd;
                } else {
                    ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
                           g0s0->blocks == cap->r.rNursery);
                    cap->r.rNursery = g0s0->blocks = bd;
                }
                cap->r.rCurrentNursery->u.back = bd;

                // initialise it as a nursery block.  We initialise the
                // step, gen_no, and flags field of *every* sub-block in
                // this large block, because this is easier than making
                // sure that we always find the block head of a large
                // block whenever we call Bdescr() (eg. evacuate() and
                // isAlive() in the GC would both have to do this, at
                for (x = bd; x < bd + blocks; x++) {

                // don't forget to update the block count in g0s0.
                g0s0->n_blocks += blocks;
                // This assert can be a killer if the app is doing lots
                // of large block allocations.
                ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);

                // now update the nursery to point to the new block
                cap->r.rCurrentNursery = bd;

                // we might be unlucky and have another thread get on the
                // run queue before us and steal the large block, but in that
                // case the thread will just end up requesting another large
                PUSH_ON_RUN_QUEUE(t);
        /* make all the running tasks block on a condition variable,
         * maybe set context_switch and wait till they all pile in,
         * then have them wait on a GC condition variable.
         */
        IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n",
                                      (long)t->id, whatNext_strs[t->what_next]));

        ASSERT(!is_on_queue(t,CurrentProc));

        /* Currently we emit a DESCHEDULE event before GC in GUM.
           ToDo: either add separate event to distinguish SYSTEM time from rest
                 or just nuke this DESCHEDULE (and the following SCHEDULE) */
        if (0 && RtsFlags.ParFlags.ParStats.Full) {
            DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
                             GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
            emitSchedule = rtsTrue;
        }

        ready_to_gc = rtsTrue;
        context_switch = 1;   /* stop other threads ASAP */
        PUSH_ON_RUN_QUEUE(t);
        /* actual GC is done at the end of the while loop */
    case StackOverflow:
        DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_stackover++;

        // DumpGranEvent(GR_DESCHEDULE, t);
        globalParStats.tot_stackover++;

        IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n",
                                      (long)t->id, whatNext_strs[t->what_next]));

        /* just adjust the stack for this thread, then pop it back

        {
            /* enlarge the stack */
            StgTSO *new_t = threadStackOverflow(t);

            /* This TSO has moved, so update any pointers to it from the
             * main thread stack.  It better not be on any other queues...
             * (it shouldn't be).
             */
            if (t->main != NULL) {
                t->main->tso = new_t;
            }
            PUSH_ON_RUN_QUEUE(new_t);
    case ThreadYielding:
        // Reset the context switch flag.  We don't do this just before
        // running the thread, because that would mean we would lose ticks
        // during GC, which can lead to unfair scheduling (a thread hogs
        // the CPU because the tick always arrives during GC).  This way
        // penalises threads that do a lot of allocation, but that seems
        // better than the alternative.

        DumpGranEvent(GR_DESCHEDULE, t));
        globalGranStats.tot_yields++;

        // DumpGranEvent(GR_DESCHEDULE, t);
        globalParStats.tot_yields++;

        /* put the thread back on the run queue.  Then, if we're ready to
         * GC, check whether this is the last task to stop.  If so, wake
         * up the GC thread.  getThread will block during a GC until the
         */
        if (t->what_next != prev_what_next) {
            debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n",
                       (long)t->id, whatNext_strs[t->what_next]);
        } else {
            debugBelch("--<< thread %ld (%s) stopped, yielding\n",
                       (long)t->id, whatNext_strs[t->what_next]);
        }

        //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
        ASSERT(t->link == END_TSO_QUEUE);

        // Shortcut if we're just switching evaluators: don't bother
        // doing stack squeezing (which can be expensive), just run the
        if (t->what_next != prev_what_next) {

        ASSERT(!is_on_queue(t,CurrentProc));

        //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
        checkThreadQsSanity(rtsTrue));

        if (RtsFlags.ParFlags.doFairScheduling) {
            /* this does round-robin scheduling; good for concurrency */
            APPEND_TO_RUN_QUEUE(t);
        } else {
            /* this does unfair scheduling; good for parallelism */
            PUSH_ON_RUN_QUEUE(t);
        }

        // this does round-robin scheduling; good for concurrency
        APPEND_TO_RUN_QUEUE(t);
        /* add a ContinueThread event to actually process the thread */
        new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
                  t, (StgClosure*)NULL, (rtsSpark*)NULL);

        debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");

    case ThreadBlocked:

        debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
                   t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
                   (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
        if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));

        // ??? needed; should emit block before
        DumpGranEvent(GR_DESCHEDULE, t));
        prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t

        ASSERT(procStatus[CurrentProc]==Busy ||
               ((procStatus[CurrentProc]==Fetching) &&
                (t->block_info.closure!=(StgClosure*)NULL)));
        if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
            !(!RtsFlags.GranFlags.DoAsyncFetch &&
              procStatus[CurrentProc]==Fetching))
            procStatus[CurrentProc] = Idle;

        debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
                   t->id, t, whatNext_strs[t->what_next], t->block_info.closure));

        if (t->block_info.closure!=(StgClosure*)NULL)
            print_bq(t->block_info.closure));

        /* Send a fetch (if BlockedOnGA) and dump event to log file */

        /* whatever we schedule next, we must log that schedule */
        emitSchedule = rtsTrue;
        /* don't need to do anything.  Either the thread is blocked on
         * I/O, in which case we'll have called addToBlockedQueue
         * previously, or it's blocked on an MVar or Blackhole, in which
         * case it'll be on the relevant queue already.
         */
        ASSERT(t->why_blocked != NotBlocked);

        debugBelch("--<< thread %d (%s) stopped: ",
                   t->id, whatNext_strs[t->what_next]);
        printThreadBlockage(t);

        /* Only for dumping event to log file
           ToDo: do I need this in GranSim, too?
    case ThreadFinished:
        /* Need to check whether this was a main thread, and if so, signal
         * the task that started it with the return value.  If we have no
         * more main threads, we probably need to stop all the tasks until
         */

        /* We also end up here if the thread kills itself with an
         * uncaught exception, see Exception.hc.
         */
        IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n",
                                      t->id, whatNext_strs[t->what_next]));

        endThread(t, CurrentProc); // clean-up the thread

        /* For now all are advisory -- HWL */
        //if(t->priority==AdvisoryPriority) ??
        advisory_thread_count--;

        if(t->dist.priority==RevalPriority)

        if (RtsFlags.ParFlags.ParStats.Full &&
            !RtsFlags.ParFlags.ParStats.Suppressed)
            DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
        // Check whether the thread that just completed was a main
        // thread, and if so return with the result.
        //
        // There is an assumption here that all thread completion goes
        // through this point; we need to make sure that if a thread
        // ends up in the ThreadKilled state, that it stays on the run
        // queue so it can be dealt with here.

#if defined(RTS_SUPPORTS_THREADS)
        if (mainThread != NULL
            && mainThread->tso == t)
        {
            // We are a bound thread: this must be our thread that just

            ASSERT(mainThread->tso == t);

            if (t->what_next == ThreadComplete) {
                if (mainThread->ret) {
                    // NOTE: return val is tso->sp[1] (see StgStartup.hc)
                    *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1];
                }
                mainThread->stat = Success;
            } else {
                if (mainThread->ret) {
                    *(mainThread->ret) = NULL;
                }
                if (was_interrupted) {
                    mainThread->stat = Interrupted;
                } else {
                    mainThread->stat = Killed;
                }
            }

            removeThreadLabel((StgWord)mainThread->tso->id);

            if (mainThread->prev == NULL) {
                main_threads = mainThread->link;
            } else {
                mainThread->prev->link = mainThread->link;
            }
            if (mainThread->link != NULL) {
                mainThread->link->prev = NULL;
            }
            releaseCapability(cap);
#ifdef RTS_SUPPORTS_THREADS
        ASSERT(t->main == NULL);
#else
        if (t->main != NULL) {
            // Must be a main thread that is not the topmost one.  Leave
            // it on the run queue until the stack has unwound to the
            // point where we can deal with this.  Leaving it on the run
            // queue also ensures that the garbage collector knows about
            // this thread and its return value (it gets dropped from the
            // all_threads list so there's no other way to find it).
            APPEND_TO_RUN_QUEUE(t);
        }
#endif

    default:
        barf("schedule: invalid thread return code %d", (int)ret);
    }
    // When we have +RTS -i0 and we're heap profiling, do a census at
    // every GC.  This lets us get repeatable runs for debugging.
    if (performHeapProfile ||
        (RtsFlags.ProfFlags.profileInterval==0 &&
         RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
        GarbageCollect(GetRoots, rtsTrue);

        performHeapProfile = rtsFalse;
        ready_to_gc = rtsFalse; // we already GC'd
    }
    /* Kick any transactions which are invalid back to their atomically frames.
     * When next scheduled they will try to commit, this commit will fail and
     * they will retry. */
    for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
        if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
            if (!stmValidateTransaction (t -> trec)) {
                IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));

                // strip the stack back to the ATOMICALLY_FRAME, aborting
                // the (nested) transaction, and saving the stack of any
                // partially-evaluated thunks on the heap.
                raiseAsync_(t, NULL, rtsTrue);

                ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
            }
        }
    }
    /* everybody back, start the GC.
     * Could do it in this thread, or signal a condition var
     * to do it in another thread.  Either way, we need to
     * broadcast on gc_pending_cond afterward.
     */
#if defined(RTS_SUPPORTS_THREADS)
    IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
    GarbageCollect(GetRoots,rtsFalse);
    ready_to_gc = rtsFalse;

    /* add a ContinueThread event to continue execution of current thread */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
              t, (StgClosure*)NULL, (rtsSpark*)NULL);

    debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");

    IF_GRAN_DEBUG(unused,
                  print_eventq(EventHd));

    event = get_next_event();

    /* ToDo: wait for next message to arrive rather than busy wait */

  } /* end of while(1) */

  IF_PAR_DEBUG(verbose,
               debugBelch("== Leaving schedule() after having received Finish\n"));
/* ---------------------------------------------------------------------------
 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
 * used by Control.Concurrent for error checking.
 * ------------------------------------------------------------------------- */
StgBool
rtsSupportsBoundThreads(void)

/* ---------------------------------------------------------------------------
 * isThreadBound(tso): check whether tso is bound to an OS thread.
 * ------------------------------------------------------------------------- */
StgBool
isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
{
    return (tso->main != NULL);
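
/* For illustration, a hypothetical caller in the threaded RTS might use this
 * to route a thread to the OS thread it is bound to, mirroring the
 * bound-thread handling in schedule() above (a sketch only):
 *
 *   if (isThreadBound(t)) {
 *       // hand the TSO to the task that owns it rather than running it here
 *       PUSH_ON_RUN_QUEUE(t);
 *       passCapability(&t->main->bound_thread_cond);
 *   }
 */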
/* ---------------------------------------------------------------------------
 * Singleton fork(). Do not copy any running threads.
 * ------------------------------------------------------------------------- */

#ifndef mingw32_HOST_OS
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
deleteThreadImmediately(StgTSO *tso);
#endif

forkProcess(HsStablePtr *entry
#ifndef FORKPROCESS_PRIMOP_SUPPORTED

#ifdef FORKPROCESS_PRIMOP_SUPPORTED

    IF_DEBUG(scheduler,sched_belch("forking!"));
    rts_lock(); // This not only acquires sched_mutex, it also
                // makes sure that no other threads are running

    if (pid) {  /* parent */

        /* just return the pid */

    } else {    /* child */

        // delete all threads
        run_queue_hd = run_queue_tl = END_TSO_QUEUE;

        for (t = all_threads; t != END_TSO_QUEUE; t = next) {

            // don't allow threads to catch the ThreadKilled exception
            deleteThreadImmediately(t);
        }

        // wipe the main thread list
        while((m = main_threads) != NULL) {
            main_threads = m->link;
# ifdef THREADED_RTS
            closeCondition(&m->bound_thread_cond);
# endif
        }

        rc = rts_evalStableIO(entry, NULL); // run the action
        rts_checkSchedStatus("forkProcess",rc);

        hs_exit(); // clean up and exit
    }
#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
    barf("forkProcess#: primop not supported, sorry!\n");
/* ---------------------------------------------------------------------------
 * deleteAllThreads():  kill all the live threads.
 *
 * This is used when we catch a user interrupt (^C), before performing
 * any necessary cleanups and running finalizers.
 *
 * Locks: sched_mutex held.
 * ------------------------------------------------------------------------- */

void
deleteAllThreads ( void )
{
    IF_DEBUG(scheduler,sched_belch("deleting all threads"));
    for (t = all_threads; t != END_TSO_QUEUE; t = next) {
        next = t->global_link;

    // The run queue now contains a bunch of ThreadKilled threads.  We
    // must not throw these away: the main thread(s) will be in there
    // somewhere, and the main scheduler loop has to deal with it.
    // Also, the run queue is the only thing keeping these threads from
    // being GC'd, and we don't want the "main thread has been GC'd" panic.

    ASSERT(blocked_queue_hd == END_TSO_QUEUE);
    ASSERT(sleeping_queue == END_TSO_QUEUE);
/* startThread and insertThread are now in GranSim.c -- HWL */

/* ---------------------------------------------------------------------------
 * Suspending & resuming Haskell threads.
 *
 * When making a "safe" call to C (aka _ccall_GC), the task gives back
 * its capability before calling the C function.  This allows another
 * task to pick up the capability and carry on running Haskell
 * threads.  It also means that if the C call blocks, it won't lock
 *
 * The Haskell thread making the C call is put to sleep for the
 * duration of the call, on the suspended_ccalling_threads queue.  We
 * give out a token to the task, which it can use to resume the thread
 * on return from the C function.
 * ------------------------------------------------------------------------- */
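
/* A sketch of how the token protocol is used by a 'safe' foreign call
 * (illustrative only; real call stubs are generated by the compiler, and
 * some_c_function is a stand-in name):
 *
 *   StgInt tok;
 *   StgRegTable *reg = ...;        // the task's register table
 *   tok = suspendThread(reg);      // give up the capability, get a token
 *   r   = some_c_function(arg);    // may block, or call back into Haskell
 *   reg = resumeThread(tok);       // re-acquire a capability and our TSO
 */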
StgInt
suspendThread( StgRegTable *reg )
{
  StgInt tok;
  int saved_errno = errno;

  /* assume that *reg is a pointer to the StgRegTable part
   */
  cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));

  ACQUIRE_LOCK(&sched_mutex);

  IF_DEBUG(scheduler,
           sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));

  // XXX this might not be necessary --SDM
  cap->r.rCurrentTSO->what_next = ThreadRunGHC;

  threadPaused(cap->r.rCurrentTSO);
  cap->r.rCurrentTSO->link = suspended_ccalling_threads;
  suspended_ccalling_threads = cap->r.rCurrentTSO;

  if(cap->r.rCurrentTSO->blocked_exceptions == NULL) {
      cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
      cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
  } else {
      cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
  }

  /* Use the thread ID as the token; it should be unique */
  tok = cap->r.rCurrentTSO->id;

  /* Hand back capability */
  releaseCapability(cap);

#if defined(RTS_SUPPORTS_THREADS)
  /* Preparing to leave the RTS, so ensure there's a native thread/task
     waiting to take over.
  */
  IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
#endif

  RELEASE_LOCK(&sched_mutex);

  errno = saved_errno;
  in_haskell = rtsFalse;
  return tok;
}
StgRegTable *
resumeThread( StgInt tok )
{
  StgTSO *tso, **prev;
  int saved_errno = errno;

#if defined(RTS_SUPPORTS_THREADS)
  /* Wait for permission to re-enter the RTS with the result. */
  ACQUIRE_LOCK(&sched_mutex);
  waitForReturnCapability(&sched_mutex, &cap);

  IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
#else
  grabCapability(&cap);
#endif

  /* Remove the thread off of the suspended list */
  prev = &suspended_ccalling_threads;
  for (tso = suspended_ccalling_threads;
       tso != END_TSO_QUEUE;
       prev = &tso->link, tso = tso->link) {
      if (tso->id == (StgThreadID)tok) {
          *prev = tso->link;
          break;
      }
  }
  if (tso == END_TSO_QUEUE) {
      barf("resumeThread: thread not found");
  }
  tso->link = END_TSO_QUEUE;

  if(tso->why_blocked == BlockedOnCCall) {
      awakenBlockedQueueNoLock(tso->blocked_exceptions);
      tso->blocked_exceptions = NULL;
  }

  /* Reset blocking status */
  tso->why_blocked  = NotBlocked;

  cap->r.rCurrentTSO = tso;
  RELEASE_LOCK(&sched_mutex);
  errno = saved_errno;
  in_haskell = rtsTrue;
  return &cap->r;
}
/* ---------------------------------------------------------------------------
 * ------------------------------------------------------------------------ */
static void unblockThread(StgTSO *tso);

/* ---------------------------------------------------------------------------
 * Comparing Thread ids.
 *
 * This is used from STG land in the implementation of the
 * instances of Eq/Ord for ThreadIds.
 * ------------------------------------------------------------------------ */

int
cmp_thread(StgPtr tso1, StgPtr tso2)
{
  StgThreadID id1 = ((StgTSO *)tso1)->id;
  StgThreadID id2 = ((StgTSO *)tso2)->id;

  if (id1 < id2) return (-1);
  if (id1 > id2) return 1;
  return 0;
}

/* ---------------------------------------------------------------------------
 * Fetching the ThreadID from an StgTSO.
 *
 * This is used in the implementation of Show for ThreadIds.
 * ------------------------------------------------------------------------ */
int
rts_getThreadId(StgPtr tso)
{
  return ((StgTSO *)tso)->id;
}
void
labelThread(StgPtr tso, char *label)
{
  int len;
  char *buf;

  /* Caveat: Once set, you can only set the thread name to "" */
  len = strlen(label)+1;
  buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
  strncpy(buf,label,len);
  /* Update will free the old memory for us */
  updateThreadLabel(((StgTSO *)tso)->id,buf);
}
/* ---------------------------------------------------------------------------
   Create a new thread.

   The new thread starts with the given stack size.  Before the
   scheduler can run, however, this thread needs to have a closure
   (and possibly some arguments) pushed on its stack.  See
   pushClosure() in Schedule.h.

   createGenThread() and createIOThread() (in SchedAPI.h) are
   convenient packaged versions of this function.

   currently pri (priority) is only used in a GRAN setup -- HWL
   ------------------------------------------------------------------------ */
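
/* A minimal sketch of the creation protocol described above (illustrative
 * only; `closure` is a stand-in name, and the packaged variants in
 * SchedAPI.h do this for you):
 *
 *   StgTSO *tso = createThread(RtsFlags.GcFlags.initialStkSize);
 *   pushClosure(tso, (W_)closure);   // give the thread something to enter
 *   scheduleThread(tso);             // put it on the run queue
 */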
/* currently pri (priority) is only used in a GRAN setup -- HWL */
StgTSO *
#if defined(GRAN)
createThread(nat size, StgInt pri)
#else
createThread(nat size)
#endif
{
    /* First check whether we should create a thread at all */

    /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
    if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
        debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
                   RtsFlags.ParFlags.maxThreads, advisory_thread_count);
        return END_TSO_QUEUE;
    }

    ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);

    // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW

    /* catch ridiculously small stack sizes */
    if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
        size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
    }

    stack_size = size - TSO_STRUCT_SIZEW;

    tso = (StgTSO *)allocate(size);
    TICK_ALLOC_TSO(stack_size, 0);

    SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
    SET_GRAN_HDR(tso, ThisPE);

    // Always start with the compiled code evaluator
    tso->what_next = ThreadRunGHC;

    tso->id = next_thread_id++;
    tso->why_blocked  = NotBlocked;
    tso->blocked_exceptions = NULL;

    tso->saved_errno = 0;

    tso->stack_size   = stack_size;
    tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
    tso->sp           = (P_)&(tso->stack) + stack_size;

    tso->trec = NO_TREC;

    tso->prof.CCCS = CCS_MAIN;

    /* put a stop frame on the stack */
    tso->sp -= sizeofW(StgStopFrame);
    SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
    tso->link = END_TSO_QUEUE;

    /* uses more flexible routine in GranSim */
    insertThread(tso, CurrentProc);

    /* In a non-GranSim setup the pushing of a TSO onto the runq is separated

    if (RtsFlags.GranFlags.GranSimStats.Full)
        DumpGranEvent(GR_START,tso);

    if (RtsFlags.ParFlags.ParStats.Full)
        DumpGranEvent(GR_STARTQ,tso);
    /* HACK to avoid SCHEDULE

    /* Link the new thread on the global thread list.
     */
    tso->global_link = all_threads;

    tso->dist.priority = MandatoryPriority; //by default that is...
    tso->gran.pri = pri;

    tso->gran.magic = TSO_MAGIC; // debugging only

    tso->gran.sparkname    = 0;
    tso->gran.startedat    = CURRENT_TIME;
    tso->gran.exported     = 0;
    tso->gran.basicblocks  = 0;
    tso->gran.allocs       = 0;
    tso->gran.exectime     = 0;
    tso->gran.fetchtime    = 0;
    tso->gran.fetchcount   = 0;
    tso->gran.blocktime    = 0;
    tso->gran.blockcount   = 0;
    tso->gran.blockedat    = 0;
    tso->gran.globalsparks = 0;
    tso->gran.localsparks  = 0;
    if (RtsFlags.GranFlags.Light)
        tso->gran.clock = Now;   /* local clock */
    else
        tso->gran.clock = 0;

    IF_DEBUG(gran,printTSO(tso));
    tso->par.magic = TSO_MAGIC; // debugging only

    tso->par.sparkname    = 0;
    tso->par.startedat    = CURRENT_TIME;
    tso->par.exported     = 0;
    tso->par.basicblocks  = 0;
    tso->par.allocs       = 0;
    tso->par.exectime     = 0;
    tso->par.fetchtime    = 0;
    tso->par.fetchcount   = 0;
    tso->par.blocktime    = 0;
    tso->par.blockcount   = 0;
    tso->par.blockedat    = 0;
    tso->par.globalsparks = 0;
    tso->par.localsparks  = 0;
    globalGranStats.tot_threads_created++;
    globalGranStats.threads_created_on_PE[CurrentProc]++;
    globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
    globalGranStats.tot_sq_probes++;

    // collect parallel global statistics (currently done together with GC stats)
    if (RtsFlags.ParFlags.ParStats.Global &&
        RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
        //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
        globalParStats.tot_threads_created++;
    }

    sched_belch("==__ schedule: Created TSO %d (%p);",
                CurrentProc, tso, tso->id));

    IF_PAR_DEBUG(verbose,
                 sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
                             (long)tso->id, tso, advisory_thread_count));

    IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
                                   (long)tso->id, (long)tso->stack_size));
    return tso;
}
/*
  all parallel thread creation calls should fall through the following routine.
*/
StgTSO *
createSparkThread(rtsSpark spark)
{
    StgTSO *tso;
    ASSERT(spark != (rtsSpark)NULL);
    if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
    {
        barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
             RtsFlags.ParFlags.maxThreads, advisory_thread_count);
        return END_TSO_QUEUE;
    }

    tso = createThread(RtsFlags.GcFlags.initialStkSize);
    if (tso==END_TSO_QUEUE)
        barf("createSparkThread: Cannot create TSO");

    tso->priority = AdvisoryPriority;

    pushClosure(tso,spark);
    PUSH_ON_RUN_QUEUE(tso);
    advisory_thread_count++;

    return tso;
}
/*
  Turn a spark into a thread.
  ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
*/
StgTSO *
activateSpark (rtsSpark spark)
{
    StgTSO *tso;

    tso = createSparkThread(spark);
    if (RtsFlags.ParFlags.ParStats.Full) {
        //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
        IF_PAR_DEBUG(verbose,
                     debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
                                (StgClosure *)spark, info_type((StgClosure *)spark)));
    }
    // ToDo: fwd info on local/global spark to thread -- HWL
    // tso->gran.exported =  spark->exported;
    // tso->gran.locked   = !spark->global;
    // tso->gran.sparkname = spark->name;

    return tso;
}
static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
                                   Capability *initialCapability
                                  );

/* ---------------------------------------------------------------------------
 * scheduleThread puts a thread on the end of the runnable queue.
 * This will usually be done immediately after a thread is created.
 * The caller of scheduleThread must create the thread using e.g.
 * createThread and push an appropriate closure
 * on this thread's stack before the scheduler is invoked.
 * ------------------------------------------------------------------------ */
static void scheduleThread_ (StgTSO* tso);

static void
scheduleThread_(StgTSO *tso)
{
  // The thread goes at the *end* of the run-queue, to avoid possible
  // starvation of any threads already on the queue.
  APPEND_TO_RUN_QUEUE(tso);
}

void
scheduleThread(StgTSO* tso)
{
  ACQUIRE_LOCK(&sched_mutex);
  scheduleThread_(tso);
  RELEASE_LOCK(&sched_mutex);
}
#if defined(RTS_SUPPORTS_THREADS)
static Condition bound_cond_cache;
static int bound_cond_cache_full = 0;
#endif

SchedulerStatus
scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
                   Capability *initialCapability)
{
    // Precondition: sched_mutex must be held
    StgMainThread *m;

    m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
    m->tso = tso;
    m->ret = ret;
    m->prev = NULL;
    m->link = main_threads;

    if (main_threads != NULL) {
        main_threads->prev = m;
    }
    main_threads = m;

#if defined(RTS_SUPPORTS_THREADS)
    // Allocating a new condition for each thread is expensive, so we
    // cache one.  This is a pretty feeble hack, but it helps speed up
    // consecutive call-ins quite a bit.
    if (bound_cond_cache_full) {
        m->bound_thread_cond = bound_cond_cache;
        bound_cond_cache_full = 0;
    } else {
        initCondition(&m->bound_thread_cond);
    }
#endif

    /* Put the thread on the main-threads list prior to scheduling the TSO.
       Failure to do so introduces a race condition in the MT case (as
       identified by Wolfgang Thaller), whereby the new task/OS thread
       created by scheduleThread_() would complete prior to the thread
       that spawned it managed to put 'itself' on the main-threads list.
       The upshot of it all being that the worker thread wouldn't get to
       signal the completion of its work item for the main thread to
       see (==> it got stuck waiting.)    -- sof 6/02.
    */
    IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));

    APPEND_TO_RUN_QUEUE(tso);
    // NB. Don't call threadRunnable() here, because the thread is
    // bound and only runnable by *this* OS thread, so waking up other
    // workers will just slow things down.

    return waitThread_(m, initialCapability);
}
/* ---------------------------------------------------------------------------
 * Initialise the scheduler.  This resets all the queues - if the
 * queues contained any threads, they'll be garbage collected at the
 * ------------------------------------------------------------------------ */

#if defined(GRAN)
  for (i=0; i<MAX_PROC; i++) {
    run_queue_hds[i]      = END_TSO_QUEUE;
    run_queue_tls[i]      = END_TSO_QUEUE;
    blocked_queue_hds[i]  = END_TSO_QUEUE;
    blocked_queue_tls[i]  = END_TSO_QUEUE;
    ccalling_threadss[i]  = END_TSO_QUEUE;
    sleeping_queue        = END_TSO_QUEUE;
  }
#else
  run_queue_hd      = END_TSO_QUEUE;
  run_queue_tl      = END_TSO_QUEUE;
  blocked_queue_hd  = END_TSO_QUEUE;
  blocked_queue_tl  = END_TSO_QUEUE;
  sleeping_queue    = END_TSO_QUEUE;
#endif

  suspended_ccalling_threads  = END_TSO_QUEUE;

  main_threads = NULL;
  all_threads  = END_TSO_QUEUE;

  RtsFlags.ConcFlags.ctxtSwitchTicks =
      RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
#if defined(RTS_SUPPORTS_THREADS)
  /* Initialise the mutex and condition variables used by
   */
  initMutex(&sched_mutex);
  initMutex(&term_mutex);
#endif

  ACQUIRE_LOCK(&sched_mutex);

  /* A capability holds the state a native thread needs in
   * order to execute STG code.  At least one capability is
   * floating around (only SMP builds have more than one).
   */

#if defined(RTS_SUPPORTS_THREADS)
  /* start our haskell execution tasks */
  startTaskManager(0,taskStart);
#endif

#if /* defined(SMP) ||*/ defined(PAR)

  RELEASE_LOCK(&sched_mutex);

void
exitScheduler( void )
{
#if defined(RTS_SUPPORTS_THREADS)

  shutting_down_scheduler = rtsTrue;

/* ----------------------------------------------------------------------------
   Managing the per-task allocation areas.

   Each capability comes with an allocation area.  These are
   fixed-length block lists into which allocation can be done.

   ToDo: no support for two-space collection at the moment???
   ------------------------------------------------------------------------- */
static SchedulerStatus
waitThread_(StgMainThread* m, Capability *initialCapability)
{
  SchedulerStatus stat;

  // Precondition: sched_mutex must be held.
  IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));

#if defined(GRAN)
  /* GranSim specific init */
  CurrentTSO = m->tso;                // the TSO to run
  procStatus[MainProc] = Busy;        // status of main PE
  CurrentProc = MainProc;             // PE to run it on
  schedule(m,initialCapability);
#else
  schedule(m,initialCapability);
  ASSERT(m->stat != NoStatus);
#endif

  stat = m->stat;

#if defined(RTS_SUPPORTS_THREADS)
  // Free the condition variable, returning it to the cache if possible.
  if (!bound_cond_cache_full) {
      bound_cond_cache = m->bound_thread_cond;
      bound_cond_cache_full = 1;
  } else {
      closeCondition(&m->bound_thread_cond);
  }
#endif

  IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));

  // Postcondition: sched_mutex still held
  return stat;
}
2167 /* ---------------------------------------------------------------------------
2168 Where are the roots that we know about?
2170 - all the threads on the runnable queue
2171 - all the threads on the blocked queue
2172 - all the threads on the sleeping queue
2173 - all the threads currently executing a _ccall_GC
2174 - all the "main threads"
2176 ------------------------------------------------------------------------ */
2178 /* This has to be protected either by the scheduler monitor, or by the
2179 garbage collection monitor (probably the latter).
2184 GetRoots( evac_fn evac )
2189 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2190 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2191 evac((StgClosure **)&run_queue_hds[i]);
2192 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2193 evac((StgClosure **)&run_queue_tls[i]);
2195 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2196 evac((StgClosure **)&blocked_queue_hds[i]);
2197 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2198 evac((StgClosure **)&blocked_queue_tls[i]);
2199 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2200 evac((StgClosure **)&ccalling_threadss[i]);
2207 if (run_queue_hd != END_TSO_QUEUE) {
2208 ASSERT(run_queue_tl != END_TSO_QUEUE);
2209 evac((StgClosure **)&run_queue_hd);
2210 evac((StgClosure **)&run_queue_tl);
2213 if (blocked_queue_hd != END_TSO_QUEUE) {
2214 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2215 evac((StgClosure **)&blocked_queue_hd);
2216 evac((StgClosure **)&blocked_queue_tl);
2219 if (sleeping_queue != END_TSO_QUEUE) {
2220 evac((StgClosure **)&sleeping_queue);
2224 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2225 evac((StgClosure **)&suspended_ccalling_threads);
2228 #if defined(PAR) || defined(GRAN)
2229 markSparkQueue(evac);
2232 #if defined(RTS_USER_SIGNALS)
2233 // mark the signal handlers (signals should be already blocked)
2234 markSignalHandlers(evac);
2238 /* -----------------------------------------------------------------------------
2241 This is the interface to the garbage collector from Haskell land.
2242 We provide this so that external C code can allocate and garbage
2243 collect when called from Haskell via _ccall_GC.
2245 It might be useful to provide an interface whereby the programmer
2246 can specify more roots (ToDo).
2248 This needs to be protected by the GC condition variable above. KH.
2249 -------------------------------------------------------------------------- */
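/* An illustrative use of the extra-roots hook ('my_root' and
   'my_get_roots' are hypothetical client-side names, not part of the
   RTS): external C code holding on to a Haskell closure keeps it alive
   across a collection by evacuating it from its own root function.

       static StgClosure *my_root = NULL;

       static void
       my_get_roots (evac_fn evac)
       {
           if (my_root != NULL) {
               evac(&my_root);
           }
       }

       // then, instead of plain performGC():
       // performGCWithRoots(my_get_roots);
*/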
2251 static void (*extra_roots)(evac_fn);
2256 /* Grabs sched_mutex itself, so the caller must not already hold it */
2257 ACQUIRE_LOCK(&sched_mutex);
2258 GarbageCollect(GetRoots,rtsFalse);
2259 RELEASE_LOCK(&sched_mutex);
2263 performMajorGC(void)
2265 ACQUIRE_LOCK(&sched_mutex);
2266 GarbageCollect(GetRoots,rtsTrue);
2267 RELEASE_LOCK(&sched_mutex);
2271 AllRoots(evac_fn evac)
2273 GetRoots(evac); // the scheduler's roots
2274 extra_roots(evac); // the user's roots
2278 performGCWithRoots(void (*get_roots)(evac_fn))
2280 ACQUIRE_LOCK(&sched_mutex);
2281 extra_roots = get_roots;
2282 GarbageCollect(AllRoots,rtsFalse);
2283 RELEASE_LOCK(&sched_mutex);
2286 /* -----------------------------------------------------------------------------
2289 If the thread has reached its maximum stack size, then raise the
2290 StackOverflow exception in the offending thread. Otherwise
2291 relocate the TSO into a larger chunk of memory and adjust its stack size appropriately.
2293 -------------------------------------------------------------------------- */
2296 threadStackOverflow(StgTSO *tso)
2298 nat new_stack_size, new_tso_size, stack_words;
2302 IF_DEBUG(sanity,checkTSO(tso));
2303 if (tso->stack_size >= tso->max_stack_size) {
2306 debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2307 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2308 /* If we're debugging, just print out the top of the stack */
2309 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2312 /* Send this thread the StackOverflow exception */
2313 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2317 /* Try to double the current stack size. If that takes us over the
2318 * maximum stack size for this thread, then use the maximum instead.
2319 * Finally round up so the TSO ends up as a whole number of blocks.
2321 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2322 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2323 TSO_STRUCT_SIZE)/sizeof(W_);
2324 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2325 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
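    /* Worked example (illustrative; assumes 4-byte words, 4Kbyte
     * blocks, a TSO header of well under 192 bytes, and that
     * round_to_mblocks leaves sub-megablock sizes alone):
     * a 1000-word stack doubles to 2000 words = 8000 bytes;
     * adding the header and rounding up to whole blocks gives
     * 8192 bytes, i.e. new_tso_size = 2048 words, so the new
     * stack gets 2048 - TSO_STRUCT_SIZEW words, slightly more
     * than the plain doubling asked for. */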
2327 IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2329 dest = (StgTSO *)allocate(new_tso_size);
2330 TICK_ALLOC_TSO(new_stack_size,0);
2332 /* copy the TSO block and the old stack into the new area */
2333 memcpy(dest,tso,TSO_STRUCT_SIZE);
2334 stack_words = tso->stack + tso->stack_size - tso->sp;
2335 new_sp = (P_)dest + new_tso_size - stack_words;
2336 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2338 /* relocate the stack pointers... */
2340 dest->stack_size = new_stack_size;
2342 /* Mark the old TSO as relocated. We have to check for relocated
2343 * TSOs in the garbage collector and any primops that deal with TSOs.
2345 * It's important to set the sp value to just beyond the end
3346 * of the stack, so we don't attempt to scavenge any part of the dead TSO's stack.
2349 tso->what_next = ThreadRelocated;
2351 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2352 tso->why_blocked = NotBlocked;
2353 dest->mut_link = NULL;
2355 IF_PAR_DEBUG(verbose,
2356 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2357 tso->id, tso, tso->stack_size);
2358 /* If we're debugging, just print out the top of the stack */
2359 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2362 IF_DEBUG(sanity,checkTSO(tso));
2364 IF_DEBUG(scheduler,printTSO(dest));
2370 /* ---------------------------------------------------------------------------
2371 Wake up a queue that was blocked on some resource.
2372 ------------------------------------------------------------------------ */
2376 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2381 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2383 /* write RESUME events to log file and
2384 update blocked and fetch time (depending on type of the orig closure) */
2385 if (RtsFlags.ParFlags.ParStats.Full) {
2386 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2387 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2388 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2389 if (EMPTY_RUN_QUEUE())
2390 emitSchedule = rtsTrue;
2392 switch (get_itbl(node)->type) {
2394 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2399 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2406 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2413 static StgBlockingQueueElement *
2414 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2417 PEs node_loc, tso_loc;
2419 node_loc = where_is(node); // should be lifted out of loop
2420 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2421 tso_loc = where_is((StgClosure *)tso);
2422 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2423 /* !fake_fetch => "TSO is on CurrentProc" is the same condition as IS_LOCAL_TO */
2424 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2425 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2426 // insertThread(tso, node_loc);
2427 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2429 tso, node, (rtsSpark*)NULL);
2430 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2433 } else { // TSO is remote (actually should be FMBQ)
2434 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2435 RtsFlags.GranFlags.Costs.gunblocktime +
2436 RtsFlags.GranFlags.Costs.latency;
2437 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2439 tso, node, (rtsSpark*)NULL);
2440 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2443 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2445 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2446 (node_loc==tso_loc ? "Local" : "Global"),
2447 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2448 tso->block_info.closure = NULL;
2449 IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n",
2453 static StgBlockingQueueElement *
2454 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2456 StgBlockingQueueElement *next;
2458 switch (get_itbl(bqe)->type) {
2460 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2461 /* if it's a TSO just push it onto the run_queue */
2463 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2464 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
2466 unblockCount(bqe, node);
2467 /* reset blocking status after dumping event */
2468 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2472 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2474 bqe->link = (StgBlockingQueueElement *)PendingFetches;
2475 PendingFetches = (StgBlockedFetch *)bqe;
2479 /* can ignore this case in a non-debugging setup;
2480 see comments on RBHSave closures above */
2482 /* check that the closure is an RBHSave closure */
2483 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2484 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2485 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2489 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2490 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2494 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
2498 #else /* !GRAN && !PAR */
2500 unblockOneLocked(StgTSO *tso)
2504 ASSERT(get_itbl(tso)->type == TSO);
2505 ASSERT(tso->why_blocked != NotBlocked);
2506 tso->why_blocked = NotBlocked;
2508 tso->link = END_TSO_QUEUE;
2509 APPEND_TO_RUN_QUEUE(tso);
2511 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
2516 #if defined(GRAN) || defined(PAR)
2517 INLINE_ME StgBlockingQueueElement *
2518 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2520 ACQUIRE_LOCK(&sched_mutex);
2521 bqe = unblockOneLocked(bqe, node);
2522 RELEASE_LOCK(&sched_mutex);
2527 unblockOne(StgTSO *tso)
2529 ACQUIRE_LOCK(&sched_mutex);
2530 tso = unblockOneLocked(tso);
2531 RELEASE_LOCK(&sched_mutex);
2538 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2540 StgBlockingQueueElement *bqe;
2545 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
2546 node, CurrentProc, CurrentTime[CurrentProc],
2547 CurrentTSO->id, CurrentTSO));
2549 node_loc = where_is(node);
2551 ASSERT(q == END_BQ_QUEUE ||
2552 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2553 get_itbl(q)->type == CONSTR); // closure (type constructor)
2554 ASSERT(is_unique(node));
2556 /* FAKE FETCH: magically copy the node to the tso's proc;
2557 no Fetch necessary because in reality the node should not have been
2558 moved to the other PE in the first place
2560 if (CurrentProc!=node_loc) {
2562 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
2563 node, node_loc, CurrentProc, CurrentTSO->id,
2564 // CurrentTSO, where_is(CurrentTSO),
2565 node->header.gran.procs));
2566 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2568 debugBelch("## new bitmask of node %p is %#x\n",
2569 node, node->header.gran.procs));
2570 if (RtsFlags.GranFlags.GranSimStats.Global) {
2571 globalGranStats.tot_fake_fetches++;
2576 // ToDo: check: ASSERT(CurrentProc==node_loc);
2577 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2580 bqe points to the current element in the queue
2581 next points to the next element in the queue
2583 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2584 //tso_loc = where_is(tso);
2586 bqe = unblockOneLocked(bqe, node);
2589 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2590 the closure to make room for the anchor of the BQ */
2591 if (bqe!=END_BQ_QUEUE) {
2592 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2594 ASSERT((info_ptr==&stg_RBH_Save_0_info) ||
2595 (info_ptr==&stg_RBH_Save_1_info) ||
2596 (info_ptr==&stg_RBH_Save_2_info));
2598 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2599 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2600 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2603 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
2604 node, info_type(node)));
2607 /* statistics gathering */
2608 if (RtsFlags.GranFlags.GranSimStats.Global) {
2609 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2610 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2611 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2612 globalGranStats.tot_awbq++; // total no. of bqs awakened
2615 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
2616 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2620 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2622 StgBlockingQueueElement *bqe;
2624 ACQUIRE_LOCK(&sched_mutex);
2626 IF_PAR_DEBUG(verbose,
2627 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
2631 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2632 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so let's just return. RFP (BUG?)\n"));
2637 ASSERT(q == END_BQ_QUEUE ||
2638 get_itbl(q)->type == TSO ||
2639 get_itbl(q)->type == BLOCKED_FETCH ||
2640 get_itbl(q)->type == CONSTR);
2643 while (get_itbl(bqe)->type==TSO ||
2644 get_itbl(bqe)->type==BLOCKED_FETCH) {
2645 bqe = unblockOneLocked(bqe, node);
2647 RELEASE_LOCK(&sched_mutex);
2650 #else /* !GRAN && !PAR */
2653 awakenBlockedQueueNoLock(StgTSO *tso)
2655 while (tso != END_TSO_QUEUE) {
2656 tso = unblockOneLocked(tso);
2661 awakenBlockedQueue(StgTSO *tso)
2663 ACQUIRE_LOCK(&sched_mutex);
2664 while (tso != END_TSO_QUEUE) {
2665 tso = unblockOneLocked(tso);
2667 RELEASE_LOCK(&sched_mutex);
2671 /* ---------------------------------------------------------------------------
2673 - usually called inside a signal handler so it mustn't do anything fancy.
2674 ------------------------------------------------------------------------ */
2677 interruptStgRts(void)
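/* An illustrative use (the handler name is hypothetical): wiring the
   flag to SIGINT so that ^C interrupts the running Haskell computation.
   The handler does nothing but set the flag; the scheduler acts on it
   at the next opportunity.

       #include <signal.h>

       static void
       int_handler (int sig STG_UNUSED)
       {
           interruptStgRts();
       }

       // during RTS startup:
       // signal(SIGINT, int_handler);
*/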
2683 /* -----------------------------------------------------------------------------
2686 This is for use when we raise an exception in another thread, which may be blocked.
2688 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2689 -------------------------------------------------------------------------- */
2691 #if defined(GRAN) || defined(PAR)
2693 NB: only the type of the blocking queue is different in GranSim and GUM;
2694 the operations on the queue elements are the same.
2695 Long live polymorphism!
2697 Locks: sched_mutex is held upon entry and exit.
2701 unblockThread(StgTSO *tso)
2703 StgBlockingQueueElement *t, **last;
2705 switch (tso->why_blocked) {
2708 return; /* not blocked */
2711 // Be careful: nothing to do here! We tell the scheduler that the thread
2712 // is runnable and we leave it to the stack-walking code to abort the
2713 // transaction while unwinding the stack. We should perhaps have a debugging
2714 // test to make sure that this really happens and that the 'zombie' transaction
2715 // does not get committed.
2719 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2721 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2722 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2724 last = (StgBlockingQueueElement **)&mvar->head;
2725 for (t = (StgBlockingQueueElement *)mvar->head;
2727 last = &t->link, last_tso = t, t = t->link) {
2728 if (t == (StgBlockingQueueElement *)tso) {
2729 *last = (StgBlockingQueueElement *)tso->link;
2730 if (mvar->tail == tso) {
2731 mvar->tail = (StgTSO *)last_tso;
2736 barf("unblockThread (MVAR): TSO not found");
2739 case BlockedOnBlackHole:
2740 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2742 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2744 last = &bq->blocking_queue;
2745 for (t = bq->blocking_queue;
2747 last = &t->link, t = t->link) {
2748 if (t == (StgBlockingQueueElement *)tso) {
2749 *last = (StgBlockingQueueElement *)tso->link;
2753 barf("unblockThread (BLACKHOLE): TSO not found");
2756 case BlockedOnException:
2758 StgTSO *target = tso->block_info.tso;
2760 ASSERT(get_itbl(target)->type == TSO);
2762 if (target->what_next == ThreadRelocated) {
2763 target = target->link;
2764 ASSERT(get_itbl(target)->type == TSO);
2767 ASSERT(target->blocked_exceptions != NULL);
2769 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2770 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2772 last = &t->link, t = t->link) {
2773 ASSERT(get_itbl(t)->type == TSO);
2774 if (t == (StgBlockingQueueElement *)tso) {
2775 *last = (StgBlockingQueueElement *)tso->link;
2779 barf("unblockThread (Exception): TSO not found");
2783 case BlockedOnWrite:
2784 #if defined(mingw32_HOST_OS)
2785 case BlockedOnDoProc:
2788 /* take TSO off blocked_queue */
2789 StgBlockingQueueElement *prev = NULL;
2790 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2791 prev = t, t = t->link) {
2792 if (t == (StgBlockingQueueElement *)tso) {
2794 blocked_queue_hd = (StgTSO *)t->link;
2795 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2796 blocked_queue_tl = END_TSO_QUEUE;
2799 prev->link = t->link;
2800 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2801 blocked_queue_tl = (StgTSO *)prev;
2807 barf("unblockThread (I/O): TSO not found");
2810 case BlockedOnDelay:
2812 /* take TSO off sleeping_queue */
2813 StgBlockingQueueElement *prev = NULL;
2814 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
2815 prev = t, t = t->link) {
2816 if (t == (StgBlockingQueueElement *)tso) {
2818 sleeping_queue = (StgTSO *)t->link;
2820 prev->link = t->link;
2825 barf("unblockThread (delay): TSO not found");
2829 barf("unblockThread");
2833 tso->link = END_TSO_QUEUE;
2834 tso->why_blocked = NotBlocked;
2835 tso->block_info.closure = NULL;
2836 PUSH_ON_RUN_QUEUE(tso);
2840 unblockThread(StgTSO *tso)
2844 /* To avoid locking unnecessarily. */
2845 if (tso->why_blocked == NotBlocked) {
2849 switch (tso->why_blocked) {
2852 // Be careful: nothing to do here! We tell the scheduler that the thread
2853 // is runnable and we leave it to the stack-walking code to abort the
2854 // transaction while unwinding the stack. We should perhaps have a debugging
2855 // test to make sure that this really happens and that the 'zombie' transaction
2856 // does not get committed.
2860 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2862 StgTSO *last_tso = END_TSO_QUEUE;
2863 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2866 for (t = mvar->head; t != END_TSO_QUEUE;
2867 last = &t->link, last_tso = t, t = t->link) {
2870 if (mvar->tail == tso) {
2871 mvar->tail = last_tso;
2876 barf("unblockThread (MVAR): TSO not found");
2879 case BlockedOnBlackHole:
2880 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2882 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2884 last = &bq->blocking_queue;
2885 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2886 last = &t->link, t = t->link) {
2892 barf("unblockThread (BLACKHOLE): TSO not found");
2895 case BlockedOnException:
2897 StgTSO *target = tso->block_info.tso;
2899 ASSERT(get_itbl(target)->type == TSO);
2901 while (target->what_next == ThreadRelocated) {
2902 target = target->link;
2903 ASSERT(get_itbl(target)->type == TSO);
2906 ASSERT(target->blocked_exceptions != NULL);
2908 last = &target->blocked_exceptions;
2909 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2910 last = &t->link, t = t->link) {
2911 ASSERT(get_itbl(t)->type == TSO);
2917 barf("unblockThread (Exception): TSO not found");
2921 case BlockedOnWrite:
2922 #if defined(mingw32_HOST_OS)
2923 case BlockedOnDoProc:
2926 StgTSO *prev = NULL;
2927 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2928 prev = t, t = t->link) {
2931 blocked_queue_hd = t->link;
2932 if (blocked_queue_tl == t) {
2933 blocked_queue_tl = END_TSO_QUEUE;
2936 prev->link = t->link;
2937 if (blocked_queue_tl == t) {
2938 blocked_queue_tl = prev;
2944 barf("unblockThread (I/O): TSO not found");
2947 case BlockedOnDelay:
2949 StgTSO *prev = NULL;
2950 for (t = sleeping_queue; t != END_TSO_QUEUE;
2951 prev = t, t = t->link) {
2954 sleeping_queue = t->link;
2956 prev->link = t->link;
2961 barf("unblockThread (delay): TSO not found");
2965 barf("unblockThread");
2969 tso->link = END_TSO_QUEUE;
2970 tso->why_blocked = NotBlocked;
2971 tso->block_info.closure = NULL;
2972 APPEND_TO_RUN_QUEUE(tso);
2976 /* -----------------------------------------------------------------------------
2979 * The following function implements the magic for raising an
2980 * asynchronous exception in an existing thread.
2982 * We first remove the thread from any queue on which it might be
2983 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
2985 * We strip the stack down to the innermost CATCH_FRAME, building
2986 * thunks in the heap for all the active computations, so they can
2987 * be restarted if necessary. When we reach a CATCH_FRAME, we build
2988 * an application of the handler to the exception, and push it on
2989 * the top of the stack.
2991 * How exactly do we save all the active computations? We create an
2992 * AP_STACK for every UpdateFrame on the stack. Entering one of these
2993 * AP_STACKs pushes everything from the corresponding update frame
2994 * upwards onto the stack. (Actually, it pushes everything up to the
2995 * next update frame plus a pointer to the next AP_STACK object.
2996 * Entering the next AP_STACK object pushes more onto the stack until we
2997 * reach the last AP_STACK object - at which point the stack should look
2998 * exactly as it did when we killed the TSO and we can continue
2999 * execution by entering the closure on top of the stack.)
3001 * We can also kill a thread entirely - this happens if either (a) the
3002 * exception passed to raiseAsync is NULL, or (b) there's no
3003 * CATCH_FRAME on the stack. In either case, we strip the entire
3004 * stack and replace the thread with a zombie.
3006 * Locks: sched_mutex held upon entry and exit.
3008 * -------------------------------------------------------------------------- */
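/* A small illustration of the freezing step (hypothetical stack with a
   single update frame; the stack grows downwards):

       before raiseAsync                after
       sp -> [ closure c      ]   sp -> [ stg_enter_info    ]
             [ stack chunk S  ]         [ raise(exception)  ]
             [ UPDATE_FRAME u ]         [ CATCH_FRAME       ] ...
             [ CATCH_FRAME    ] ...

   The updatee of u is overwritten with an indirection to a new AP_STACK
   capturing c and S, so any threads blocked on it are woken up and the
   suspended computation can be rebuilt on demand. */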
3011 deleteThread(StgTSO *tso)
3013 if (tso->why_blocked != BlockedOnCCall &&
3014 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3015 raiseAsync(tso,NULL);
3019 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3021 deleteThreadImmediately(StgTSO *tso)
3022 { // for forkProcess only:
3023 // delete thread without giving it a chance to catch the KillThread exception
3025 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3029 if (tso->why_blocked != BlockedOnCCall &&
3030 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3034 tso->what_next = ThreadKilled;
3039 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3041 /* When raising async exceptions from contexts where sched_mutex isn't held,
3042 use raiseAsyncWithLock(). */
3043 ACQUIRE_LOCK(&sched_mutex);
3044 raiseAsync(tso,exception);
3045 RELEASE_LOCK(&sched_mutex);
3049 raiseAsync(StgTSO *tso, StgClosure *exception)
3051 raiseAsync_(tso, exception, rtsFalse);
3055 raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
3057 StgRetInfoTable *info;
3060 // Thread already dead?
3061 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3066 sched_belch("raising exception in thread %ld.", (long)tso->id));
3068 // Remove it from any blocking queues
3073 // The stack freezing code assumes there's a closure pointer on
3074 // the top of the stack, so we have to arrange that this is the case...
3076 if (sp[0] == (W_)&stg_enter_info) {
3080 sp[0] = (W_)&stg_dummy_ret_closure;
3086 // 1. Let the top of the stack be the "current closure"
3088 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3091 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3092 // current closure applied to the chunk of stack up to (but not
3093 // including) the update frame. This closure becomes the "current
3094 // closure". Go back to step 2.
3096 // 4. If it's a CATCH_FRAME, then leave the exception handler on
3097 // top of the stack applied to the exception.
3099 // 5. If it's a STOP_FRAME, then kill the thread.
3101 // NB: if we pass an ATOMICALLY_FRAME then abort the associated transaction.
3108 info = get_ret_itbl((StgClosure *)frame);
3110 while (info->i.type != UPDATE_FRAME
3111 && (info->i.type != CATCH_FRAME || exception == NULL)
3112 && info->i.type != STOP_FRAME
3113 && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3115 if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3116 // IF we find an ATOMICALLY_FRAME then we abort the
3117 // current transaction and propagate the exception. In
3118 // this case (unlike ordinary exceptions) we do not care
3119 // whether the transaction is valid or not because its
3120 // possible validity cannot have caused the exception
3121 // and will not be visible after the abort.
3123 debugBelch("Found atomically block delivering async exception\n"));
3124 stmAbortTransaction(tso -> trec);
3125 tso -> trec = stmGetEnclosingTRec(tso -> trec);
3127 frame += stack_frame_sizeW((StgClosure *)frame);
3128 info = get_ret_itbl((StgClosure *)frame);
3131 switch (info->i.type) {
3133 case ATOMICALLY_FRAME:
3134 ASSERT(stop_at_atomically);
3135 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3136 stmCondemnTransaction(tso -> trec);
3140 // R1 is not a register: the return convention for IO in
3141 // this case puts the return value on the stack, so we
3142 // need to set up the stack to return to the atomically
3143 // frame properly...
3144 tso->sp = frame - 2;
3145 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3146 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3148 tso->what_next = ThreadRunGHC;
3152 // If we find a CATCH_FRAME, and we've got an exception to raise,
3153 // then build the THUNK raise(exception), and leave it on
3154 // top of the CATCH_FRAME ready to enter.
3158 StgCatchFrame *cf = (StgCatchFrame *)frame;
3162 // we've got an exception to raise, so let's pass it to the
3163 // handler in this frame.
3165 raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3166 TICK_ALLOC_SE_THK(1,0);
3167 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3168 raise->payload[0] = exception;
3170 // throw away the stack from Sp up to the CATCH_FRAME.
3174 /* Ensure that async exceptions are blocked now, so we don't get
3175 * a surprise exception before we get around to executing the
3178 if (tso->blocked_exceptions == NULL) {
3179 tso->blocked_exceptions = END_TSO_QUEUE;
3182 /* Put the newly-built THUNK on top of the stack, ready to execute
3183 * when the thread restarts.
3186 sp[-1] = (W_)&stg_enter_info;
3188 tso->what_next = ThreadRunGHC;
3189 IF_DEBUG(sanity, checkTSO(tso));
3198 // First build an AP_STACK consisting of the stack chunk above the
3199 // current update frame, with the top word on the stack as the fun field.
3202 words = frame - sp - 1;
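	    // (the -1 is because sp[0] becomes ap->fun below, rather
	    // than part of the payload)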
3203 ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3206 ap->fun = (StgClosure *)sp[0];
3208 for(i=0; i < (nat)words; ++i) {
3209 ap->payload[i] = (StgClosure *)*sp++;
3212 SET_HDR(ap,&stg_AP_STACK_info,
3213 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
3214 TICK_ALLOC_UP_THK(words+1,0);
3217 debugBelch("sched: Updating ");
3218 printPtr((P_)((StgUpdateFrame *)frame)->updatee);
3219 debugBelch(" with ");
3220 printObj((StgClosure *)ap);
3223 // Replace the updatee with an indirection - happily
3224 // this will also wake up any threads currently
3225 // waiting on the result.
3227 // Warning: if we're in a loop, more than one update frame on
3228 // the stack may point to the same object. Be careful not to
3229 // overwrite an IND_OLDGEN in this case, because we'll screw
3230 // up the mutable lists. To be on the safe side, don't
3231 // overwrite any kind of indirection at all. See also
3232 // threadSqueezeStack in GC.c, where we have to make a similar check.
3235 if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3236 // revert the black hole
3237 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3240 sp += sizeofW(StgUpdateFrame) - 1;
3241 sp[0] = (W_)ap; // push onto stack
3246 // We've stripped the entire stack, the thread is now dead.
3247 sp += sizeofW(StgStopFrame);
3248 tso->what_next = ThreadKilled;
3259 /* -----------------------------------------------------------------------------
3260 raiseExceptionHelper
3262 This function is called by the raise# primitive, just so that we can
3263 move some of the tricky bits of raising an exception from C-- into
3264 C. Who knows, it might be a useful reusable thing here too.
3265 -------------------------------------------------------------------------- */
3268 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3270 StgClosure *raise_closure = NULL;
3272 StgRetInfoTable *info;
3274 // This closure represents the expression 'raise# E' where E
3275 // is the exception raised. It is used to overwrite all the
3276 // thunks which are currently under evaluation.
3280 // LDV profiling: stg_raise_info has THUNK as its closure
3281 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3283 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
3283 // 1 does not cause any problem unless profiling is performed.
3284 // However, when LDV profiling goes on, we need to linearly scan
3285 // small object pool, where raise_closure is stored, so we should
3286 // use MIN_UPD_SIZE.
3288 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3289 // sizeofW(StgClosure)+1);
3293 // Walk up the stack, looking for the catch frame. On the way,
3294 // we update any closures pointed to from update frames with the
3295 // raise closure that we just built.
3299 info = get_ret_itbl((StgClosure *)p);
3300 next = p + stack_frame_sizeW((StgClosure *)p);
3301 switch (info->i.type) {
3304 // Only create raise_closure if we need to.
3305 if (raise_closure == NULL) {
3307 (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3308 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3309 raise_closure->payload[0] = exception;
3311 UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3315 case ATOMICALLY_FRAME:
3316 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3318 return ATOMICALLY_FRAME;
3324 case CATCH_STM_FRAME:
3325 IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3327 return CATCH_STM_FRAME;
3333 case CATCH_RETRY_FRAME:
3342 /* -----------------------------------------------------------------------------
3343 findRetryFrameHelper
3345 This function is called by the retry# primitive. It traverses the stack
3346 leaving tso->sp referring to the frame which should handle the retry.
3348 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3349 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
3351 We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3352 despite the similar implementation.
3354 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3355 not be created within memory transactions.
3356 -------------------------------------------------------------------------- */
3359 findRetryFrameHelper (StgTSO *tso)
3362 StgRetInfoTable *info;
3366 info = get_ret_itbl((StgClosure *)p);
3367 next = p + stack_frame_sizeW((StgClosure *)p);
3368 switch (info->i.type) {
3370 case ATOMICALLY_FRAME:
3371 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retry\n", p));
3373 return ATOMICALLY_FRAME;
3375 case CATCH_RETRY_FRAME:
3376 IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retry\n", p));
3378 return CATCH_RETRY_FRAME;
3380 case CATCH_STM_FRAME:
3382 ASSERT(info->i.type != CATCH_FRAME);
3383 ASSERT(info->i.type != STOP_FRAME);
3390 /* -----------------------------------------------------------------------------
3391 resurrectThreads is called after garbage collection on the list of
3392 threads found to be garbage. Each of these threads will be woken
3393 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3394 on an MVar, or NonTermination if the thread was blocked on a Black Hole.
3397 Locks: sched_mutex isn't held upon entry or exit.
3398 -------------------------------------------------------------------------- */
3401 resurrectThreads( StgTSO *threads )
3405 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3406 next = tso->global_link;
3407 tso->global_link = all_threads;
3409 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3411 switch (tso->why_blocked) {
3413 case BlockedOnException:
3414 /* Called by GC - sched_mutex lock is currently held. */
3415 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3417 case BlockedOnBlackHole:
3418 raiseAsync(tso,(StgClosure *)NonTermination_closure);
3421 raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
3424 /* This might happen if the thread was blocked on a black hole
3425 * belonging to a thread that we've just woken up (raiseAsync
3426 * can wake up threads, remember...).
3430 barf("resurrectThreads: thread blocked in a strange way");
3435 /* ----------------------------------------------------------------------------
3436 * Debugging: why is a thread blocked
3437 * [Also provides useful information when debugging threaded programs
3438 * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3439 ------------------------------------------------------------------------- */
3443 printThreadBlockage(StgTSO *tso)
3445 switch (tso->why_blocked) {
3447 debugBelch("is blocked on read from fd %d", tso->block_info.fd);
3449 case BlockedOnWrite:
3450 debugBelch("is blocked on write to fd %d", tso->block_info.fd);
3452 #if defined(mingw32_HOST_OS)
3453 case BlockedOnDoProc:
3454 debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3457 case BlockedOnDelay:
3458 debugBelch("is blocked until %d", tso->block_info.target);
3461 debugBelch("is blocked on an MVar");
3463 case BlockedOnException:
3464 debugBelch("is blocked on delivering an exception to thread %d",
3465 tso->block_info.tso->id);
3467 case BlockedOnBlackHole:
3468 debugBelch("is blocked on a black hole");
3471 debugBelch("is not blocked");
3475 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
3476 tso->block_info.closure, info_type(tso->block_info.closure));
3478 case BlockedOnGA_NoSend:
3479 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
3480 tso->block_info.closure, info_type(tso->block_info.closure));
3483 case BlockedOnCCall:
3484 debugBelch("is blocked on an external call");
3486 case BlockedOnCCall_NoUnblockExc:
3487 debugBelch("is blocked on an external call (exceptions were already blocked)");
3490 debugBelch("is blocked on an STM operation");
3493 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
3494 tso->why_blocked, tso->id, tso);
3500 printThreadStatus(StgTSO *tso)
3502 switch (tso->what_next) {
3504 debugBelch("has been killed");
3506 case ThreadComplete:
3507 debugBelch("has completed");
3510 printThreadBlockage(tso);
3515 printAllThreads(void)
3520 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3521 ullong_format_string(TIME_ON_PROC(CurrentProc),
3522 time_string, rtsFalse/*no commas!*/);
3524 debugBelch("all threads at [%s]:\n", time_string);
3526 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3527 ullong_format_string(CURRENT_TIME,
3528 time_string, rtsFalse/*no commas!*/);
3530 debugBelch("all threads at [%s]:\n", time_string);
3532 debugBelch("all threads:\n");
3535 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3536 debugBelch("\tthread %d @ %p ", t->id, (void *)t);
3539 void *label = lookupThreadLabel(t->id);
3540 if (label) debugBelch("[\"%s\"] ",(char *)label);
3543 printThreadStatus(t);
3551 Print a whole blocking queue attached to node (debugging only).
3555 print_bq (StgClosure *node)
3557 StgBlockingQueueElement *bqe;
3561 debugBelch("## BQ of closure %p (%s): ",
3562 node, info_type(node));
3564 /* should cover all closures that may have a blocking queue */
3565 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3566 get_itbl(node)->type == FETCH_ME_BQ ||
3567 get_itbl(node)->type == RBH ||
3568 get_itbl(node)->type == MVAR);
3570 ASSERT(node!=(StgClosure*)NULL); // sanity check
3572 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3576 Print a whole blocking queue starting with the element bqe.
3579 print_bqe (StgBlockingQueueElement *bqe)
3584 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3586 for (end = (bqe==END_BQ_QUEUE);
3587 !end; // iterate until bqe points to a CONSTR
3588 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
3589 bqe = end ? END_BQ_QUEUE : bqe->link) {
3590 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3591 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3592 /* types of closures that may appear in a blocking queue */
3593 ASSERT(get_itbl(bqe)->type == TSO ||
3594 get_itbl(bqe)->type == BLOCKED_FETCH ||
3595 get_itbl(bqe)->type == CONSTR);
3596 /* only BQs of an RBH end with an RBH_Save closure */
3597 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3599 switch (get_itbl(bqe)->type) {
3601 debugBelch(" TSO %u (%p),",
3602 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3605 debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
3606 ((StgBlockedFetch *)bqe)->node,
3607 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3608 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3609 ((StgBlockedFetch *)bqe)->ga.weight);
3612 debugBelch(" %s (IP %p),",
3613 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3614 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3615 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3616 "RBH_Save_?"), get_itbl(bqe));
3619 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3620 info_type((StgClosure *)bqe)); // , node, info_type(node));
3626 # elif defined(GRAN)
3628 print_bq (StgClosure *node)
3630 StgBlockingQueueElement *bqe;
3631 PEs node_loc, tso_loc;
3634 /* should cover all closures that may have a blocking queue */
3635 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3636 get_itbl(node)->type == FETCH_ME_BQ ||
3637 get_itbl(node)->type == RBH);
3639 ASSERT(node!=(StgClosure*)NULL); // sanity check
3640 node_loc = where_is(node);
3642 debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
3643 node, info_type(node), node_loc);
3646 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3648 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3649 !end; // iterate until bqe points to a CONSTR
3650 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3651 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3652 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3653 /* types of closures that may appear in a blocking queue */
3654 ASSERT(get_itbl(bqe)->type == TSO ||
3655 get_itbl(bqe)->type == CONSTR);
3656 /* only BQs of an RBH end with an RBH_Save closure */
3657 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3659 tso_loc = where_is((StgClosure *)bqe);
3660 switch (get_itbl(bqe)->type) {
3662 debugBelch(" TSO %d (%p) on [PE %d],",
3663 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3666 debugBelch(" %s (IP %p),",
3667 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3668 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3669 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3670 "RBH_Save_?"), get_itbl(bqe));
3673 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3674 info_type((StgClosure *)bqe), node, info_type(node));
3682 Nice and easy: only TSOs on the blocking queue
3685 print_bq (StgClosure *node)
3689 ASSERT(node!=(StgClosure*)NULL); // sanity check
3690 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3691 tso != END_TSO_QUEUE;
3693 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3694 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3695 debugBelch(" TSO %d (%p),", tso->id, tso);
3708 for (i=0, tso=run_queue_hd;
3709 tso != END_TSO_QUEUE;
3718 sched_belch(char *s, ...)
3722 #ifdef RTS_SUPPORTS_THREADS
3723 debugBelch("sched (task %p): ", osThreadId());
3727 debugBelch("sched: ");