1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 1998-2004
7 * Different GHC ways use this scheduler quite differently (see comments below)
8 * Here is the global picture:
10 * WAY Name CPP flag What's it for
11 * --------------------------------------
12 * mp GUM PAR Parallel execution on a distrib. memory machine
13 * s SMP SMP Parallel execution on a shared memory machine
14 * mg GranSim GRAN Simulation of parallel execution
15 * md GUM/GdH DIST Distributed execution (based on GUM)
17 * --------------------------------------------------------------------------*/
20 * Version with support for distributed memory parallelism aka GUM (WAY=mp):
22 The main scheduling loop in GUM iterates until a finish message is received.
23 In that case a global flag @receivedFinish@ is set and this instance of
24 the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25 for the handling of incoming messages, such as PP_FINISH.
26 Note that in the parallel case we have a system manager that coordinates
27 * different PEs, each of which is running one instance of the RTS.
28 See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29 From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
31 * Version with support for simulating parallel execution aka GranSim (WAY=mg):
33 The main scheduling code in GranSim is quite different from that in std
34 (concurrent) Haskell: while concurrent Haskell just iterates over the
35 threads in the runnable queue, GranSim is event driven, i.e. it iterates
36 over the events in the global event queue. -- HWL
39 #include "PosixSource.h"
44 #include "BlockAlloc.h"
48 #define COMPILING_SCHEDULER
50 #include "StgMiscClosures.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
60 #include "ThreadLabels.h"
61 #include "LdvProfile.h"
64 #include "Proftimer.h"
67 #if defined(GRAN) || defined(PAR)
68 # include "GranSimRts.h"
70 # include "ParallelRts.h"
71 # include "Parallel.h"
72 # include "ParallelDebug.h"
77 #include "Capability.h"
78 #include "OSThreads.h"
81 #ifdef HAVE_SYS_TYPES_H
82 #include <sys/types.h>
97 #define USED_IN_THREADED_RTS
99 #define USED_IN_THREADED_RTS STG_UNUSED
102 #ifdef RTS_SUPPORTS_THREADS
103 #define USED_WHEN_RTS_SUPPORTS_THREADS
105 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
108 /* Main thread queue.
109 * Locks required: sched_mutex.
111 StgMainThread *main_threads = NULL;
114 * Locks required: sched_mutex.
118 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
119 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
122 In GranSim we have a runnable and a blocked queue for each processor.
123 In order to minimise code changes new arrays run_queue_hds/tls
124 are created. run_queue_hd is then a short cut (macro) for
125 run_queue_hds[CurrentProc] (see GranSim.h).
/* GranSim: one runnable queue and one blocked queue per simulated processor;
 * run_queue_hd is then a macro short-cut for run_queue_hds[CurrentProc]
 * (see GranSim.h, per the note above). */
128 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
129 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
/* per-processor list of threads in a C call -- presumably mirrors
 * suspended_ccalling_threads; TODO confirm (uses not visible here) */
130 StgTSO *ccalling_threadss[MAX_PROC];
131 /* We use the same global list of threads (all_threads) in GranSim as in
132 the std RTS (i.e. we are cheating). However, we don't use this list in
133 the GranSim specific code at the moment (so we are only potentially
/* Standard (non-GranSim) scheduler queues.  Locks required: sched_mutex. */
138 StgTSO *run_queue_hd = NULL;     /* head of the queue of runnable threads */
139 StgTSO *run_queue_tl = NULL;     /* tail, so threads can be appended in O(1) */
140 StgTSO *blocked_queue_hd = NULL; /* threads blocked on I/O (see addToBlockedQueue) */
141 StgTSO *blocked_queue_tl = NULL;
142 StgTSO *sleeping_queue = NULL;   /* threads sleeping on a timer; perhaps replace with a hash table? */
146 /* Linked list of all threads.
147 * Used for detecting garbage collected threads.
149 StgTSO *all_threads = NULL;
151 /* When a thread performs a safe C call (_ccall_GC, using old
152 * terminology), it gets put on the suspended_ccalling_threads
153 * list. Used by the garbage collector.
/* Threads that have performed a safe C call live on this list so the
 * garbage collector can find them (see comment above). */
155 static StgTSO *suspended_ccalling_threads;
/* Enlarge tso's stack.  The TSO may be relocated; the new location is
 * returned, and callers must update any pointers to it. */
157 static StgTSO *threadStackOverflow(StgTSO *tso);
159 /* KH: The following two flags are shared memory locations. There is no need
160 to lock them, since they are only unset at the end of a scheduler
/* Flag set by the timer/signal handler to precipitate a context switch at
 * the next safe point.  Shared-memory location read without locking: it is
 * only unset by the scheduler itself (see note above). */
165 nat context_switch = 0;
/* If this flag is set as well, give up execution entirely (e.g. the user
 * pressed ^C). */
168 rtsBool interrupted = rtsFalse;
170 /* Next thread ID to allocate.
171 * Locks required: thread_id_mutex
173 static StgThreadID next_thread_id = 1;
176 * Pointers to the state of the current thread.
177 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
178 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
181 /* The smallest stack size that makes any sense is:
182 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
183 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
184 * + 1 (the closure to enter)
186 * + 1 (spare slot req'd by stg_ap_v_ret)
188 * A thread with this stack will bomb immediately with a stack
189 * overflow, which will increase its stack size.
192 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
199 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
200 * exists - earlier gccs apparently didn't.
205 static rtsBool ready_to_gc;
208 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
209 * in an MT setting, needed to signal that a worker thread shouldn't hang around
210 * in the scheduler when it is out of work.
212 static rtsBool shutting_down_scheduler = rtsFalse;
/* Put a thread on the blocked queue (threads waiting on I/O). */
214 void addToBlockedQueue ( StgTSO *tso );
/* The main scheduling loop; defined below. */
216 static void schedule ( StgMainThread *mainThread, Capability *initialCapability );
/* Interrupt the RTS -- presumably sets `interrupted`; TODO confirm (body
 * not visible in this chunk). */
217 void interruptStgRts ( void );
219 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
220 static void detectBlackHoles ( void );
223 #if defined(RTS_SUPPORTS_THREADS)
224 /* ToDo: carefully document the invariants that go together
225 * with these synchronisation objects.
227 Mutex sched_mutex = INIT_MUTEX_VAR;
228 Mutex term_mutex = INIT_MUTEX_VAR;
230 #endif /* RTS_SUPPORTS_THREADS */
/* Time of the last thread yield; used so that a SCHEDULE event is still
 * emitted when the same thread keeps running (see HACK 675 below). */
234 rtsTime TimeOfLastYield;
/* Force the next schedule to be dumped to the log file, whatever we run. */
235 rtsBool emitSchedule = rtsTrue;
239 static char *whatNext_strs[] = {
/* Create a thread to run the given spark. */
250 StgTSO * createSparkThread(rtsSpark spark);
/* Turn a spark into a thread; returns END_TSO_QUEUE if activation fails
 * (see the spark-activation code in the scheduler below). */
251 StgTSO * activateSpark (rtsSpark spark);
254 /* ----------------------------------------------------------------------------
256 * ------------------------------------------------------------------------- */
258 #if defined(RTS_SUPPORTS_THREADS)
259 static rtsBool startingWorkerThread = rtsFalse;
261 static void taskStart(void);
265 ACQUIRE_LOCK(&sched_mutex);
266 startingWorkerThread = rtsFalse;
268 RELEASE_LOCK(&sched_mutex);
272 startSchedulerTaskIfNecessary(void)
274 if(run_queue_hd != END_TSO_QUEUE
275 || blocked_queue_hd != END_TSO_QUEUE
276 || sleeping_queue != END_TSO_QUEUE)
278 if(!startingWorkerThread)
279 { // we don't want to start another worker thread
280 // just because the last one hasn't yet reached the
281 // "waiting for capability" state
282 startingWorkerThread = rtsTrue;
283 if(!startTask(taskStart))
285 startingWorkerThread = rtsFalse;
292 /* ---------------------------------------------------------------------------
293 Main scheduling loop.
295 We use round-robin scheduling, each thread returning to the
296 scheduler loop when one of these conditions is detected:
299 * timer expires (thread yields)
304 Locking notes: we acquire the scheduler lock once at the beginning
305 of the scheduler loop, and release it when
307 * running a thread, or
308 * waiting for work, or
309 * waiting for a GC to complete.
312 In a GranSim setup this loop iterates over the global event queue.
313 This revolves around the global event queue, which determines what
314 to do next. Therefore, it's more complicated than either the
315 concurrent or the parallel (GUM) setup.
318 GUM iterates over incoming messages.
319 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
320 and sends out a fish whenever it has nothing to do; in-between
321 doing the actual reductions (shared code below) it processes the
322 incoming messages and deals with delayed operations
323 (see PendingFetches).
324 This is not the ugliest code you could imagine, but it's bloody close.
326 ------------------------------------------------------------------------ */
328 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
329 Capability *initialCapability )
333 StgThreadReturnCode ret;
341 rtsBool receivedFinish = rtsFalse;
343 nat tp_size, sp_size; // stats only
346 rtsBool was_interrupted = rtsFalse;
349 // Pre-condition: sched_mutex is held.
350 // We might have a capability, passed in as initialCapability.
351 cap = initialCapability;
353 #if defined(RTS_SUPPORTS_THREADS)
355 // in the threaded case, the capability is either passed in via the
356 // initialCapability parameter, or initialized inside the scheduler
360 sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
361 mainThread, initialCapability);
364 // simply initialise it in the non-threaded case
365 grabCapability(&cap);
369 /* set up first event to get things going */
370 /* ToDo: assign costs for system setup and init MainTSO ! */
371 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
373 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
376 debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
377 G_TSO(CurrentTSO, 5));
379 if (RtsFlags.GranFlags.Light) {
380 /* Save current time; GranSim Light only */
381 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
384 event = get_next_event();
386 while (event!=(rtsEvent*)NULL) {
387 /* Choose the processor with the next event */
388 CurrentProc = event->proc;
389 CurrentTSO = event->tso;
393 while (!receivedFinish) { /* set by processMessages */
394 /* when receiving PP_FINISH message */
396 #else // everything except GRAN and PAR
402 IF_DEBUG(scheduler, printAllThreads());
404 #if defined(RTS_SUPPORTS_THREADS)
405 // Yield the capability to higher-priority tasks if necessary.
408 yieldCapability(&cap);
411 // If we do not currently hold a capability, we wait for one
414 waitForCapability(&sched_mutex, &cap,
415 mainThread ? &mainThread->bound_thread_cond : NULL);
418 // We now have a capability...
422 // If we're interrupted (the user pressed ^C, or some other
423 // termination condition occurred), kill all the currently running
427 IF_DEBUG(scheduler, sched_belch("interrupted"));
428 interrupted = rtsFalse;
429 was_interrupted = rtsTrue;
430 #if defined(RTS_SUPPORTS_THREADS)
431 // In the threaded RTS, deadlock detection doesn't work,
432 // so just exit right away.
433 errorBelch("interrupted");
434 releaseCapability(cap);
435 RELEASE_LOCK(&sched_mutex);
436 shutdownHaskellAndExit(EXIT_SUCCESS);
442 #if defined(RTS_USER_SIGNALS)
443 // check for signals each time around the scheduler
444 if (signals_pending()) {
445 RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
446 startSignalHandlers();
447 ACQUIRE_LOCK(&sched_mutex);
452 // Check whether any waiting threads need to be woken up. If the
453 // run queue is empty, and there are no other tasks running, we
454 // can wait indefinitely for something to happen.
456 if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
457 #if defined(RTS_SUPPORTS_THREADS)
462 awaitEvent( EMPTY_RUN_QUEUE() );
464 // we can be interrupted while waiting for I/O...
465 if (interrupted) continue;
468 * Detect deadlock: when we have no threads to run, there are no
469 * threads waiting on I/O or sleeping, and all the other tasks are
470 * waiting for work, we must have a deadlock of some description.
472 * We first try to find threads blocked on themselves (ie. black
473 * holes), and generate NonTermination exceptions where necessary.
475 * If no threads are black holed, we have a deadlock situation, so
476 * inform all the main threads.
478 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
479 if ( EMPTY_THREAD_QUEUES() )
481 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
482 // Garbage collection can release some new threads due to
483 // either (a) finalizers or (b) threads resurrected because
484 // they are about to be send BlockedOnDeadMVar. Any threads
485 // thus released will be immediately runnable.
486 GarbageCollect(GetRoots,rtsTrue);
488 if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
491 sched_belch("still deadlocked, checking for black holes..."));
494 if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
496 #if defined(RTS_USER_SIGNALS)
497 /* If we have user-installed signal handlers, then wait
498 * for signals to arrive rather than bombing out with a
501 if ( anyUserHandlers() ) {
503 sched_belch("still deadlocked, waiting for signals..."));
507 // we might be interrupted...
508 if (interrupted) { continue; }
510 if (signals_pending()) {
511 RELEASE_LOCK(&sched_mutex);
512 startSignalHandlers();
513 ACQUIRE_LOCK(&sched_mutex);
515 ASSERT(!EMPTY_RUN_QUEUE());
520 /* Probably a real deadlock. Send the current main thread the
521 * Deadlock exception (or in the SMP build, send *all* main
522 * threads the deadlock exception, since none of them can make
528 switch (m->tso->why_blocked) {
529 case BlockedOnBlackHole:
530 case BlockedOnException:
532 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
535 barf("deadlock: main thread blocked in a strange way");
541 #elif defined(RTS_SUPPORTS_THREADS)
542 // ToDo: add deadlock detection in threaded RTS
544 // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
547 #if defined(RTS_SUPPORTS_THREADS)
548 if ( EMPTY_RUN_QUEUE() ) {
549 continue; // nothing to do
554 if (RtsFlags.GranFlags.Light)
555 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
557 /* adjust time based on time-stamp */
558 if (event->time > CurrentTime[CurrentProc] &&
559 event->evttype != ContinueThread)
560 CurrentTime[CurrentProc] = event->time;
562 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
563 if (!RtsFlags.GranFlags.Light)
566 IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
568 /* main event dispatcher in GranSim */
569 switch (event->evttype) {
570 /* Should just be continuing execution */
572 IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
573 /* ToDo: check assertion
574 ASSERT(run_queue_hd != (StgTSO*)NULL &&
575 run_queue_hd != END_TSO_QUEUE);
577 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
578 if (!RtsFlags.GranFlags.DoAsyncFetch &&
579 procStatus[CurrentProc]==Fetching) {
580 debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
581 CurrentTSO->id, CurrentTSO, CurrentProc);
584 /* Ignore ContinueThreads for completed threads */
585 if (CurrentTSO->what_next == ThreadComplete) {
586 debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
587 CurrentTSO->id, CurrentTSO, CurrentProc);
590 /* Ignore ContinueThreads for threads that are being migrated */
591 if (PROCS(CurrentTSO)==Nowhere) {
592 debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
593 CurrentTSO->id, CurrentTSO, CurrentProc);
596 /* The thread should be at the beginning of the run queue */
597 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
598 debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
599 CurrentTSO->id, CurrentTSO, CurrentProc);
600 break; // run the thread anyway
603 new_event(proc, proc, CurrentTime[proc],
605 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
607 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
608 break; // now actually run the thread; DaH Qu'vam yImuHbej
611 do_the_fetchnode(event);
612 goto next_thread; /* handle next event in event queue */
615 do_the_globalblock(event);
616 goto next_thread; /* handle next event in event queue */
619 do_the_fetchreply(event);
620 goto next_thread; /* handle next event in event queue */
622 case UnblockThread: /* Move from the blocked queue to the tail of */
623 do_the_unblock(event);
624 goto next_thread; /* handle next event in event queue */
626 case ResumeThread: /* Move from the blocked queue to the tail of */
627 /* the runnable queue ( i.e. Qu' SImqa'lu') */
628 event->tso->gran.blocktime +=
629 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
630 do_the_startthread(event);
631 goto next_thread; /* handle next event in event queue */
634 do_the_startthread(event);
635 goto next_thread; /* handle next event in event queue */
638 do_the_movethread(event);
639 goto next_thread; /* handle next event in event queue */
642 do_the_movespark(event);
643 goto next_thread; /* handle next event in event queue */
646 do_the_findwork(event);
647 goto next_thread; /* handle next event in event queue */
650 barf("Illegal event type %u\n", event->evttype);
653 /* This point was scheduler_loop in the old RTS */
655 IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
657 TimeOfLastEvent = CurrentTime[CurrentProc];
658 TimeOfNextEvent = get_time_of_next_event();
659 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
660 // CurrentTSO = ThreadQueueHd;
662 IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
665 if (RtsFlags.GranFlags.Light)
666 GranSimLight_leave_system(event, &ActiveTSO);
668 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
671 debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
673 /* in a GranSim setup the TSO stays on the run queue */
675 /* Take a thread from the run queue. */
676 POP_RUN_QUEUE(t); // take_off_run_queue(t);
679 debugBelch("GRAN: About to run current thread, which is\n");
682 context_switch = 0; // turned on via GranYield, checking events and time slice
685 DumpGranEvent(GR_SCHEDULE, t));
687 procStatus[CurrentProc] = Busy;
690 if (PendingFetches != END_BF_QUEUE) {
694 /* ToDo: perhaps merge with spark activation above */
695 /* check whether we have local work and send requests if we have none */
696 if (EMPTY_RUN_QUEUE()) { /* no runnable threads */
697 /* :-[ no local threads => look out for local sparks */
698 /* the spark pool for the current PE */
699 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
700 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
701 pool->hd < pool->tl) {
703 * ToDo: add GC code check that we really have enough heap afterwards!!
705 * If we're here (no runnable threads) and we have pending
706 * sparks, we must have a space problem. Get enough space
707 * to turn one of those pending sparks into a
711 spark = findSpark(rtsFalse); /* get a spark */
712 if (spark != (rtsSpark) NULL) {
713 tso = activateSpark(spark); /* turn the spark into a thread */
714 IF_PAR_DEBUG(schedule,
715 debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
716 tso->id, tso, advisory_thread_count));
718 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
719 debugBelch("==^^ failed to activate spark\n");
721 } /* otherwise fall through & pick-up new tso */
723 IF_PAR_DEBUG(verbose,
724 debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
725 spark_queue_len(pool)));
730 /* If we still have no work we need to send a FISH to get a spark
733 if (EMPTY_RUN_QUEUE()) {
734 /* =8-[ no local sparks => look for work on other PEs */
736 * We really have absolutely no work. Send out a fish
737 * (there may be some out there already), and wait for
738 * something to arrive. We clearly can't run any threads
739 * until a SCHEDULE or RESUME arrives, and so that's what
740 * we're hoping to see. (Of course, we still have to
741 * respond to other types of messages.)
743 TIME now = msTime() /*CURRENT_TIME*/;
744 IF_PAR_DEBUG(verbose,
745 debugBelch("-- now=%ld\n", now));
746 IF_PAR_DEBUG(verbose,
747 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
748 (last_fish_arrived_at!=0 &&
749 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
750 debugBelch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)\n",
751 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
752 last_fish_arrived_at,
753 RtsFlags.ParFlags.fishDelay, now);
756 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
757 (last_fish_arrived_at==0 ||
758 (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
759 /* outstandingFishes is set in sendFish, processFish;
760 avoid flooding system with fishes via delay */
762 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
765 // Global statistics: count no. of fishes
766 if (RtsFlags.ParFlags.ParStats.Global &&
767 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
768 globalParStats.tot_fish_mess++;
772 receivedFinish = processMessages();
775 } else if (PacketsWaiting()) { /* Look for incoming messages */
776 receivedFinish = processMessages();
779 /* Now we are sure that we have some work available */
780 ASSERT(run_queue_hd != END_TSO_QUEUE);
782 /* Take a thread from the run queue, if we have work */
783 POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
784 IF_DEBUG(sanity,checkTSO(t));
786 /* ToDo: write something to the log-file
787 if (RTSflags.ParFlags.granSimStats && !sameThread)
788 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
792 /* the spark pool for the current PE */
793 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
796 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
797 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
800 if (0 && RtsFlags.ParFlags.ParStats.Full &&
801 t && LastTSO && t->id != LastTSO->id &&
802 LastTSO->why_blocked == NotBlocked &&
803 LastTSO->what_next != ThreadComplete) {
804 // if previously scheduled TSO not blocked we have to record the context switch
805 DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
806 GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
809 if (RtsFlags.ParFlags.ParStats.Full &&
810 (emitSchedule /* forced emit */ ||
811 (t && LastTSO && t->id != LastTSO->id))) {
813 we are running a different TSO, so write a schedule event to log file
814 NB: If we use fair scheduling we also have to write a deschedule
815 event for LastTSO; with unfair scheduling we know that the
816 previous tso has blocked whenever we switch to another tso, so
817 we don't need it in GUM for now
819 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
820 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
821 emitSchedule = rtsFalse;
825 #else /* !GRAN && !PAR */
827 // grab a thread from the run queue
828 ASSERT(run_queue_hd != END_TSO_QUEUE);
831 // Sanity check the thread we're about to run. This can be
832 // expensive if there is lots of thread switching going on...
833 IF_DEBUG(sanity,checkTSO(t));
838 StgMainThread *m = t->main;
845 sched_belch("### Running thread %d in bound thread", t->id));
846 // yes, the Haskell thread is bound to the current native thread
851 sched_belch("### thread %d bound to another OS thread", t->id));
852 // no, bound to a different Haskell thread: pass to that thread
853 PUSH_ON_RUN_QUEUE(t);
854 passCapability(&m->bound_thread_cond);
860 if(mainThread != NULL)
861 // The thread we want to run is bound.
864 sched_belch("### this OS thread cannot run thread %d", t->id));
865 // no, the current native thread is bound to a different
866 // Haskell thread, so pass it to any worker thread
867 PUSH_ON_RUN_QUEUE(t);
868 passCapabilityToWorker();
875 cap->r.rCurrentTSO = t;
877 /* context switches are now initiated by the timer signal, unless
878 * the user specified "context switch as often as possible", with
881 if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
882 && (run_queue_hd != END_TSO_QUEUE
883 || blocked_queue_hd != END_TSO_QUEUE
884 || sleeping_queue != END_TSO_QUEUE)))
889 RELEASE_LOCK(&sched_mutex);
891 IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
892 (long)t->id, whatNext_strs[t->what_next]));
895 startHeapProfTimer();
898 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
899 /* Run the current thread
901 prev_what_next = t->what_next;
903 errno = t->saved_errno;
905 switch (prev_what_next) {
909 /* Thread already finished, return to scheduler. */
910 ret = ThreadFinished;
914 ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
917 case ThreadInterpret:
918 ret = interpretBCO(cap);
922 barf("schedule: invalid what_next field");
925 // The TSO might have moved, so find the new location:
926 t = cap->r.rCurrentTSO;
928 // And save the current errno in this thread.
929 t->saved_errno = errno;
931 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
933 /* Costs for the scheduler are assigned to CCS_SYSTEM */
939 ACQUIRE_LOCK(&sched_mutex);
941 #ifdef RTS_SUPPORTS_THREADS
942 IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
943 #elif !defined(GRAN) && !defined(PAR)
944 IF_DEBUG(scheduler,debugBelch("sched: "););
948 /* HACK 675: if the last thread didn't yield, make sure to print a
949 SCHEDULE event to the log file when StgRunning the next thread, even
950 if it is the same one as before */
952 TimeOfLastYield = CURRENT_TIME;
958 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
959 globalGranStats.tot_heapover++;
961 globalParStats.tot_heapover++;
964 // did the task ask for a large block?
965 if (cap->r.rHpAlloc > BLOCK_SIZE) {
966 // if so, get one and push it on the front of the nursery.
970 blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
972 IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n",
973 (long)t->id, whatNext_strs[t->what_next], blocks));
975 // don't do this if it would push us over the
976 // alloc_blocks_lim limit; we'll GC first.
977 if (alloc_blocks + blocks < alloc_blocks_lim) {
979 alloc_blocks += blocks;
980 bd = allocGroup( blocks );
982 // link the new group into the list
983 bd->link = cap->r.rCurrentNursery;
984 bd->u.back = cap->r.rCurrentNursery->u.back;
985 if (cap->r.rCurrentNursery->u.back != NULL) {
986 cap->r.rCurrentNursery->u.back->link = bd;
988 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
989 g0s0->blocks == cap->r.rNursery);
990 cap->r.rNursery = g0s0->blocks = bd;
992 cap->r.rCurrentNursery->u.back = bd;
994 // initialise it as a nursery block. We initialise the
995 // step, gen_no, and flags field of *every* sub-block in
996 // this large block, because this is easier than making
997 // sure that we always find the block head of a large
998 // block whenever we call Bdescr() (eg. evacuate() and
999 // isAlive() in the GC would both have to do this, at
1003 for (x = bd; x < bd + blocks; x++) {
1010 // don't forget to update the block count in g0s0.
1011 g0s0->n_blocks += blocks;
1012 // This assert can be a killer if the app is doing lots
1013 // of large block allocations.
1014 ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1016 // now update the nursery to point to the new block
1017 cap->r.rCurrentNursery = bd;
1019 // we might be unlucky and have another thread get on the
1020 // run queue before us and steal the large block, but in that
1021 // case the thread will just end up requesting another large
1023 PUSH_ON_RUN_QUEUE(t);
1028 /* make all the running tasks block on a condition variable,
1029 * maybe set context_switch and wait till they all pile in,
1030 * then have them wait on a GC condition variable.
1032 IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n",
1033 (long)t->id, whatNext_strs[t->what_next]));
1036 ASSERT(!is_on_queue(t,CurrentProc));
1038 /* Currently we emit a DESCHEDULE event before GC in GUM.
1039 ToDo: either add separate event to distinguish SYSTEM time from rest
1040 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1041 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1042 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1043 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1044 emitSchedule = rtsTrue;
1048 ready_to_gc = rtsTrue;
1049 context_switch = 1; /* stop other threads ASAP */
1050 PUSH_ON_RUN_QUEUE(t);
1051 /* actual GC is done at the end of the while loop */
1057 DumpGranEvent(GR_DESCHEDULE, t));
1058 globalGranStats.tot_stackover++;
1061 // DumpGranEvent(GR_DESCHEDULE, t);
1062 globalParStats.tot_stackover++;
1064 IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n",
1065 (long)t->id, whatNext_strs[t->what_next]));
1066 /* just adjust the stack for this thread, then pop it back
1071 /* enlarge the stack */
1072 StgTSO *new_t = threadStackOverflow(t);
1074 /* This TSO has moved, so update any pointers to it from the
1075 * main thread stack. It better not be on any other queues...
1076 * (it shouldn't be).
1078 if (t->main != NULL) {
1079 t->main->tso = new_t;
1081 PUSH_ON_RUN_QUEUE(new_t);
1085 case ThreadYielding:
1086 // Reset the context switch flag. We don't do this just before
1087 // running the thread, because that would mean we would lose ticks
1088 // during GC, which can lead to unfair scheduling (a thread hogs
1089 // the CPU because the tick always arrives during GC). This way
1090 // penalises threads that do a lot of allocation, but that seems
1091 // better than the alternative.
1096 DumpGranEvent(GR_DESCHEDULE, t));
1097 globalGranStats.tot_yields++;
1100 // DumpGranEvent(GR_DESCHEDULE, t);
1101 globalParStats.tot_yields++;
1103 /* put the thread back on the run queue. Then, if we're ready to
1104 * GC, check whether this is the last task to stop. If so, wake
1105 * up the GC thread. getThread will block during a GC until the
1109 if (t->what_next != prev_what_next) {
1110 debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n",
1111 (long)t->id, whatNext_strs[t->what_next]);
1113 debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1114 (long)t->id, whatNext_strs[t->what_next]);
1119 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1121 ASSERT(t->link == END_TSO_QUEUE);
1123 // Shortcut if we're just switching evaluators: don't bother
1124 // doing stack squeezing (which can be expensive), just run the
1126 if (t->what_next != prev_what_next) {
1133 ASSERT(!is_on_queue(t,CurrentProc));
1136 //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1137 checkThreadQsSanity(rtsTrue));
1141 if (RtsFlags.ParFlags.doFairScheduling) {
1142 /* this does round-robin scheduling; good for concurrency */
1143 APPEND_TO_RUN_QUEUE(t);
1145 /* this does unfair scheduling; good for parallelism */
1146 PUSH_ON_RUN_QUEUE(t);
1149 // this does round-robin scheduling; good for concurrency
1150 APPEND_TO_RUN_QUEUE(t);
1154 /* add a ContinueThread event to actually process the thread */
1155 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1157 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1159 debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1168 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
1169 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1170 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1172 // ??? needed; should emit block before
1174 DumpGranEvent(GR_DESCHEDULE, t));
1175 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1178 ASSERT(procStatus[CurrentProc]==Busy ||
1179 ((procStatus[CurrentProc]==Fetching) &&
1180 (t->block_info.closure!=(StgClosure*)NULL)));
1181 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1182 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1183 procStatus[CurrentProc]==Fetching))
1184 procStatus[CurrentProc] = Idle;
1188 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
1189 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1192 if (t->block_info.closure!=(StgClosure*)NULL)
1193 print_bq(t->block_info.closure));
1195 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1198 /* whatever we schedule next, we must log that schedule */
1199 emitSchedule = rtsTrue;
1202 /* don't need to do anything. Either the thread is blocked on
1203 * I/O, in which case we'll have called addToBlockedQueue
1204 * previously, or it's blocked on an MVar or Blackhole, in which
1205 * case it'll be on the relevant queue already.
1208 debugBelch("--<< thread %d (%s) stopped: ",
1209 t->id, whatNext_strs[t->what_next]);
1210 printThreadBlockage(t);
1213 /* Only for dumping event to log file
1214 ToDo: do I need this in GranSim, too?
1221 case ThreadFinished:
1222 /* Need to check whether this was a main thread, and if so, signal
1223 * the task that started it with the return value. If we have no
1224 * more main threads, we probably need to stop all the tasks until
1227 /* We also end up here if the thread kills itself with an
1228 * uncaught exception, see Exception.hc.
1230 IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n",
1231 t->id, whatNext_strs[t->what_next]));
1233 endThread(t, CurrentProc); // clean-up the thread
1235 /* For now all are advisory -- HWL */
1236 //if(t->priority==AdvisoryPriority) ??
1237 advisory_thread_count--;
1240 if(t->dist.priority==RevalPriority)
1244 if (RtsFlags.ParFlags.ParStats.Full &&
1245 !RtsFlags.ParFlags.ParStats.Suppressed)
1246 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1250 // Check whether the thread that just completed was a main
1251 // thread, and if so return with the result.
1253 // There is an assumption here that all thread completion goes
1254 // through this point; we need to make sure that if a thread
1255 // ends up in the ThreadKilled state, that it stays on the run
1256 // queue so it can be dealt with here.
1259 #if defined(RTS_SUPPORTS_THREADS)
1262 mainThread->tso == t
1266 // We are a bound thread: this must be our thread that just
1268 ASSERT(mainThread->tso == t);
1270 if (t->what_next == ThreadComplete) {
1271 if (mainThread->ret) {
1272 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1273 *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1];
1275 mainThread->stat = Success;
1277 if (mainThread->ret) {
1278 *(mainThread->ret) = NULL;
1280 if (was_interrupted) {
1281 mainThread->stat = Interrupted;
1283 mainThread->stat = Killed;
1287 removeThreadLabel((StgWord)mainThread->tso->id);
1289 if (mainThread->prev == NULL) {
1290 main_threads = mainThread->link;
1292 mainThread->prev->link = mainThread->link;
1294 if (mainThread->link != NULL) {
1295 mainThread->link->prev = NULL;
1297 releaseCapability(cap);
1301 #ifdef RTS_SUPPORTS_THREADS
1302 ASSERT(t->main == NULL);
1304 if (t->main != NULL) {
1305 // Must be a main thread that is not the topmost one. Leave
1306 // it on the run queue until the stack has unwound to the
1307 // point where we can deal with this. Leaving it on the run
1308 // queue also ensures that the garbage collector knows about
1309 // this thread and its return value (it gets dropped from the
1310 // all_threads list so there's no other way to find it).
1311 APPEND_TO_RUN_QUEUE(t);
1317 barf("schedule: invalid thread return code %d", (int)ret);
1321 // When we have +RTS -i0 and we're heap profiling, do a census at
1322 // every GC. This lets us get repeatable runs for debugging.
1323 if (performHeapProfile ||
1324 (RtsFlags.ProfFlags.profileInterval==0 &&
1325 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1326 GarbageCollect(GetRoots, rtsTrue);
1328 performHeapProfile = rtsFalse;
1329 ready_to_gc = rtsFalse; // we already GC'd
1334 /* everybody back, start the GC.
1335 * Could do it in this thread, or signal a condition var
1336 * to do it in another thread. Either way, we need to
1337 * broadcast on gc_pending_cond afterward.
1339 #if defined(RTS_SUPPORTS_THREADS)
1340 IF_DEBUG(scheduler,sched_belch("doing GC"));
1342 GarbageCollect(GetRoots,rtsFalse);
1343 ready_to_gc = rtsFalse;
1345 /* add a ContinueThread event to continue execution of current thread */
1346 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1348 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1350 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1358 IF_GRAN_DEBUG(unused,
1359 print_eventq(EventHd));
1361 event = get_next_event();
1364 /* ToDo: wait for next message to arrive rather than busy wait */
1367 } /* end of while(1) */
1369 IF_PAR_DEBUG(verbose,
1370 debugBelch("== Leaving schedule() after having received Finish\n"));
1373 /* ---------------------------------------------------------------------------
1374 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1375 * used by Control.Concurrent for error checking.
1376 * ------------------------------------------------------------------------- */
1379 rtsSupportsBoundThreads(void)
1388 /* ---------------------------------------------------------------------------
1389 * isThreadBound(tso): check whether tso is bound to an OS thread.
1390 * ------------------------------------------------------------------------- */
/* Returns true iff this TSO is bound to an OS thread, i.e. it has an
 * associated StgMainThread record (tso->main).  The USED_IN_THREADED_RTS
 * annotation suppresses unused-parameter warnings in non-threaded builds. */
1393 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1396 return (tso->main != NULL);
1401 /* ---------------------------------------------------------------------------
1402 * Singleton fork(). Do not copy any running threads.
1403 * ------------------------------------------------------------------------- */
1405 #ifndef mingw32_TARGET_OS
1406 #define FORKPROCESS_PRIMOP_SUPPORTED
1409 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1411 deleteThreadImmediately(StgTSO *tso);
/* forkProcess: fork(2) the RTS.  In the parent, return the child pid.
 * In the child, destroy every Haskell thread, wipe the main-thread list,
 * reset the task manager, run 'entry' via rts_evalStableIO, and exit
 * through hs_exit().  Only available when FORKPROCESS_PRIMOP_SUPPORTED
 * (i.e. not on mingw32); otherwise barf.
 * NOTE(review): lines are elided in this excerpt; the fork() call and
 * several closing braces are not visible here. */
1414 forkProcess(HsStablePtr *entry
1415 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1420 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1426 IF_DEBUG(scheduler,sched_belch("forking!"));
1427 rts_lock(); // This not only acquires sched_mutex, it also
1428 // makes sure that no other threads are running
1432 if (pid) { /* parent */
1434 /* just return the pid */
1438 } else { /* child */
1441 // delete all threads
1442 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1444 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1447 // don't allow threads to catch the ThreadKilled exception
1448 deleteThreadImmediately(t);
1451 // wipe the main thread list
1452 while((m = main_threads) != NULL) {
1453 main_threads = m->link;
1454 # ifdef THREADED_RTS
1455 closeCondition(&m->bound_thread_cond);
1460 # ifdef RTS_SUPPORTS_THREADS
1461 resetTaskManagerAfterFork(); // tell startTask() and friends that
1462 startingWorkerThread = rtsFalse; // we have no worker threads any more
1463 resetWorkerWakeupPipeAfterFork();
1466 rc = rts_evalStableIO(entry, NULL); // run the action
1467 rts_checkSchedStatus("forkProcess",rc);
1471 hs_exit(); // clean up and exit
1474 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1475 barf("forkProcess#: primop not supported, sorry!\n");
1480 /* ---------------------------------------------------------------------------
1481 * deleteAllThreads(): kill all the live threads.
1483 * This is used when we catch a user interrupt (^C), before performing
1484 * any necessary cleanups and running finalizers.
1486 * Locks: sched_mutex held.
1487 * ------------------------------------------------------------------------- */
/* deleteAllThreads: walk all_threads via global_link and kill every live
 * thread (used on ^C before cleanup/finalizers).  Caller holds sched_mutex.
 * The per-thread delete call itself is elided from this excerpt. */
1490 deleteAllThreads ( void )
1493 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1494 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1495 next = t->global_link;
1499 // The run queue now contains a bunch of ThreadKilled threads. We
1500 // must not throw these away: the main thread(s) will be in there
1501 // somewhere, and the main scheduler loop has to deal with it.
1502 // Also, the run queue is the only thing keeping these threads from
1503 // being GC'd, and we don't want the "main thread has been GC'd" panic.
/* After deletion nothing may remain blocked or sleeping. */
1505 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1506 ASSERT(sleeping_queue == END_TSO_QUEUE);
1509 /* startThread and insertThread are now in GranSim.c -- HWL */
1512 /* ---------------------------------------------------------------------------
1513 * Suspending & resuming Haskell threads.
1515 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1516 * its capability before calling the C function. This allows another
1517 * task to pick up the capability and carry on running Haskell
1518 * threads. It also means that if the C call blocks, it won't lock
1521 * The Haskell thread making the C call is put to sleep for the
1522 duration of the call, on the suspended_ccalling_threads queue. We
1523 * give out a token to the task, which it can use to resume the thread
1524 * on return from the C function.
1525 * ------------------------------------------------------------------------- */
/* suspendThread: called when a Haskell thread makes a "safe" foreign call.
 * Parks the current TSO on suspended_ccalling_threads, releases the
 * capability so other tasks can run Haskell code, and returns a token
 * (the thread id) with which resumeThread() finds the TSO again.
 * errno is saved and restored across the RTS bookkeeping. */
1528 suspendThread( StgRegTable *reg )
1532 int saved_errno = errno;
1534 /* assume that *reg is a pointer to the StgRegTable part
/* Recover the enclosing Capability from the embedded register table. */
1537 cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1539 ACQUIRE_LOCK(&sched_mutex);
1542 sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1544 // XXX this might not be necessary --SDM
1545 cap->r.rCurrentTSO->what_next = ThreadRunGHC;
/* Push the current TSO onto the suspended-ccall list. */
1547 threadPaused(cap->r.rCurrentTSO);
1548 cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1549 suspended_ccalling_threads = cap->r.rCurrentTSO;
/* blocked_exceptions == NULL means async exceptions are unblocked here;
 * record which variant of BlockedOnCCall applies so resumeThread can undo it. */
1551 if(cap->r.rCurrentTSO->blocked_exceptions == NULL) {
1552 cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1553 cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1555 cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1558 /* Use the thread ID as the token; it should be unique */
1559 tok = cap->r.rCurrentTSO->id;
1561 /* Hand back capability */
1562 releaseCapability(cap);
1564 #if defined(RTS_SUPPORTS_THREADS)
1565 /* Preparing to leave the RTS, so ensure there's a native thread/task
1566 waiting to take over.
1568 IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1571 /* Other threads _might_ be available for execution; signal this */
1573 RELEASE_LOCK(&sched_mutex);
1575 errno = saved_errno;
/* resumeThread: re-enter the RTS after a "safe" foreign call.  'tok' is the
 * thread id handed out by suspendThread.  Re-acquires a capability, unlinks
 * the TSO from suspended_ccalling_threads, restores its blocking status and
 * makes it the capability's current TSO.  errno is preserved. */
1580 resumeThread( StgInt tok )
1582 StgTSO *tso, **prev;
1584 int saved_errno = errno;
1586 #if defined(RTS_SUPPORTS_THREADS)
1587 /* Wait for permission to re-enter the RTS with the result. */
1588 ACQUIRE_LOCK(&sched_mutex);
1589 waitForReturnCapability(&sched_mutex, &cap);
1591 IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1593 grabCapability(&cap);
1596 /* Remove the thread off of the suspended list */
1597 prev = &suspended_ccalling_threads;
1598 for (tso = suspended_ccalling_threads;
1599 tso != END_TSO_QUEUE;
1600 prev = &tso->link, tso = tso->link) {
1601 if (tso->id == (StgThreadID)tok) {
1606 if (tso == END_TSO_QUEUE) {
1607 barf("resumeThread: thread not found");
1609 tso->link = END_TSO_QUEUE;
/* If exceptions were blockable during the call, wake any that queued up. */
1611 if(tso->why_blocked == BlockedOnCCall) {
1612 awakenBlockedQueueNoLock(tso->blocked_exceptions);
1613 tso->blocked_exceptions = NULL;
1616 /* Reset blocking status */
1617 tso->why_blocked = NotBlocked;
1619 cap->r.rCurrentTSO = tso;
1620 RELEASE_LOCK(&sched_mutex);
1621 errno = saved_errno;
1626 /* ---------------------------------------------------------------------------
1628 * ------------------------------------------------------------------------ */
1629 static void unblockThread(StgTSO *tso);
1631 /* ---------------------------------------------------------------------------
1632 * Comparing Thread ids.
1634 * This is used from STG land in the implementation of the
1635 * instances of Eq/Ord for ThreadIds.
1636 * ------------------------------------------------------------------------ */
/* Three-way comparison of two TSOs by thread id; backs the Eq/Ord
 * instances for ThreadId in the libraries.  Returns <0, 0 or >0. */
1639 cmp_thread(StgPtr tso1, StgPtr tso2)
1641 StgThreadID id1 = ((StgTSO *)tso1)->id;
1642 StgThreadID id2 = ((StgTSO *)tso2)->id;
1644 if (id1 < id2) return (-1);
1645 if (id1 > id2) return 1;
1649 /* ---------------------------------------------------------------------------
1650 * Fetching the ThreadID from an StgTSO.
1652 * This is used in the implementation of Show for ThreadIds.
1653 * ------------------------------------------------------------------------ */
/* Fetch the thread id from a TSO; used by the Show instance for ThreadId. */
1655 rts_getThreadId(StgPtr tso)
1657 return ((StgTSO *)tso)->id;
/* labelThread: attach a human-readable label to a thread (for debugging /
 * event logs).  Copies 'label' into freshly-allocated storage; ownership of
 * the copy passes to updateThreadLabel, which frees any previous label. */
1662 labelThread(StgPtr tso, char *label)
1667 /* Caveat: Once set, you can only set the thread name to "" */
1668 len = strlen(label)+1;
1669 buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
/* len includes the terminating NUL, so strncpy copies it too. */
1670 strncpy(buf,label,len);
1671 /* Update will free the old memory for us */
1672 updateThreadLabel(((StgTSO *)tso)->id,buf);
1676 /* ---------------------------------------------------------------------------
1677 Create a new thread.
1679 The new thread starts with the given stack size. Before the
1680 scheduler can run, however, this thread needs to have a closure
1681 (and possibly some arguments) pushed on its stack. See
1682 pushClosure() in Schedule.h.
1684 createGenThread() and createIOThread() (in SchedAPI.h) are
1685 convenient packaged versions of this function.
1687 currently pri (priority) is only used in a GRAN setup -- HWL
1688 ------------------------------------------------------------------------ */
1690 /* currently pri (priority) is only used in a GRAN setup -- HWL */
/* createThread: allocate and initialise a new TSO with a stack of 'size'
 * words (clamped below by MIN_STACK_WORDS + TSO_STRUCT_SIZEW), push a stop
 * frame, link it onto all_threads and return it.  The GRAN variant takes an
 * extra priority argument and also inserts the thread into a run queue;
 * PAR/GRAN builds additionally initialise per-thread statistics fields.
 * Returns END_TSO_QUEUE if the PAR thread limit is exceeded. */
1692 createThread(nat size, StgInt pri)
1695 createThread(nat size)
1702 /* First check whether we should create a thread at all */
1704 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1705 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1707 debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
1708 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1709 return END_TSO_QUEUE;
1715 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1718 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1720 /* catch ridiculously small stack sizes */
1721 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1722 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1725 stack_size = size - TSO_STRUCT_SIZEW;
1727 tso = (StgTSO *)allocate(size);
1728 TICK_ALLOC_TSO(stack_size, 0);
1730 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1732 SET_GRAN_HDR(tso, ThisPE);
1735 // Always start with the compiled code evaluator
1736 tso->what_next = ThreadRunGHC;
1738 tso->id = next_thread_id++;
1739 tso->why_blocked = NotBlocked;
1740 tso->blocked_exceptions = NULL;
1742 tso->saved_errno = 0;
/* Stack grows downward: sp starts at the top of the stack area. */
1745 tso->stack_size = stack_size;
1746 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
1748 tso->sp = (P_)&(tso->stack) + stack_size;
1751 tso->prof.CCCS = CCS_MAIN;
1754 /* put a stop frame on the stack */
1755 tso->sp -= sizeofW(StgStopFrame);
1756 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1757 tso->link = END_TSO_QUEUE;
1761 /* uses more flexible routine in GranSim */
1762 insertThread(tso, CurrentProc);
1764 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1770 if (RtsFlags.GranFlags.GranSimStats.Full)
1771 DumpGranEvent(GR_START,tso);
1773 if (RtsFlags.ParFlags.ParStats.Full)
1774 DumpGranEvent(GR_STARTQ,tso);
1775 /* HACK to avoid SCHEDULE
1779 /* Link the new thread on the global thread list.
1781 tso->global_link = all_threads;
1785 tso->dist.priority = MandatoryPriority; //by default that is...
1789 tso->gran.pri = pri;
1791 tso->gran.magic = TSO_MAGIC; // debugging only
/* Zero the GranSim per-thread accounting fields. */
1793 tso->gran.sparkname = 0;
1794 tso->gran.startedat = CURRENT_TIME;
1795 tso->gran.exported = 0;
1796 tso->gran.basicblocks = 0;
1797 tso->gran.allocs = 0;
1798 tso->gran.exectime = 0;
1799 tso->gran.fetchtime = 0;
1800 tso->gran.fetchcount = 0;
1801 tso->gran.blocktime = 0;
1802 tso->gran.blockcount = 0;
1803 tso->gran.blockedat = 0;
1804 tso->gran.globalsparks = 0;
1805 tso->gran.localsparks = 0;
1806 if (RtsFlags.GranFlags.Light)
1807 tso->gran.clock = Now; /* local clock */
1809 tso->gran.clock = 0;
1811 IF_DEBUG(gran,printTSO(tso));
1814 tso->par.magic = TSO_MAGIC; // debugging only
/* Zero the GUM (PAR) per-thread accounting fields. */
1816 tso->par.sparkname = 0;
1817 tso->par.startedat = CURRENT_TIME;
1818 tso->par.exported = 0;
1819 tso->par.basicblocks = 0;
1820 tso->par.allocs = 0;
1821 tso->par.exectime = 0;
1822 tso->par.fetchtime = 0;
1823 tso->par.fetchcount = 0;
1824 tso->par.blocktime = 0;
1825 tso->par.blockcount = 0;
1826 tso->par.blockedat = 0;
1827 tso->par.globalsparks = 0;
1828 tso->par.localsparks = 0;
1832 globalGranStats.tot_threads_created++;
1833 globalGranStats.threads_created_on_PE[CurrentProc]++;
1834 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1835 globalGranStats.tot_sq_probes++;
1837 // collect parallel global statistics (currently done together with GC stats)
1838 if (RtsFlags.ParFlags.ParStats.Global &&
1839 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1840 //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
1841 globalParStats.tot_threads_created++;
/* NOTE(review): format string below has two conversions (%d, %p) but three
 * arguments (CurrentProc, tso, tso->id) -- argument/format mismatch; compare
 * the PAR variant at 1851 which prints id/ptr/count.  Confirm intent. */
1847 sched_belch("==__ schedule: Created TSO %d (%p);",
1848 CurrentProc, tso, tso->id));
1850 IF_PAR_DEBUG(verbose,
1851 sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
1852 (long)tso->id, tso, advisory_thread_count));
1854 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1855 (long)tso->id, (long)tso->stack_size));
1862 all parallel thread creation calls should fall through the following routine.
/* createSparkThread: materialise a spark as a runnable thread (PAR/GRAN).
 * Respects the maxThreads limit, creates a TSO with the initial stack size,
 * pushes the spark closure onto its stack and puts it on the run queue. */
1865 createSparkThread(rtsSpark spark)
1867 ASSERT(spark != (rtsSpark)NULL);
1868 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
1870 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1871 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1872 return END_TSO_QUEUE;
1876 tso = createThread(RtsFlags.GcFlags.initialStkSize);
1877 if (tso==END_TSO_QUEUE)
1878 barf("createSparkThread: Cannot create TSO");
1880 tso->priority = AdvisoryPriority;
/* The spark closure becomes the thread's work; unfair PUSH favours parallelism. */
1882 pushClosure(tso,spark);
1883 PUSH_ON_RUN_QUEUE(tso);
1884 advisory_thread_count++;
1891 Turn a spark into a thread.
1892 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
/* activateSpark: turn a spark into a thread via createSparkThread,
 * with optional ParStats logging.
 * ToDo (original): fix for SMP -- needs to acquire SCHED_MUTEX! */
1896 activateSpark (rtsSpark spark)
1900 tso = createSparkThread(spark);
1901 if (RtsFlags.ParFlags.ParStats.Full) {
1902 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1903 IF_PAR_DEBUG(verbose,
1904 debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
1905 (StgClosure *)spark, info_type((StgClosure *)spark)));
1907 // ToDo: fwd info on local/global spark to thread -- HWL
1908 // tso->gran.exported = spark->exported;
1909 // tso->gran.locked = !spark->global;
1910 // tso->gran.sparkname = spark->name;
1916 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1917 Capability *initialCapability
1921 /* ---------------------------------------------------------------------------
1924 * scheduleThread puts a thread on the head of the runnable queue.
1925 * This will usually be done immediately after a thread is created.
1926 * The caller of scheduleThread must create the thread using e.g.
1927 * createThread and push an appropriate closure
1928 * on this thread's stack before the scheduler is invoked.
1929 * ------------------------------------------------------------------------ */
1931 static void scheduleThread_ (StgTSO* tso);
/* scheduleThread_: append a thread to the tail of the run queue.
 * Tail insertion avoids starving threads already queued. */
1934 scheduleThread_(StgTSO *tso)
1936 // Precondition: sched_mutex must be held.
1937 // The thread goes at the *end* of the run-queue, to avoid possible
1938 // starvation of any threads already on the queue.
1939 APPEND_TO_RUN_QUEUE(tso);
/* scheduleThread: public wrapper -- take sched_mutex, enqueue, release. */
1944 scheduleThread(StgTSO* tso)
1946 ACQUIRE_LOCK(&sched_mutex);
1947 scheduleThread_(tso);
1948 RELEASE_LOCK(&sched_mutex);
1951 #if defined(RTS_SUPPORTS_THREADS)
1952 static Condition bound_cond_cache;
1953 static int bound_cond_cache_full = 0;
/* scheduleWaitThread: register 'tso' as a bound main thread (allocating its
 * StgMainThread record and linking it onto main_threads), enqueue the TSO,
 * then block in waitThread_ until it completes.  '*ret' receives the result.
 * Caller holds sched_mutex. */
1958 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1959 Capability *initialCapability)
1961 // Precondition: sched_mutex must be held
1964 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
/* Push onto the doubly-linked main_threads list. */
1969 m->link = main_threads;
1971 if (main_threads != NULL) {
1972 main_threads->prev = m;
1976 #if defined(RTS_SUPPORTS_THREADS)
1977 // Allocating a new condition for each thread is expensive, so we
1978 // cache one. This is a pretty feeble hack, but it helps speed up
1979 // consecutive call-ins quite a bit.
1980 if (bound_cond_cache_full) {
1981 m->bound_thread_cond = bound_cond_cache;
1982 bound_cond_cache_full = 0;
1984 initCondition(&m->bound_thread_cond);
1988 /* Put the thread on the main-threads list prior to scheduling the TSO.
1989 Failure to do so introduces a race condition in the MT case (as
1990 identified by Wolfgang Thaller), whereby the new task/OS thread
1991 created by scheduleThread_() would complete prior to the thread
1992 that spawned it managed to put 'itself' on the main-threads list.
1993 The upshot of it all being that the worker thread wouldn't get to
1994 signal the completion of the its work item for the main thread to
1995 see (==> it got stuck waiting.) -- sof 6/02.
1997 IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
1999 APPEND_TO_RUN_QUEUE(tso);
2000 // NB. Don't call THREAD_RUNNABLE() here, because the thread is
2001 // bound and only runnable by *this* OS thread, so waking up other
2002 // workers will just slow things down.
2004 return waitThread_(m, initialCapability);
2007 /* ---------------------------------------------------------------------------
2010 * Initialise the scheduler. This resets all the queues - if the
2011 * queues contained any threads, they'll be garbage collected at the
2014 * ------------------------------------------------------------------------ */
/* Body of initScheduler (signature elided in this excerpt): reset every
 * scheduler queue to empty, set up locks, capabilities and the task manager.
 * GRAN builds reset the per-PE queue arrays; other builds the scalar queues. */
2022 for (i=0; i<=MAX_PROC; i++) {
2023 run_queue_hds[i] = END_TSO_QUEUE;
2024 run_queue_tls[i] = END_TSO_QUEUE;
2025 blocked_queue_hds[i] = END_TSO_QUEUE;
2026 blocked_queue_tls[i] = END_TSO_QUEUE;
2027 ccalling_threadss[i] = END_TSO_QUEUE;
2028 sleeping_queue = END_TSO_QUEUE;
2031 run_queue_hd = END_TSO_QUEUE;
2032 run_queue_tl = END_TSO_QUEUE;
2033 blocked_queue_hd = END_TSO_QUEUE;
2034 blocked_queue_tl = END_TSO_QUEUE;
2035 sleeping_queue = END_TSO_QUEUE;
2038 suspended_ccalling_threads = END_TSO_QUEUE;
2040 main_threads = NULL;
2041 all_threads = END_TSO_QUEUE;
/* Convert the context-switch interval from milliseconds to timer ticks. */
2046 RtsFlags.ConcFlags.ctxtSwitchTicks =
2047 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2049 #if defined(RTS_SUPPORTS_THREADS)
2050 /* Initialise the mutex and condition variables used by
2052 initMutex(&sched_mutex);
2053 initMutex(&term_mutex);
2056 ACQUIRE_LOCK(&sched_mutex);
2058 /* A capability holds the state a native thread needs in
2059 * order to execute STG code. At least one capability is
2060 * floating around (only SMP builds have more than one).
2064 #if defined(RTS_SUPPORTS_THREADS)
2065 /* start our haskell execution tasks */
2066 startTaskManager(0,taskStart);
2069 #if /* defined(SMP) ||*/ defined(PAR)
2073 RELEASE_LOCK(&sched_mutex);
/* exitScheduler: flag the scheduler as shutting down (threaded builds also
 * stop the task manager -- those lines are elided in this excerpt). */
2077 exitScheduler( void )
2079 #if defined(RTS_SUPPORTS_THREADS)
2082 shutting_down_scheduler = rtsTrue;
2085 /* ----------------------------------------------------------------------------
2086 Managing the per-task allocation areas.
2088 Each capability comes with an allocation area. These are
2089 fixed-length block lists into which allocation can be done.
2091 ToDo: no support for two-space collection at the moment???
2092 ------------------------------------------------------------------------- */
/* waitThread_: run the scheduler until main thread 'm' completes, then
 * release its condition variable (back to the one-entry cache if free).
 * GRAN builds first point the simulator at m->tso on the main PE.
 * Caller holds sched_mutex; it is still held on return. */
2096 waitThread_(StgMainThread* m, Capability *initialCapability)
2098 SchedulerStatus stat;
2100 // Precondition: sched_mutex must be held.
2101 IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2104 /* GranSim specific init */
2105 CurrentTSO = m->tso; // the TSO to run
2106 procStatus[MainProc] = Busy; // status of main PE
2107 CurrentProc = MainProc; // PE to run it on
2108 schedule(m,initialCapability);
2110 schedule(m,initialCapability);
/* schedule() only returns once this main thread has a final status. */
2111 ASSERT(m->stat != NoStatus);
2116 #if defined(RTS_SUPPORTS_THREADS)
2117 // Free the condition variable, returning it to the cache if possible.
2118 if (!bound_cond_cache_full) {
2119 bound_cond_cache = m->bound_thread_cond;
2120 bound_cond_cache_full = 1;
2122 closeCondition(&m->bound_thread_cond);
2126 IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2129 // Postcondition: sched_mutex still held
2133 /* ---------------------------------------------------------------------------
2134 Where are the roots that we know about?
2136 - all the threads on the runnable queue
2137 - all the threads on the blocked queue
2138 - all the threads on the sleeping queue
2139 - all the thread currently executing a _ccall_GC
2140 - all the "main threads"
2142 ------------------------------------------------------------------------ */
2144 /* This has to be protected either by the scheduler monitor, or by the
2145 garbage collection monitor (probably the latter).
/* GetRoots: feed every scheduler-owned GC root to 'evac': the run, blocked
 * and sleeping queues (per-PE arrays under GranSim, scalar queues otherwise),
 * the suspended-ccall list, and -- where configured -- spark queues and
 * signal handlers.  Must be called under the scheduler or GC lock. */
2150 GetRoots( evac_fn evac )
2155 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2156 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2157 evac((StgClosure **)&run_queue_hds[i]);
2158 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2159 evac((StgClosure **)&run_queue_tls[i]);
2161 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2162 evac((StgClosure **)&blocked_queue_hds[i]);
2163 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2164 evac((StgClosure **)&blocked_queue_tls[i]);
2165 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
/* FIX: evacuate the same array the guard above tests.  The original named
 * 'ccalling_threads[i]' (one 's'), but the per-PE array initialised in
 * initScheduler is 'ccalling_threadss' -- evac'ing a different identifier
 * would leave the tested queue un-evacuated. */
2166 evac((StgClosure **)&ccalling_threadss[i]);
2173 if (run_queue_hd != END_TSO_QUEUE) {
2174 ASSERT(run_queue_tl != END_TSO_QUEUE);
2175 evac((StgClosure **)&run_queue_hd);
2176 evac((StgClosure **)&run_queue_tl);
2179 if (blocked_queue_hd != END_TSO_QUEUE) {
2180 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2181 evac((StgClosure **)&blocked_queue_hd);
2182 evac((StgClosure **)&blocked_queue_tl);
2185 if (sleeping_queue != END_TSO_QUEUE) {
2186 evac((StgClosure **)&sleeping_queue);
2190 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2191 evac((StgClosure **)&suspended_ccalling_threads);
2194 #if defined(PAR) || defined(GRAN)
2195 markSparkQueue(evac);
2198 #if defined(RTS_USER_SIGNALS)
2199 // mark the signal handlers (signals should be already blocked)
2200 markSignalHandlers(evac);
2204 /* -----------------------------------------------------------------------------
2207 This is the interface to the garbage collector from Haskell land.
2208 We provide this so that external C code can allocate and garbage
2209 collect when called from Haskell via _ccall_GC.
2211 It might be useful to provide an interface whereby the programmer
2212 can specify more roots (ToDo).
2214 This needs to be protected by the GC condition variable above. KH.
2215 -------------------------------------------------------------------------- */
2217 static void (*extra_roots)(evac_fn);
/* Minor-generation GC entry point for foreign code (signature elided in
 * this excerpt; presumably performGC -- confirm against full source). */
2222 /* Obligated to hold this lock upon entry */
2223 ACQUIRE_LOCK(&sched_mutex);
2224 GarbageCollect(GetRoots,rtsFalse);
2225 RELEASE_LOCK(&sched_mutex);
/* performMajorGC: force a full (major) collection under sched_mutex;
 * rtsTrue selects a major GC. */
2229 performMajorGC(void)
2231 ACQUIRE_LOCK(&sched_mutex);
2232 GarbageCollect(GetRoots,rtsTrue);
2233 RELEASE_LOCK(&sched_mutex);
/* AllRoots: combined root-marking function -- scheduler roots plus the
 * user-supplied roots stashed in the file-scope 'extra_roots' pointer. */
2237 AllRoots(evac_fn evac)
2239 GetRoots(evac); // the scheduler's roots
2240 extra_roots(evac); // the user's roots
/* performGCWithRoots: GC with additional user-supplied roots.  The callback
 * is parked in the file-scope 'extra_roots' (safe because sched_mutex is
 * held for the duration) and picked up by AllRoots. */
2244 performGCWithRoots(void (*get_roots)(evac_fn))
2246 ACQUIRE_LOCK(&sched_mutex);
2247 extra_roots = get_roots;
2248 GarbageCollect(AllRoots,rtsFalse);
2249 RELEASE_LOCK(&sched_mutex);
2252 /* -----------------------------------------------------------------------------
2255 If the thread has reached its maximum stack size, then raise the
2256 StackOverflow exception in the offending thread. Otherwise
2257 relocate the TSO into a larger chunk of memory and adjust its stack
2259 -------------------------------------------------------------------------- */
/* threadStackOverflow: grow a thread's stack.  If the stack is already at
 * max_stack_size, raise the StackOverflow exception in the thread instead.
 * Otherwise allocate a larger TSO (roughly doubled, MBLOCK-rounded), copy
 * the TSO header and live stack across, and mark the old TSO as
 * ThreadRelocated so the GC and TSO primops follow the forwarding. */
2262 threadStackOverflow(StgTSO *tso)
2264 nat new_stack_size, new_tso_size, stack_words;
2268 IF_DEBUG(sanity,checkTSO(tso));
2269 if (tso->stack_size >= tso->max_stack_size) {
2272 debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2273 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2274 /* If we're debugging, just print out the top of the stack */
2275 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2278 /* Send this thread the StackOverflow exception */
2279 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2283 /* Try to double the current stack size. If that takes us over the
2284 * maximum stack size for this thread, then use the maximum instead.
2285 * Finally round up so the TSO ends up as a whole number of blocks.
2287 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2288 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2289 TSO_STRUCT_SIZE)/sizeof(W_);
2290 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2291 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2293 IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2295 dest = (StgTSO *)allocate(new_tso_size);
2296 TICK_ALLOC_TSO(new_stack_size,0);
2298 /* copy the TSO block and the old stack into the new area */
2299 memcpy(dest,tso,TSO_STRUCT_SIZE);
/* Live stack occupies the top of the old area; place it at the top of the new. */
2300 stack_words = tso->stack + tso->stack_size - tso->sp;
2301 new_sp = (P_)dest + new_tso_size - stack_words;
2302 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2304 /* relocate the stack pointers... */
2306 dest->stack_size = new_stack_size;
2308 /* Mark the old TSO as relocated. We have to check for relocated
2309 * TSOs in the garbage collector and any primops that deal with TSOs.
2311 * It's important to set the sp value to just beyond the end
2312 * of the stack, so we don't attempt to scavenge any part of the
2315 tso->what_next = ThreadRelocated;
2317 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2318 tso->why_blocked = NotBlocked;
2319 dest->mut_link = NULL;
2321 IF_PAR_DEBUG(verbose,
2322 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2323 tso->id, tso, tso->stack_size);
2324 /* If we're debugging, just print out the top of the stack */
2325 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2328 IF_DEBUG(sanity,checkTSO(tso));
2330 IF_DEBUG(scheduler,printTSO(dest));
2336 /* ---------------------------------------------------------------------------
2337 Wake up a queue that was blocked on some resource.
2338 ------------------------------------------------------------------------ */
/* unblockCount (GRAN and PAR variants share this signature): emit a RESUME
 * event to the log and charge the woken TSO's blocked/fetch time, chosen by
 * the type of the closure it was blocked on. */
2342 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2347 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2349 /* write RESUME events to log file and
2350 update blocked and fetch time (depending on type of the orig closure) */
2351 if (RtsFlags.ParFlags.ParStats.Full) {
2352 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2353 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2354 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
/* An empty run queue means the next schedule must be logged explicitly. */
2355 if (EMPTY_RUN_QUEUE())
2356 emitSchedule = rtsTrue;
2358 switch (get_itbl(node)->type) {
2360 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2365 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2372 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
/* unblockOneLocked (GranSim variant): wake one element of a blocking queue.
 * If the TSO is local to the node's PE, schedule a cheap local unblock
 * event; if remote, charge message-packing + global-unblock + latency costs
 * and post the event to the TSO's own PE.  Returns the next queue element
 * (return lines elided in this excerpt). */
2379 static StgBlockingQueueElement *
2380 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2383 PEs node_loc, tso_loc;
2385 node_loc = where_is(node); // should be lifted out of loop
2386 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2387 tso_loc = where_is((StgClosure *)tso);
2388 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2389 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2390 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2391 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2392 // insertThread(tso, node_loc);
2393 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2395 tso, node, (rtsSpark*)NULL);
2396 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2399 } else { // TSO is remote (actually should be FMBQ)
2400 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2401 RtsFlags.GranFlags.Costs.gunblocktime +
2402 RtsFlags.GranFlags.Costs.latency;
2403 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2405 tso, node, (rtsSpark*)NULL);
2406 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2409 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2411 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2412 (node_loc==tso_loc ? "Local" : "Global"),
2413 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2414 tso->block_info.closure = NULL;
2415 IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n",
/* GUM (PAR) variant: wake one element of the blocking queue of `node` and
 * return the next element.  TSOs go back on the run queue; BLOCKED_FETCH
 * closures are queued on PendingFetches for later processing; RBHSave
 * constructors terminate the queue.  Caller must hold sched_mutex
 * (hence the *Locked suffix).
 * NOTE(review): case labels and some statements are elided in this excerpt. */
2419 static StgBlockingQueueElement *
2420 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2422 StgBlockingQueueElement *next;
2424 switch (get_itbl(bqe)->type) {
2426 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2427 /* if it's a TSO just push it onto the run_queue */
2429 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2430 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
/* record the resume event / timing statistics for this TSO */
2432 unblockCount(bqe, node);
2433 /* reset blocking status after dumping event */
2434 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2438 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2440 bqe->link = (StgBlockingQueueElement *)PendingFetches;
2441 PendingFetches = (StgBlockedFetch *)bqe;
2445 /* can ignore this case in a non-debugging setup;
2446 see comments on RBHSave closures above */
2448 /* check that the closure is an RBHSave closure */
2449 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2450 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2451 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2455 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2456 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2460 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
2464 #else /* !GRAN && !PAR */
/* Sequential/SMP variant: mark `tso` unblocked and append it to the run
 * queue.  Caller must hold sched_mutex; returns the next TSO on the
 * original blocking queue (return statement elided in this excerpt). */
2466 unblockOneLocked(StgTSO *tso)
2470 ASSERT(get_itbl(tso)->type == TSO);
2471 ASSERT(tso->why_blocked != NotBlocked);
2472 tso->why_blocked = NotBlocked;
2474 tso->link = END_TSO_QUEUE;
2475 APPEND_TO_RUN_QUEUE(tso);
2477 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
/* Public wrapper around unblockOneLocked: takes sched_mutex for the
 * duration of the single-element wake-up.  Two conditionally-compiled
 * variants: GRAN/PAR works on StgBlockingQueueElement, the sequential
 * one on StgTSO. */
2482 #if defined(GRAN) || defined(PAR)
2483 INLINE_ME StgBlockingQueueElement *
2484 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2486 ACQUIRE_LOCK(&sched_mutex);
2487 bqe = unblockOneLocked(bqe, node);
2488 RELEASE_LOCK(&sched_mutex);
/* sequential variant (enclosing #else elided in this excerpt) */
2493 unblockOne(StgTSO *tso)
2495 ACQUIRE_LOCK(&sched_mutex);
2496 tso = unblockOneLocked(tso);
2497 RELEASE_LOCK(&sched_mutex);
/* GranSim: wake the entire blocking queue `q` attached to closure `node`.
 * Handles the "fake fetch" case where the node appears to be on another
 * simulated PE, restores the RBHSave info at the end of an RBH's queue,
 * and updates global GranSim statistics.
 * NOTE(review): loop setup and several statements are elided here. */
2504 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2506 StgBlockingQueueElement *bqe;
2511 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
2512 node, CurrentProc, CurrentTime[CurrentProc],
2513 CurrentTSO->id, CurrentTSO));
2515 node_loc = where_is(node);
2517 ASSERT(q == END_BQ_QUEUE ||
2518 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2519 get_itbl(q)->type == CONSTR); // closure (type constructor)
2520 ASSERT(is_unique(node));
2522 /* FAKE FETCH: magically copy the node to the tso's proc;
2523 no Fetch necessary because in reality the node should not have been
2524 moved to the other PE in the first place
/* if the node is not on the current PE, just extend its location bitmask */
2526 if (CurrentProc!=node_loc) {
2528 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
2529 node, node_loc, CurrentProc, CurrentTSO->id,
2530 // CurrentTSO, where_is(CurrentTSO),
2531 node->header.gran.procs));
2532 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2534 debugBelch("## new bitmask of node %p is %#x\n",
2535 node, node->header.gran.procs));
2536 if (RtsFlags.GranFlags.GranSimStats.Global) {
2537 globalGranStats.tot_fake_fetches++;
2542 // ToDo: check: ASSERT(CurrentProc==node_loc);
/* main loop: unblock elements one at a time until a non-TSO is reached */
2543 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2546 bqe points to the current element in the queue
2547 next points to the next element in the queue
2549 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2550 //tso_loc = where_is(tso);
2552 bqe = unblockOneLocked(bqe, node);
2555 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2556 the closure to make room for the anchor of the BQ */
2557 if (bqe!=END_BQ_QUEUE) {
2558 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2560 ASSERT((info_ptr==&RBH_Save_0_info) ||
2561 (info_ptr==&RBH_Save_1_info) ||
2562 (info_ptr==&RBH_Save_2_info));
2564 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2565 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2566 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2569 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
2570 node, info_type(node)));
2573 /* statistics gathering */
2574 if (RtsFlags.GranFlags.GranSimStats.Global) {
2575 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2576 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2577 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2578 globalGranStats.tot_awbq++; // total no. of bqs awakened
2581 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
2582 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
/* GUM (PAR): wake the entire blocking queue `q` of `node` under
 * sched_mutex.  A queue whose head is already a CONSTR (RBHSave) or empty
 * has nothing to unblock.  Elements are woken one at a time via
 * unblockOneLocked until a non-TSO/non-BLOCKED_FETCH is reached. */
2586 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2588 StgBlockingQueueElement *bqe;
2590 ACQUIRE_LOCK(&sched_mutex);
2592 IF_PAR_DEBUG(verbose,
2593 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
/* nothing to do if the queue is empty or starts with an RBHSave closure */
2597 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2598 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
2603 ASSERT(q == END_BQ_QUEUE ||
2604 get_itbl(q)->type == TSO ||
2605 get_itbl(q)->type == BLOCKED_FETCH ||
2606 get_itbl(q)->type == CONSTR);
2609 while (get_itbl(bqe)->type==TSO ||
2610 get_itbl(bqe)->type==BLOCKED_FETCH) {
2611 bqe = unblockOneLocked(bqe, node);
2613 RELEASE_LOCK(&sched_mutex);
2616 #else /* !GRAN && !PAR */
/* Wake a whole queue of TSOs; caller already holds sched_mutex. */
2619 awakenBlockedQueueNoLock(StgTSO *tso)
2621 while (tso != END_TSO_QUEUE) {
2622 tso = unblockOneLocked(tso);
/* Wake a whole queue of TSOs, taking sched_mutex for the duration. */
2627 awakenBlockedQueue(StgTSO *tso)
2629 ACQUIRE_LOCK(&sched_mutex);
2630 while (tso != END_TSO_QUEUE) {
2631 tso = unblockOneLocked(tso);
2633 RELEASE_LOCK(&sched_mutex);
2637 /* ---------------------------------------------------------------------------
2639 - usually called inside a signal handler so it mustn't do anything fancy.
2640 ------------------------------------------------------------------------ */
/* Request an RTS interrupt; called from signal handlers, so it must stay
 * async-signal-simple.  In the threaded RTS it also kicks a worker that
 * may be blocked in select()/awaitEvent. */
2643 interruptStgRts(void)
2647 #ifdef RTS_SUPPORTS_THREADS
2648 wakeBlockedWorkerThread();
2652 /* -----------------------------------------------------------------------------
2655 This is for use when we raise an exception in another thread, which
2657 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2658 -------------------------------------------------------------------------- */
2660 #if defined(GRAN) || defined(PAR)
2662 NB: only the type of the blocking queue is different in GranSim and GUM
2663 the operations on the queue-elements are the same
2664 long live polymorphism!
2666 Locks: sched_mutex is held upon entry and exit.
/* GRAN/PAR version: remove `tso` from whichever blocking queue it sits on
 * (MVar, black hole, exception-delivery, I/O, or sleep queue), then push it
 * onto the run queue with its blocking state cleared.
 * Locks: sched_mutex is held upon entry and exit (see comment above).
 * NOTE(review): case labels, braces and `goto done`-style exits are elided
 * in this excerpt; each queue walk unlinks via the `last` back-pointer. */
2670 unblockThread(StgTSO *tso)
2672 StgBlockingQueueElement *t, **last;
2674 switch (tso->why_blocked) {
2677 return; /* not blocked */
/* --- blocked on an MVar: unlink from mvar->head..tail --- */
2680 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2682 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2683 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2685 last = (StgBlockingQueueElement **)&mvar->head;
2686 for (t = (StgBlockingQueueElement *)mvar->head;
2688 last = &t->link, last_tso = t, t = t->link) {
2689 if (t == (StgBlockingQueueElement *)tso) {
2690 *last = (StgBlockingQueueElement *)tso->link;
/* fix up the tail pointer if we removed the last element */
2691 if (mvar->tail == tso) {
2692 mvar->tail = (StgTSO *)last_tso;
2697 barf("unblockThread (MVAR): TSO not found");
2700 case BlockedOnBlackHole:
/* --- blocked on a black hole: unlink from its blocking queue --- */
2701 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2703 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2705 last = &bq->blocking_queue;
2706 for (t = bq->blocking_queue;
2708 last = &t->link, t = t->link) {
2709 if (t == (StgBlockingQueueElement *)tso) {
2710 *last = (StgBlockingQueueElement *)tso->link;
2714 barf("unblockThread (BLACKHOLE): TSO not found");
2717 case BlockedOnException:
/* --- blocked delivering an exception to `target` --- */
2719 StgTSO *target = tso->block_info.tso;
2721 ASSERT(get_itbl(target)->type == TSO);
/* follow forwarding pointer if the target was moved by the GC */
2723 if (target->what_next == ThreadRelocated) {
2724 target = target->link;
2725 ASSERT(get_itbl(target)->type == TSO);
2728 ASSERT(target->blocked_exceptions != NULL);
2730 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2731 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2733 last = &t->link, t = t->link) {
2734 ASSERT(get_itbl(t)->type == TSO);
2735 if (t == (StgBlockingQueueElement *)tso) {
2736 *last = (StgBlockingQueueElement *)tso->link;
2740 barf("unblockThread (Exception): TSO not found");
2744 case BlockedOnWrite:
2745 #if defined(mingw32_TARGET_OS)
2746 case BlockedOnDoProc:
2749 /* take TSO off blocked_queue */
2750 StgBlockingQueueElement *prev = NULL;
2751 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2752 prev = t, t = t->link) {
2753 if (t == (StgBlockingQueueElement *)tso) {
/* removing the head vs. an interior element of blocked_queue */
2755 blocked_queue_hd = (StgTSO *)t->link;
2756 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2757 blocked_queue_tl = END_TSO_QUEUE;
2760 prev->link = t->link;
2761 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2762 blocked_queue_tl = (StgTSO *)prev;
2768 barf("unblockThread (I/O): TSO not found");
2771 case BlockedOnDelay:
2773 /* take TSO off sleeping_queue */
2774 StgBlockingQueueElement *prev = NULL;
2775 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
2776 prev = t, t = t->link) {
2777 if (t == (StgBlockingQueueElement *)tso) {
2779 sleeping_queue = (StgTSO *)t->link;
2781 prev->link = t->link;
2786 barf("unblockThread (delay): TSO not found");
2790 barf("unblockThread");
/* common exit: clear blocking state and make the thread runnable */
2794 tso->link = END_TSO_QUEUE;
2795 tso->why_blocked = NotBlocked;
2796 tso->block_info.closure = NULL;
2797 PUSH_ON_RUN_QUEUE(tso);
/* Sequential version of unblockThread: same structure as the GRAN/PAR
 * variant above but working directly on StgTSO queues.  Removes `tso`
 * from its blocking queue and appends it to the run queue.
 * NOTE(review): braces, case labels and some unlink statements are elided
 * in this excerpt. */
2801 unblockThread(StgTSO *tso)
2805 /* To avoid locking unnecessarily. */
2806 if (tso->why_blocked == NotBlocked) {
2810 switch (tso->why_blocked) {
/* --- blocked on an MVar --- */
2813 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2815 StgTSO *last_tso = END_TSO_QUEUE;
2816 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2819 for (t = mvar->head; t != END_TSO_QUEUE;
2820 last = &t->link, last_tso = t, t = t->link) {
/* fix up the tail pointer if we removed the last element */
2823 if (mvar->tail == tso) {
2824 mvar->tail = last_tso;
2829 barf("unblockThread (MVAR): TSO not found");
2832 case BlockedOnBlackHole:
2833 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2835 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2837 last = &bq->blocking_queue;
2838 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2839 last = &t->link, t = t->link) {
2845 barf("unblockThread (BLACKHOLE): TSO not found");
2848 case BlockedOnException:
2850 StgTSO *target = tso->block_info.tso;
2852 ASSERT(get_itbl(target)->type == TSO);
/* follow forwarding pointers left by the GC */
2854 while (target->what_next == ThreadRelocated) {
2855 target = target->link;
2856 ASSERT(get_itbl(target)->type == TSO);
2859 ASSERT(target->blocked_exceptions != NULL);
2861 last = &target->blocked_exceptions;
2862 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2863 last = &t->link, t = t->link) {
2864 ASSERT(get_itbl(t)->type == TSO);
2870 barf("unblockThread (Exception): TSO not found");
2874 case BlockedOnWrite:
2875 #if defined(mingw32_TARGET_OS)
2876 case BlockedOnDoProc:
2879 StgTSO *prev = NULL;
2880 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2881 prev = t, t = t->link) {
/* removing the head vs. an interior element of blocked_queue */
2884 blocked_queue_hd = t->link;
2885 if (blocked_queue_tl == t) {
2886 blocked_queue_tl = END_TSO_QUEUE;
2889 prev->link = t->link;
2890 if (blocked_queue_tl == t) {
2891 blocked_queue_tl = prev;
2897 barf("unblockThread (I/O): TSO not found");
2900 case BlockedOnDelay:
2902 StgTSO *prev = NULL;
2903 for (t = sleeping_queue; t != END_TSO_QUEUE;
2904 prev = t, t = t->link) {
2907 sleeping_queue = t->link;
2909 prev->link = t->link;
2914 barf("unblockThread (delay): TSO not found");
2918 barf("unblockThread");
/* common exit: clear blocking state and make the thread runnable */
2922 tso->link = END_TSO_QUEUE;
2923 tso->why_blocked = NotBlocked;
2924 tso->block_info.closure = NULL;
2925 APPEND_TO_RUN_QUEUE(tso);
2929 /* -----------------------------------------------------------------------------
2932 * The following function implements the magic for raising an
2933 * asynchronous exception in an existing thread.
2935 * We first remove the thread from any queue on which it might be
2936 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
2938 * We strip the stack down to the innermost CATCH_FRAME, building
2939 * thunks in the heap for all the active computations, so they can
2940 * be restarted if necessary. When we reach a CATCH_FRAME, we build
2941 * an application of the handler to the exception, and push it on
2942 * the top of the stack.
2944 * How exactly do we save all the active computations? We create an
2945 * AP_STACK for every UpdateFrame on the stack. Entering one of these
2946 * AP_STACKs pushes everything from the corresponding update frame
2947 * upwards onto the stack. (Actually, it pushes everything up to the
2948 * next update frame plus a pointer to the next AP_STACK object.
2949 * Entering the next AP_STACK object pushes more onto the stack until we
2950 * reach the last AP_STACK object - at which point the stack should look
2951 * exactly as it did when we killed the TSO and we can continue
2952 * execution by entering the closure on top of the stack.
2954 * We can also kill a thread entirely - this happens if either (a) the
2955 * exception passed to raiseAsync is NULL, or (b) there's no
2956 * CATCH_FRAME on the stack. In either case, we strip the entire
2957 * stack and replace the thread with a zombie.
2959 * Locks: sched_mutex is held upon entry and exit.
2961 * -------------------------------------------------------------------------- */
/* Kill a thread: raising a NULL "exception" strips the whole stack and
 * leaves the thread as a zombie (see raiseAsync's header comment). */
2964 deleteThread(StgTSO *tso)
2966 raiseAsync(tso,NULL);
2969 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
/* Used only by forkProcess: kill a thread without letting it catch the
 * KillThread exception.  Threads blocked in a C call are left alone —
 * presumably their stacks must not be touched mid-call (TODO confirm). */
2971 deleteThreadImmediately(StgTSO *tso)
2972 { // for forkProcess only:
2973 // delete thread without giving it a chance to catch the KillThread exception
/* already finished — nothing to do */
2975 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2979 if (tso->why_blocked != BlockedOnCCall &&
2980 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2984 tso->what_next = ThreadKilled;
/* Wrapper for raiseAsync that acquires sched_mutex first; use this from
 * contexts where the caller does not already hold the lock. */
2989 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
2991 /* When raising async exceptions from contexts where sched_mutex isn't
2992 held, use raiseAsyncWithLock(). */
2993 ACQUIRE_LOCK(&sched_mutex);
2994 raiseAsync(tso,exception);
2995 RELEASE_LOCK(&sched_mutex);
/* Raise an asynchronous exception in `tso` (see the long comment above):
 * unblock the thread, then walk its stack freezing computations into
 * AP_STACK closures until a CATCH_FRAME (exception != NULL) or STOP_FRAME
 * is found.  exception == NULL kills the thread outright.
 * Locks: caller holds sched_mutex (use raiseAsyncWithLock otherwise).
 * NOTE(review): loop structure, case labels and braces are elided in this
 * excerpt. */
2999 raiseAsync(StgTSO *tso, StgClosure *exception)
3001 StgRetInfoTable *info;
3004 // Thread already dead?
3005 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3010 sched_belch("raising exception in thread %ld.", (long)tso->id));
3012 // Remove it from any blocking queues
3017 // The stack freezing code assumes there's a closure pointer on
3018 // the top of the stack, so we have to arrange that this is the case...
3020 if (sp[0] == (W_)&stg_enter_info) {
/* no closure on top: push a dummy return closure to satisfy the invariant */
3024 sp[0] = (W_)&stg_dummy_ret_closure;
3030 // 1. Let the top of the stack be the "current closure"
3032 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3035 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3036 // current closure applied to the chunk of stack up to (but not
3037 // including) the update frame. This closure becomes the "current
3038 // closure". Go back to step 2.
3040 // 4. If it's a CATCH_FRAME, then leave the exception handler on
3041 // top of the stack applied to the exception.
3043 // 5. If it's a STOP_FRAME, then kill the thread.
3048 info = get_ret_itbl((StgClosure *)frame);
/* skip over ordinary frames until we hit an interesting one */
3050 while (info->i.type != UPDATE_FRAME
3051 && (info->i.type != CATCH_FRAME || exception == NULL)
3052 && info->i.type != STOP_FRAME) {
3053 frame += stack_frame_sizeW((StgClosure *)frame);
3054 info = get_ret_itbl((StgClosure *)frame);
3057 switch (info->i.type) {
3060 // If we find a CATCH_FRAME, and we've got an exception to raise,
3061 // then build the THUNK raise(exception), and leave it on
3062 // top of the CATCH_FRAME ready to enter.
3066 StgCatchFrame *cf = (StgCatchFrame *)frame;
3070 // we've got an exception to raise, so let's pass it to the
3071 // handler in this frame.
/* allocate the `raise# exception` thunk */
3073 raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3074 TICK_ALLOC_SE_THK(1,0);
3075 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3076 raise->payload[0] = exception;
3078 // throw away the stack from Sp up to the CATCH_FRAME.
3082 /* Ensure that async exceptions are blocked now, so we don't get
3083 * a surprise exception before we get around to executing the
3086 if (tso->blocked_exceptions == NULL) {
3087 tso->blocked_exceptions = END_TSO_QUEUE;
3090 /* Put the newly-built THUNK on top of the stack, ready to execute
3091 * when the thread restarts.
3094 sp[-1] = (W_)&stg_enter_info;
3096 tso->what_next = ThreadRunGHC;
3097 IF_DEBUG(sanity, checkTSO(tso));
3106 // First build an AP_STACK consisting of the stack chunk above the
3107 // current update frame, with the top word on the stack as the
/* number of stack words to capture in the AP_STACK */
3110 words = frame - sp - 1;
3111 ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3114 ap->fun = (StgClosure *)sp[0];
3116 for(i=0; i < (nat)words; ++i) {
3117 ap->payload[i] = (StgClosure *)*sp++;
3120 SET_HDR(ap,&stg_AP_STACK_info,
3121 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
3122 TICK_ALLOC_UP_THK(words+1,0);
3125 debugBelch("sched: Updating ");
3126 printPtr((P_)((StgUpdateFrame *)frame)->updatee);
3127 debugBelch(" with ");
3128 printObj((StgClosure *)ap);
3131 // Replace the updatee with an indirection - happily
3132 // this will also wake up any threads currently
3133 // waiting on the result.
3135 // Warning: if we're in a loop, more than one update frame on
3136 // the stack may point to the same object. Be careful not to
3137 // overwrite an IND_OLDGEN in this case, because we'll screw
3138 // up the mutable lists. To be on the safe side, don't
3139 // overwrite any kind of indirection at all. See also
3140 // threadSqueezeStack in GC.c, where we have to make a similar
3143 if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3144 // revert the black hole
3145 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
/* discard the update frame, leaving the AP_STACK as the new top closure */
3148 sp += sizeofW(StgUpdateFrame) - 1;
3149 sp[0] = (W_)ap; // push onto stack
3154 // We've stripped the entire stack, the thread is now dead.
3155 sp += sizeofW(StgStopFrame);
3156 tso->what_next = ThreadKilled;
3167 /* -----------------------------------------------------------------------------
3168 raiseExceptionHelper
3170 This function is called by the raise# primitive, just so that we can
3171 move some of the tricky bits of raising an exception from C-- into
3172 C. Who knows, it might be a useful re-useable thing here too.
3173 -------------------------------------------------------------------------- */
/* Helper for the raise# primop: walk the stack towards the enclosing
 * catch frame, overwriting the updatee of every update frame passed with
 * a lazily-allocated `raise# exception` thunk.
 * NOTE(review): loop header, case labels and the return are elided in
 * this excerpt. */
3176 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3178 StgClosure *raise_closure = NULL;
3180 StgRetInfoTable *info;
3182 // This closure represents the expression 'raise# E' where E
3183 // is the exception raised. It is used to overwrite all the
3184 // thunks which are currently under evaluation.
3188 // LDV profiling: stg_raise_info has THUNK as its closure
3189 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3190 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
3191 // 1 does not cause any problem unless profiling is performed.
3192 // However, when LDV profiling goes on, we need to linearly scan
3193 // small object pool, where raise_closure is stored, so we should
3194 // use MIN_UPD_SIZE.
3196 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3197 // sizeofW(StgClosure)+1);
3201 // Walk up the stack, looking for the catch frame. On the way,
3202 // we update any closures pointed to from update frames with the
3203 // raise closure that we just built.
3207 info = get_ret_itbl((StgClosure *)p);
3208 next = p + stack_frame_sizeW((StgClosure *)p);
3209 switch (info->i.type) {
3212 // Only create raise_closure if we need to.
3213 if (raise_closure == NULL) {
3215 (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3216 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3217 raise_closure->payload[0] = exception;
/* overwrite the updatee so blocked threads see the raise thunk */
3219 UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3238 /* -----------------------------------------------------------------------------
3239 resurrectThreads is called after garbage collection on the list of
3240 threads found to be garbage. Each of these threads will be woken
3241 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3242 on an MVar, or NonTermination if the thread was blocked on a Black
3245 Locks: sched_mutex isn't held upon entry nor exit.
3246 -------------------------------------------------------------------------- */
/* Called after GC on the list of garbage threads: relink each onto
 * all_threads and send it an appropriate exception (BlockedOnDeadMVar or
 * NonTermination) so it can terminate cleanly.  See comment block above. */
3249 resurrectThreads( StgTSO *threads )
3253 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3254 next = tso->global_link;
/* put the thread back on the global thread list */
3255 tso->global_link = all_threads;
3257 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3259 switch (tso->why_blocked) {
3261 case BlockedOnException:
3262 /* Called by GC - sched_mutex lock is currently held. */
3263 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3265 case BlockedOnBlackHole:
3266 raiseAsync(tso,(StgClosure *)NonTermination_closure);
3269 /* This might happen if the thread was blocked on a black hole
3270 * belonging to a thread that we've just woken up (raiseAsync
3271 * can wake up threads, remember...).
3275 barf("resurrectThreads: thread blocked in a strange way");
3280 /* -----------------------------------------------------------------------------
3281 * Blackhole detection: if we reach a deadlock, test whether any
3282 * threads are blocked on themselves. Any threads which are found to
3283 * be self-blocked get sent a NonTermination exception.
3285 * This is only done in a deadlock situation in order to avoid
3286 * performance overhead in the normal case.
3288 * Locks: sched_mutex is held upon entry and exit.
3289 * -------------------------------------------------------------------------- */
3291 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
/* Deadlock recovery: for every thread blocked on a black hole, scan its
 * own stack for an update frame whose updatee is that very black hole;
 * such a thread is blocked on itself and gets a NonTermination exception.
 * Only compiled for the single-threaded, non-PAR RTS (see #if above).
 * NOTE(review): loop headers and case labels are elided in this excerpt. */
3293 detectBlackHoles( void )
3295 StgTSO *tso = all_threads;
3297 StgClosure *blocked_on;
3298 StgRetInfoTable *info;
3300 for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
/* skip over forwarding pointers left by the GC */
3302 while (tso->what_next == ThreadRelocated) {
3304 ASSERT(get_itbl(tso)->type == TSO);
3307 if (tso->why_blocked != BlockedOnBlackHole) {
3310 blocked_on = tso->block_info.closure;
3315 info = get_ret_itbl((StgClosure *)frame);
3316 switch (info->i.type) {
3318 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3319 /* We are blocking on one of our own computations, so
3320 * send this thread the NonTermination exception.
3323 sched_belch("thread %d is blocked on itself", tso->id));
3324 raiseAsync(tso, (StgClosure *)NonTermination_closure);
3328 frame = (StgPtr)((StgUpdateFrame *)frame + 1);
3334 // normal stack frames; do nothing except advance the pointer
3336 frame += stack_frame_sizeW((StgClosure *)frame);
3344 /* ----------------------------------------------------------------------------
3345 * Debugging: why is a thread blocked
3346 * [Also provides useful information when debugging threaded programs
3347 * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3348 ------------------------------------------------------------------------- */
/* Debug helper: print a human-readable description of why `tso` is
 * blocked.  Enabled outside DEBUG too (useful for threaded-program
 * debugging — see the comment block above).
 * NOTE(review): some case labels/breaks are elided in this excerpt. */
3352 printThreadBlockage(StgTSO *tso)
3354 switch (tso->why_blocked) {
3356 debugBelch("is blocked on read from fd %d", tso->block_info.fd);
3358 case BlockedOnWrite:
3359 debugBelch("is blocked on write to fd %d", tso->block_info.fd);
3361 #if defined(mingw32_TARGET_OS)
3362 case BlockedOnDoProc:
3363 debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3366 case BlockedOnDelay:
3367 debugBelch("is blocked until %d", tso->block_info.target);
3370 debugBelch("is blocked on an MVar");
3372 case BlockedOnException:
3373 debugBelch("is blocked on delivering an exception to thread %d",
3374 tso->block_info.tso->id);
3376 case BlockedOnBlackHole:
3377 debugBelch("is blocked on a black hole");
3380 debugBelch("is not blocked");
/* PAR-only cases: blocked on global addresses (FetchMe queues) */
3384 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
3385 tso->block_info.closure, info_type(tso->block_info.closure));
3387 case BlockedOnGA_NoSend:
3388 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
3389 tso->block_info.closure, info_type(tso->block_info.closure));
3392 case BlockedOnCCall:
3393 debugBelch("is blocked on an external call");
3395 case BlockedOnCCall_NoUnblockExc:
3396 debugBelch("is blocked on an external call (exceptions were already blocked)");
3399 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3400 tso->why_blocked, tso->id, tso);
/* Debug helper: print whether `tso` is killed, completed, or (default)
 * delegate to printThreadBlockage for a blocked/runnable description. */
3406 printThreadStatus(StgTSO *tso)
3408 switch (tso->what_next) {
3410 debugBelch("has been killed");
3412 case ThreadComplete:
3413 debugBelch("has completed");
3416 printThreadBlockage(tso);
/* Debug helper: dump every thread on all_threads with its label and
 * status.  The header line includes the current (virtual) time under
 * GRAN/PAR (the two conditionally-compiled preambles below). */
3421 printAllThreads(void)
3427 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3428 ullong_format_string(TIME_ON_PROC(CurrentProc),
3429 time_string, rtsFalse/*no commas!*/);
3431 debugBelch("all threads at [%s]:\n", time_string);
3433 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3434 ullong_format_string(CURRENT_TIME,
3435 time_string, rtsFalse/*no commas!*/);
3437 debugBelch("all threads at [%s]:\n", time_string);
3439 debugBelch("all threads:\n");
3442 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3443 debugBelch("\tthread %d @ %p ", t->id, (void *)t);
/* show the user-assigned thread label, if any */
3444 label = lookupThreadLabel(t->id);
3445 if (label) debugBelch("[\"%s\"] ",(char *)label);
3446 printThreadStatus(t);
3454 Print a whole blocking queue attached to node (debugging only).
/* Debug (PAR): print the whole blocking queue attached to `node`, after
 * sanity-checking that `node` is a closure type that can carry one. */
3458 print_bq (StgClosure *node)
3460 StgBlockingQueueElement *bqe;
3464 debugBelch("## BQ of closure %p (%s): ",
3465 node, info_type(node));
3467 /* should cover all closures that may have a blocking queue */
3468 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3469 get_itbl(node)->type == FETCH_ME_BQ ||
3470 get_itbl(node)->type == RBH ||
3471 get_itbl(node)->type == MVAR);
3473 ASSERT(node!=(StgClosure*)NULL); // sanity check
/* delegate the element-by-element printing to print_bqe */
3475 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3479 Print a whole blocking queue starting with the element bqe.
/* Debug (PAR): print every element of a blocking queue starting at `bqe`:
 * TSOs, BLOCKED_FETCH closures, and the terminating RBH_Save constructor.
 * NOTE(review): case labels are elided in this excerpt. */
3482 print_bqe (StgBlockingQueueElement *bqe)
3487 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3489 for (end = (bqe==END_BQ_QUEUE);
3490 !end; // iterate until bqe points to a CONSTR
3491 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
3492 bqe = end ? END_BQ_QUEUE : bqe->link) {
3493 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3494 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3495 /* types of closures that may appear in a blocking queue */
3496 ASSERT(get_itbl(bqe)->type == TSO ||
3497 get_itbl(bqe)->type == BLOCKED_FETCH ||
3498 get_itbl(bqe)->type == CONSTR);
3499 /* only BQs of an RBH end with an RBH_Save closure */
3500 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3502 switch (get_itbl(bqe)->type) {
3504 debugBelch(" TSO %u (%x),",
3505 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
/* a BLOCKED_FETCH: show the node and its global address (GA) */
3508 debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
3509 ((StgBlockedFetch *)bqe)->node,
3510 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3511 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3512 ((StgBlockedFetch *)bqe)->ga.weight);
/* the terminating RBH_Save constructor: identify which of the three */
3515 debugBelch(" %s (IP %p),",
3516 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3517 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3518 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3519 "RBH_Save_?"), get_itbl(bqe));
3522 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3523 info_type((StgClosure *)bqe)); // , node, info_type(node));
3529 # elif defined(GRAN)
/* Debug (GranSim): print the blocking queue of `node`, annotating each
 * TSO with the simulated PE it lives on.
 * NOTE(review): case labels are elided in this excerpt. */
3531 print_bq (StgClosure *node)
3533 StgBlockingQueueElement *bqe;
3534 PEs node_loc, tso_loc;
3537 /* should cover all closures that may have a blocking queue */
3538 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3539 get_itbl(node)->type == FETCH_ME_BQ ||
3540 get_itbl(node)->type == RBH);
3542 ASSERT(node!=(StgClosure*)NULL); // sanity check
3543 node_loc = where_is(node);
3545 debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
3546 node, info_type(node), node_loc);
3549 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3551 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3552 !end; // iterate until bqe points to a CONSTR
3553 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3554 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3555 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3556 /* types of closures that may appear in a blocking queue */
3557 ASSERT(get_itbl(bqe)->type == TSO ||
3558 get_itbl(bqe)->type == CONSTR);
3559 /* only BQs of an RBH end with an RBH_Save closure */
3560 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3562 tso_loc = where_is((StgClosure *)bqe);
3563 switch (get_itbl(bqe)->type) {
3565 debugBelch(" TSO %d (%p) on [PE %d],",
3566 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
/* the terminating RBH_Save constructor: identify which of the three */
3569 debugBelch(" %s (IP %p),",
3570 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3571 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3572 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3573 "RBH_Save_?"), get_itbl(bqe));
3576 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3577 info_type((StgClosure *)bqe), node, info_type(node));
3585 Nice and easy: only TSOs on the blocking queue
/* Debug (sequential): print the blocking queue of `node` — only TSOs
 * can appear here (see comment above). */
3588 print_bq (StgClosure *node)
3592 ASSERT(node!=(StgClosure*)NULL); // sanity check
3593 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3594 tso != END_TSO_QUEUE;
3596 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3597 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3598 debugBelch(" TSO %d (%p),", tso->id, tso);
3611 for (i=0, tso=run_queue_hd;
3612 tso != END_TSO_QUEUE;
3621 sched_belch(char *s, ...)
3625 #ifdef RTS_SUPPORTS_THREADS
3626 debugBelch("sched (task %p): ", osThreadId());
3630 debugBelch("sched: ");