1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 1998-2004
7 * Different GHC ways use this scheduler quite differently (see comments below)
8 * Here is the global picture:
10 * WAY Name CPP flag What's it for
11 * --------------------------------------
12 * mp GUM PAR Parallel execution on a distrib. memory machine
13 * s SMP SMP Parallel execution on a shared memory machine
14 * mg GranSim GRAN Simulation of parallel execution
15 * md GUM/GdH DIST Distributed execution (based on GUM)
17 * --------------------------------------------------------------------------*/
20 * Version with support for distributed memory parallelism aka GUM (WAY=mp):
22 The main scheduling loop in GUM iterates until a finish message is received.
23 In that case the local flag @receivedFinish@ is set and this instance of
24 the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25 for the handling of incoming messages, such as PP_FINISH.
26 Note that in the parallel case we have a system manager that coordinates
27 different PEs, each of which is running one instance of the RTS.
28 See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29 From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
31 * Version with support for simulating parallel execution aka GranSim (WAY=mg):
33 The main scheduling code in GranSim is quite different from that in std
34 (concurrent) Haskell: while concurrent Haskell just iterates over the
35 threads in the runnable queue, GranSim is event driven, i.e. it iterates
36 over the events in the global event queue. -- HWL
39 #include "PosixSource.h"
44 #include "BlockAlloc.h"
48 #define COMPILING_SCHEDULER
50 #include "StgMiscClosures.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
60 #include "ThreadLabels.h"
61 #include "LdvProfile.h"
64 #include "Proftimer.h"
67 #if defined(GRAN) || defined(PAR)
68 # include "GranSimRts.h"
70 # include "ParallelRts.h"
71 # include "Parallel.h"
72 # include "ParallelDebug.h"
77 #include "Capability.h"
78 #include "OSThreads.h"
81 #ifdef HAVE_SYS_TYPES_H
82 #include <sys/types.h>
97 #define USED_IN_THREADED_RTS
99 #define USED_IN_THREADED_RTS STG_UNUSED
102 #ifdef RTS_SUPPORTS_THREADS
103 #define USED_WHEN_RTS_SUPPORTS_THREADS
105 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
108 /* Main thread queue.
109 * Locks required: sched_mutex.
111 StgMainThread *main_threads = NULL;
114 * Locks required: sched_mutex.
118 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
119 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
122 In GranSim we have a runnable and a blocked queue for each processor.
123 In order to minimise code changes, new arrays run_queue_hds/tls
124 are created. run_queue_hd is then a shortcut (macro) for
125 run_queue_hds[CurrentProc] (see GranSim.h).
128 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
129 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
130 StgTSO *ccalling_threadss[MAX_PROC];
131 /* We use the same global list of threads (all_threads) in GranSim as in
132 the std RTS (i.e. we are cheating). However, we don't use this list in
133 the GranSim specific code at the moment (so we are only potentially
138 StgTSO *run_queue_hd = NULL;
139 StgTSO *run_queue_tl = NULL;
140 StgTSO *blocked_queue_hd = NULL;
141 StgTSO *blocked_queue_tl = NULL;
142 StgTSO *sleeping_queue = NULL; /* perhaps replace with a hash table? */
146 /* Linked list of all threads.
147 * Used for detecting garbage collected threads.
149 StgTSO *all_threads = NULL;
151 /* When a thread performs a safe C call (_ccall_GC, using old
152 * terminology), it gets put on the suspended_ccalling_threads
153 * list. Used by the garbage collector.
155 static StgTSO *suspended_ccalling_threads;
157 static StgTSO *threadStackOverflow(StgTSO *tso);
159 /* KH: The following two flags are shared memory locations. There is no need
160 to lock them, since they are only unset at the end of a scheduler
164 /* flag set by signal handler to precipitate a context switch */
165 int context_switch = 0;
167 /* if this flag is set as well, give up execution */
168 rtsBool interrupted = rtsFalse;
170 /* Next thread ID to allocate.
171 * Locks required: thread_id_mutex
173 static StgThreadID next_thread_id = 1;
176 * Pointers to the state of the current thread.
177 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
178 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
181 /* The smallest stack size that makes any sense is:
182 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
183 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
184 * + 1 (the closure to enter)
186 * + 1 (spare slot req'd by stg_ap_v_ret)
188 * A thread with this stack will bomb immediately with a stack
189 * overflow, which will increase its stack size.
192 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
199 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
200 * exists - earlier gccs apparently didn't.
205 static rtsBool ready_to_gc;
208 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
209 * in an MT setting, needed to signal that a worker thread shouldn't hang around
210 * in the scheduler when it is out of work.
212 static rtsBool shutting_down_scheduler = rtsFalse;
214 void addToBlockedQueue ( StgTSO *tso );
216 static void schedule ( StgMainThread *mainThread, Capability *initialCapability );
217 void interruptStgRts ( void );
219 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
220 static void detectBlackHoles ( void );
223 #if defined(RTS_SUPPORTS_THREADS)
224 /* ToDo: carefully document the invariants that go together
225 * with these synchronisation objects.
227 Mutex sched_mutex = INIT_MUTEX_VAR;
228 Mutex term_mutex = INIT_MUTEX_VAR;
230 #endif /* RTS_SUPPORTS_THREADS */
234 rtsTime TimeOfLastYield;
235 rtsBool emitSchedule = rtsTrue;
239 static char *whatNext_strs[] = {
250 StgTSO * createSparkThread(rtsSpark spark);
251 StgTSO * activateSpark (rtsSpark spark);
254 /* ----------------------------------------------------------------------------
256 * ------------------------------------------------------------------------- */
258 #if defined(RTS_SUPPORTS_THREADS)
259 static rtsBool startingWorkerThread = rtsFalse;
261 static void taskStart(void);
265 ACQUIRE_LOCK(&sched_mutex);
266 startingWorkerThread = rtsFalse;
268 RELEASE_LOCK(&sched_mutex);
/* startSchedulerTaskIfNecessary: launch a new worker task running
 * taskStart() when either of these holds:
 *   - the run queue is non-empty;
 *   - the blocked queue is non-empty;
 *   - the sleeping queue is non-empty.
 * No worker start may already be in progress.
 *
 * startingWorkerThread stops several workers from being launched before
 * the first one reaches the "waiting for capability" state. It is set
 * before startTask() is called and is cleared again if startTask()
 * reports failure, so a later call can retry.
 *
 * NOTE(review): the flag is read and written here without taking
 * sched_mutex. The preceding code (apparently taskStart) clears it while
 * holding sched_mutex, so callers presumably hold sched_mutex. Confirm.
 */
272 startSchedulerTaskIfNecessary(void)
274 if(run_queue_hd != END_TSO_QUEUE
275 || blocked_queue_hd != END_TSO_QUEUE
276 || sleeping_queue != END_TSO_QUEUE)
278 if(!startingWorkerThread)
279 { // we don't want to start another worker thread
280 // just because the last one hasn't yet reached the
281 // "waiting for capability" state
282 startingWorkerThread = rtsTrue;
// A false result from startTask() means no worker was launched: undo the flag.
283 if(!startTask(taskStart))
285 startingWorkerThread = rtsFalse;
292 /* ---------------------------------------------------------------------------
293 Main scheduling loop.
295 We use round-robin scheduling, each thread returning to the
296 scheduler loop when one of these conditions is detected:
299 * timer expires (thread yields)
304 Locking notes: we acquire the scheduler lock once at the beginning
305 of the scheduler loop, and release it when
307 * running a thread, or
308 * waiting for work, or
309 * waiting for a GC to complete.
312 In a GranSim setup this loop iterates over the global event queue,
313 which determines what to do next. This makes it more complicated
314 than either the concurrent or the parallel (GUM) setup.
318 GUM iterates over incoming messages.
319 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
320 and sends out a fish whenever it has nothing to do; in-between
321 doing the actual reductions (shared code below) it processes the
322 incoming messages and deals with delayed operations
323 (see PendingFetches).
324 This is not the ugliest code you could imagine, but it's bloody close.
326 ------------------------------------------------------------------------ */
328 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
329 Capability *initialCapability )
333 StgThreadReturnCode ret;
341 rtsBool receivedFinish = rtsFalse;
343 nat tp_size, sp_size; // stats only
346 rtsBool was_interrupted = rtsFalse;
349 // Pre-condition: sched_mutex is held.
350 // We might have a capability, passed in as initialCapability.
351 cap = initialCapability;
353 #if defined(RTS_SUPPORTS_THREADS)
355 // in the threaded case, the capability is either passed in via the
356 // initialCapability parameter, or initialized inside the scheduler
360 sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
361 mainThread, initialCapability);
364 // simply initialise it in the non-threaded case
365 grabCapability(&cap);
369 /* set up first event to get things going */
370 /* ToDo: assign costs for system setup and init MainTSO ! */
371 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
373 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
376 debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
377 G_TSO(CurrentTSO, 5));
379 if (RtsFlags.GranFlags.Light) {
380 /* Save current time; GranSim Light only */
381 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
384 event = get_next_event();
386 while (event!=(rtsEvent*)NULL) {
387 /* Choose the processor with the next event */
388 CurrentProc = event->proc;
389 CurrentTSO = event->tso;
393 while (!receivedFinish) { /* set by processMessages */
394 /* when receiving PP_FINISH message */
396 #else // everything except GRAN and PAR
402 IF_DEBUG(scheduler, printAllThreads());
404 #if defined(RTS_SUPPORTS_THREADS)
405 // Yield the capability to higher-priority tasks if necessary.
408 yieldCapability(&cap);
411 // If we do not currently hold a capability, we wait for one
414 waitForCapability(&sched_mutex, &cap,
415 mainThread ? &mainThread->bound_thread_cond : NULL);
418 // We now have a capability...
422 // If we're interrupted (the user pressed ^C, or some other
423 // termination condition occurred), kill all the currently running
427 IF_DEBUG(scheduler, sched_belch("interrupted"));
428 interrupted = rtsFalse;
429 was_interrupted = rtsTrue;
430 #if defined(RTS_SUPPORTS_THREADS)
431 // In the threaded RTS, deadlock detection doesn't work,
432 // so just exit right away.
433 errorBelch("interrupted");
434 releaseCapability(cap);
435 RELEASE_LOCK(&sched_mutex);
436 shutdownHaskellAndExit(EXIT_SUCCESS);
442 #if defined(RTS_USER_SIGNALS)
443 // check for signals each time around the scheduler
444 if (signals_pending()) {
445 RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
446 startSignalHandlers();
447 ACQUIRE_LOCK(&sched_mutex);
452 // Check whether any waiting threads need to be woken up. If the
453 // run queue is empty, and there are no other tasks running, we
454 // can wait indefinitely for something to happen.
456 if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
458 #if defined(RTS_SUPPORTS_THREADS)
459 // We shouldn't be here...
460 barf("schedule: awaitEvent() in threaded RTS");
462 awaitEvent( EMPTY_RUN_QUEUE() );
464 // we can be interrupted while waiting for I/O...
465 if (interrupted) continue;
468 * Detect deadlock: when we have no threads to run, there are no
469 * threads waiting on I/O or sleeping, and all the other tasks are
470 * waiting for work, we must have a deadlock of some description.
472 * We first try to find threads blocked on themselves (ie. black
473 * holes), and generate NonTermination exceptions where necessary.
475 * If no threads are black holed, we have a deadlock situation, so
476 * inform all the main threads.
478 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
479 if ( EMPTY_THREAD_QUEUES() )
481 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
483 // Garbage collection can release some new threads due to
484 // either (a) finalizers or (b) threads resurrected because
485 // they are unreachable and will therefore be sent an
486 // exception. Any threads thus released will be immediately
488 GarbageCollect(GetRoots,rtsTrue);
489 if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
491 #if defined(RTS_USER_SIGNALS)
492 /* If we have user-installed signal handlers, then wait
493 * for signals to arrive rather then bombing out with a
496 if ( anyUserHandlers() ) {
498 sched_belch("still deadlocked, waiting for signals..."));
502 // we might be interrupted...
503 if (interrupted) { continue; }
505 if (signals_pending()) {
506 RELEASE_LOCK(&sched_mutex);
507 startSignalHandlers();
508 ACQUIRE_LOCK(&sched_mutex);
510 ASSERT(!EMPTY_RUN_QUEUE());
515 /* Probably a real deadlock. Send the current main thread the
516 * Deadlock exception (or in the SMP build, send *all* main
517 * threads the deadlock exception, since none of them can make
523 switch (m->tso->why_blocked) {
524 case BlockedOnBlackHole:
525 case BlockedOnException:
527 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
530 barf("deadlock: main thread blocked in a strange way");
536 #elif defined(RTS_SUPPORTS_THREADS)
537 // ToDo: add deadlock detection in threaded RTS
539 // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
542 #if defined(RTS_SUPPORTS_THREADS)
543 if ( EMPTY_RUN_QUEUE() ) {
544 continue; // nothing to do
549 if (RtsFlags.GranFlags.Light)
550 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
552 /* adjust time based on time-stamp */
553 if (event->time > CurrentTime[CurrentProc] &&
554 event->evttype != ContinueThread)
555 CurrentTime[CurrentProc] = event->time;
557 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
558 if (!RtsFlags.GranFlags.Light)
561 IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
563 /* main event dispatcher in GranSim */
564 switch (event->evttype) {
565 /* Should just be continuing execution */
567 IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
568 /* ToDo: check assertion
569 ASSERT(run_queue_hd != (StgTSO*)NULL &&
570 run_queue_hd != END_TSO_QUEUE);
572 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
573 if (!RtsFlags.GranFlags.DoAsyncFetch &&
574 procStatus[CurrentProc]==Fetching) {
575 debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
576 CurrentTSO->id, CurrentTSO, CurrentProc);
579 /* Ignore ContinueThreads for completed threads */
580 if (CurrentTSO->what_next == ThreadComplete) {
581 debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
582 CurrentTSO->id, CurrentTSO, CurrentProc);
585 /* Ignore ContinueThreads for threads that are being migrated */
586 if (PROCS(CurrentTSO)==Nowhere) {
587 debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
588 CurrentTSO->id, CurrentTSO, CurrentProc);
591 /* The thread should be at the beginning of the run queue */
592 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
593 debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
594 CurrentTSO->id, CurrentTSO, CurrentProc);
595 break; // run the thread anyway
598 new_event(proc, proc, CurrentTime[proc],
600 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
602 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
603 break; // now actually run the thread; DaH Qu'vam yImuHbej
606 do_the_fetchnode(event);
607 goto next_thread; /* handle next event in event queue */
610 do_the_globalblock(event);
611 goto next_thread; /* handle next event in event queue */
614 do_the_fetchreply(event);
615 goto next_thread; /* handle next event in event queue */
617 case UnblockThread: /* Move from the blocked queue to the tail of */
618 do_the_unblock(event);
619 goto next_thread; /* handle next event in event queue */
621 case ResumeThread: /* Move from the blocked queue to the tail of */
622 /* the runnable queue ( i.e. Qu' SImqa'lu') */
623 event->tso->gran.blocktime +=
624 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
625 do_the_startthread(event);
626 goto next_thread; /* handle next event in event queue */
629 do_the_startthread(event);
630 goto next_thread; /* handle next event in event queue */
633 do_the_movethread(event);
634 goto next_thread; /* handle next event in event queue */
637 do_the_movespark(event);
638 goto next_thread; /* handle next event in event queue */
641 do_the_findwork(event);
642 goto next_thread; /* handle next event in event queue */
645 barf("Illegal event type %u\n", event->evttype);
648 /* This point was scheduler_loop in the old RTS */
650 IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
652 TimeOfLastEvent = CurrentTime[CurrentProc];
653 TimeOfNextEvent = get_time_of_next_event();
654 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
655 // CurrentTSO = ThreadQueueHd;
657 IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
660 if (RtsFlags.GranFlags.Light)
661 GranSimLight_leave_system(event, &ActiveTSO);
663 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
666 debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
668 /* in a GranSim setup the TSO stays on the run queue */
670 /* Take a thread from the run queue. */
671 POP_RUN_QUEUE(t); // take_off_run_queue(t);
674 debugBelch("GRAN: About to run current thread, which is\n");
677 context_switch = 0; // turned on via GranYield, checking events and time slice
680 DumpGranEvent(GR_SCHEDULE, t));
682 procStatus[CurrentProc] = Busy;
685 if (PendingFetches != END_BF_QUEUE) {
689 /* ToDo: phps merge with spark activation above */
690 /* check whether we have local work and send requests if we have none */
691 if (EMPTY_RUN_QUEUE()) { /* no runnable threads */
692 /* :-[ no local threads => look out for local sparks */
693 /* the spark pool for the current PE */
694 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
695 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
696 pool->hd < pool->tl) {
698 * ToDo: add GC code check that we really have enough heap afterwards!!
700 * If we're here (no runnable threads) and we have pending
701 * sparks, we must have a space problem. Get enough space
702 * to turn one of those pending sparks into a
706 spark = findSpark(rtsFalse); /* get a spark */
707 if (spark != (rtsSpark) NULL) {
708 tso = activateSpark(spark); /* turn the spark into a thread */
709 IF_PAR_DEBUG(schedule,
710 debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
711 tso->id, tso, advisory_thread_count));
713 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
714 debugBelch("==^^ failed to activate spark\n");
716 } /* otherwise fall through & pick-up new tso */
718 IF_PAR_DEBUG(verbose,
719 debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
720 spark_queue_len(pool)));
725 /* If we still have no work we need to send a FISH to get a spark
728 if (EMPTY_RUN_QUEUE()) {
729 /* =8-[ no local sparks => look for work on other PEs */
731 * We really have absolutely no work. Send out a fish
732 * (there may be some out there already), and wait for
733 * something to arrive. We clearly can't run any threads
734 * until a SCHEDULE or RESUME arrives, and so that's what
735 * we're hoping to see. (Of course, we still have to
736 * respond to other types of messages.)
738 TIME now = msTime() /*CURRENT_TIME*/;
739 IF_PAR_DEBUG(verbose,
740 debugBelch("-- now=%ld\n", now));
741 IF_PAR_DEBUG(verbose,
742 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
743 (last_fish_arrived_at!=0 &&
744 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
745 debugBelch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)\n",
746 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
747 last_fish_arrived_at,
748 RtsFlags.ParFlags.fishDelay, now);
751 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
752 (last_fish_arrived_at==0 ||
753 (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
754 /* outstandingFishes is set in sendFish, processFish;
755 avoid flooding system with fishes via delay */
757 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
760 // Global statistics: count no. of fishes
761 if (RtsFlags.ParFlags.ParStats.Global &&
762 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
763 globalParStats.tot_fish_mess++;
767 receivedFinish = processMessages();
770 } else if (PacketsWaiting()) { /* Look for incoming messages */
771 receivedFinish = processMessages();
774 /* Now we are sure that we have some work available */
775 ASSERT(run_queue_hd != END_TSO_QUEUE);
777 /* Take a thread from the run queue, if we have work */
778 POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
779 IF_DEBUG(sanity,checkTSO(t));
781 /* ToDo: write something to the log-file
782 if (RTSflags.ParFlags.granSimStats && !sameThread)
783 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
787 /* the spark pool for the current PE */
788 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
791 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
792 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
795 if (0 && RtsFlags.ParFlags.ParStats.Full &&
796 t && LastTSO && t->id != LastTSO->id &&
797 LastTSO->why_blocked == NotBlocked &&
798 LastTSO->what_next != ThreadComplete) {
799 // if previously scheduled TSO not blocked we have to record the context switch
800 DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
801 GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
804 if (RtsFlags.ParFlags.ParStats.Full &&
805 (emitSchedule /* forced emit */ ||
806 (t && LastTSO && t->id != LastTSO->id))) {
808 we are running a different TSO, so write a schedule event to log file
809 NB: If we use fair scheduling we also have to write a deschedule
810 event for LastTSO; with unfair scheduling we know that the
811 previous tso has blocked whenever we switch to another tso, so
812 we don't need it in GUM for now
814 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
815 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
816 emitSchedule = rtsFalse;
820 #else /* !GRAN && !PAR */
822 // grab a thread from the run queue
823 ASSERT(run_queue_hd != END_TSO_QUEUE);
826 // Sanity check the thread we're about to run. This can be
827 // expensive if there is lots of thread switching going on...
828 IF_DEBUG(sanity,checkTSO(t));
833 StgMainThread *m = t->main;
840 sched_belch("### Running thread %d in bound thread", t->id));
841 // yes, the Haskell thread is bound to the current native thread
846 sched_belch("### thread %d bound to another OS thread", t->id));
847 // no, bound to a different Haskell thread: pass to that thread
848 PUSH_ON_RUN_QUEUE(t);
849 passCapability(&m->bound_thread_cond);
855 if(mainThread != NULL)
856 // The thread we want to run is bound.
859 sched_belch("### this OS thread cannot run thread %d", t->id));
860 // no, the current native thread is bound to a different
861 // Haskell thread, so pass it to any worker thread
862 PUSH_ON_RUN_QUEUE(t);
863 passCapabilityToWorker();
870 cap->r.rCurrentTSO = t;
872 /* context switches are now initiated by the timer signal, unless
873 * the user specified "context switch as often as possible", with
876 if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
877 && (run_queue_hd != END_TSO_QUEUE
878 || blocked_queue_hd != END_TSO_QUEUE
879 || sleeping_queue != END_TSO_QUEUE)))
884 RELEASE_LOCK(&sched_mutex);
886 IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
887 (long)t->id, whatNext_strs[t->what_next]));
890 startHeapProfTimer();
893 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
894 /* Run the current thread
896 prev_what_next = t->what_next;
898 errno = t->saved_errno;
900 switch (prev_what_next) {
904 /* Thread already finished, return to scheduler. */
905 ret = ThreadFinished;
909 ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
912 case ThreadInterpret:
913 ret = interpretBCO(cap);
917 barf("schedule: invalid what_next field");
920 // The TSO might have moved, so find the new location:
921 t = cap->r.rCurrentTSO;
923 // And save the current errno in this thread.
924 t->saved_errno = errno;
926 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
928 /* Costs for the scheduler are assigned to CCS_SYSTEM */
934 ACQUIRE_LOCK(&sched_mutex);
936 #ifdef RTS_SUPPORTS_THREADS
937 IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
938 #elif !defined(GRAN) && !defined(PAR)
939 IF_DEBUG(scheduler,debugBelch("sched: "););
943 /* HACK 675: if the last thread didn't yield, make sure to print a
944 SCHEDULE event to the log file when StgRunning the next thread, even
945 if it is the same one as before */
947 TimeOfLastYield = CURRENT_TIME;
953 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
954 globalGranStats.tot_heapover++;
956 globalParStats.tot_heapover++;
959 // did the task ask for a large block?
960 if (cap->r.rHpAlloc > BLOCK_SIZE) {
961 // if so, get one and push it on the front of the nursery.
965 blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
967 IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n",
968 (long)t->id, whatNext_strs[t->what_next], blocks));
970 // don't do this if it would push us over the
971 // alloc_blocks_lim limit; we'll GC first.
972 if (alloc_blocks + blocks < alloc_blocks_lim) {
974 alloc_blocks += blocks;
975 bd = allocGroup( blocks );
977 // link the new group into the list
978 bd->link = cap->r.rCurrentNursery;
979 bd->u.back = cap->r.rCurrentNursery->u.back;
980 if (cap->r.rCurrentNursery->u.back != NULL) {
981 cap->r.rCurrentNursery->u.back->link = bd;
983 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
984 g0s0->blocks == cap->r.rNursery);
985 cap->r.rNursery = g0s0->blocks = bd;
987 cap->r.rCurrentNursery->u.back = bd;
989 // initialise it as a nursery block. We initialise the
990 // step, gen_no, and flags field of *every* sub-block in
991 // this large block, because this is easier than making
992 // sure that we always find the block head of a large
993 // block whenever we call Bdescr() (eg. evacuate() and
994 // isAlive() in the GC would both have to do this, at
998 for (x = bd; x < bd + blocks; x++) {
1005 // don't forget to update the block count in g0s0.
1006 g0s0->n_blocks += blocks;
1007 // This assert can be a killer if the app is doing lots
1008 // of large block allocations.
1009 ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1011 // now update the nursery to point to the new block
1012 cap->r.rCurrentNursery = bd;
1014 // we might be unlucky and have another thread get on the
1015 // run queue before us and steal the large block, but in that
1016 // case the thread will just end up requesting another large
1018 PUSH_ON_RUN_QUEUE(t);
1023 /* make all the running tasks block on a condition variable,
1024 * maybe set context_switch and wait till they all pile in,
1025 * then have them wait on a GC condition variable.
1027 IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n",
1028 (long)t->id, whatNext_strs[t->what_next]));
1031 ASSERT(!is_on_queue(t,CurrentProc));
1033 /* Currently we emit a DESCHEDULE event before GC in GUM.
1034 ToDo: either add separate event to distinguish SYSTEM time from rest
1035 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1036 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1037 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1038 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1039 emitSchedule = rtsTrue;
1043 ready_to_gc = rtsTrue;
1044 context_switch = 1; /* stop other threads ASAP */
1045 PUSH_ON_RUN_QUEUE(t);
1046 /* actual GC is done at the end of the while loop */
1052 DumpGranEvent(GR_DESCHEDULE, t));
1053 globalGranStats.tot_stackover++;
1056 // DumpGranEvent(GR_DESCHEDULE, t);
1057 globalParStats.tot_stackover++;
1059 IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n",
1060 (long)t->id, whatNext_strs[t->what_next]));
1061 /* just adjust the stack for this thread, then pop it back
1066 /* enlarge the stack */
1067 StgTSO *new_t = threadStackOverflow(t);
1069 /* This TSO has moved, so update any pointers to it from the
1070 * main thread stack. It better not be on any other queues...
1071 * (it shouldn't be).
1073 if (t->main != NULL) {
1074 t->main->tso = new_t;
1076 PUSH_ON_RUN_QUEUE(new_t);
1080 case ThreadYielding:
1081 // Reset the context switch flag. We don't do this just before
1082 // running the thread, because that would mean we would lose ticks
1083 // during GC, which can lead to unfair scheduling (a thread hogs
1084 // the CPU because the tick always arrives during GC). This way
1085 // penalises threads that do a lot of allocation, but that seems
1086 // better than the alternative.
1091 DumpGranEvent(GR_DESCHEDULE, t));
1092 globalGranStats.tot_yields++;
1095 // DumpGranEvent(GR_DESCHEDULE, t);
1096 globalParStats.tot_yields++;
1098 /* put the thread back on the run queue. Then, if we're ready to
1099 * GC, check whether this is the last task to stop. If so, wake
1100 * up the GC thread. getThread will block during a GC until the
1104 if (t->what_next != prev_what_next) {
1105 debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n",
1106 (long)t->id, whatNext_strs[t->what_next]);
1108 debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1109 (long)t->id, whatNext_strs[t->what_next]);
1114 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1116 ASSERT(t->link == END_TSO_QUEUE);
1118 // Shortcut if we're just switching evaluators: don't bother
1119 // doing stack squeezing (which can be expensive), just run the
1121 if (t->what_next != prev_what_next) {
1128 ASSERT(!is_on_queue(t,CurrentProc));
1131 //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1132 checkThreadQsSanity(rtsTrue));
1136 if (RtsFlags.ParFlags.doFairScheduling) {
1137 /* this does round-robin scheduling; good for concurrency */
1138 APPEND_TO_RUN_QUEUE(t);
1140 /* this does unfair scheduling; good for parallelism */
1141 PUSH_ON_RUN_QUEUE(t);
1144 // this does round-robin scheduling; good for concurrency
1145 APPEND_TO_RUN_QUEUE(t);
1149 /* add a ContinueThread event to actually process the thread */
1150 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1152 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1154 debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1163 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
1164 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1165 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1167 // ??? needed; should emit block before
1169 DumpGranEvent(GR_DESCHEDULE, t));
1170 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1173 ASSERT(procStatus[CurrentProc]==Busy ||
1174 ((procStatus[CurrentProc]==Fetching) &&
1175 (t->block_info.closure!=(StgClosure*)NULL)));
1176 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1177 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1178 procStatus[CurrentProc]==Fetching))
1179 procStatus[CurrentProc] = Idle;
1183 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
1184 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1187 if (t->block_info.closure!=(StgClosure*)NULL)
1188 print_bq(t->block_info.closure));
1190 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1193 /* whatever we schedule next, we must log that schedule */
1194 emitSchedule = rtsTrue;
1197 /* don't need to do anything. Either the thread is blocked on
1198 * I/O, in which case we'll have called addToBlockedQueue
1199 * previously, or it's blocked on an MVar or Blackhole, in which
1200 * case it'll be on the relevant queue already.
1203 debugBelch("--<< thread %d (%s) stopped: ",
1204 t->id, whatNext_strs[t->what_next]);
1205 printThreadBlockage(t);
1208 /* Only for dumping event to log file
1209 ToDo: do I need this in GranSim, too?
1216 case ThreadFinished:
1217 /* Need to check whether this was a main thread, and if so, signal
1218 * the task that started it with the return value. If we have no
1219 * more main threads, we probably need to stop all the tasks until
1222 /* We also end up here if the thread kills itself with an
1223 * uncaught exception, see Exception.hc.
1225 IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n",
1226 t->id, whatNext_strs[t->what_next]));
1228 endThread(t, CurrentProc); // clean-up the thread
1230 /* For now all are advisory -- HWL */
1231 //if(t->priority==AdvisoryPriority) ??
1232 advisory_thread_count--;
1235 if(t->dist.priority==RevalPriority)
1239 if (RtsFlags.ParFlags.ParStats.Full &&
1240 !RtsFlags.ParFlags.ParStats.Suppressed)
1241 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1245 // Check whether the thread that just completed was a main
1246 // thread, and if so return with the result.
1248 // There is an assumption here that all thread completion goes
1249 // through this point; we need to make sure that if a thread
1250 // ends up in the ThreadKilled state, that it stays on the run
1251 // queue so it can be dealt with here.
1254 #if defined(RTS_SUPPORTS_THREADS)
1257 mainThread->tso == t
1261 // We are a bound thread: this must be our thread that just
1263 ASSERT(mainThread->tso == t);
1265 if (t->what_next == ThreadComplete) {
1266 if (mainThread->ret) {
1267 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1268 *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1];
1270 mainThread->stat = Success;
1272 if (mainThread->ret) {
1273 *(mainThread->ret) = NULL;
1275 if (was_interrupted) {
1276 mainThread->stat = Interrupted;
1278 mainThread->stat = Killed;
1282 removeThreadLabel((StgWord)mainThread->tso->id);
1284 if (mainThread->prev == NULL) {
1285 main_threads = mainThread->link;
1287 mainThread->prev->link = mainThread->link;
1289 if (mainThread->link != NULL) {
1290 mainThread->link->prev = NULL;
1292 releaseCapability(cap);
1296 #ifdef RTS_SUPPORTS_THREADS
1297 ASSERT(t->main == NULL);
1299 if (t->main != NULL) {
1300 // Must be a main thread that is not the topmost one. Leave
1301 // it on the run queue until the stack has unwound to the
1302 // point where we can deal with this. Leaving it on the run
1303 // queue also ensures that the garbage collector knows about
1304 // this thread and its return value (it gets dropped from the
1305 // all_threads list so there's no other way to find it).
1306 APPEND_TO_RUN_QUEUE(t);
1312 barf("schedule: invalid thread return code %d", (int)ret);
1316 // When we have +RTS -i0 and we're heap profiling, do a census at
1317 // every GC. This lets us get repeatable runs for debugging.
1318 if (performHeapProfile ||
1319 (RtsFlags.ProfFlags.profileInterval==0 &&
1320 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1321 GarbageCollect(GetRoots, rtsTrue);
1323 performHeapProfile = rtsFalse;
1324 ready_to_gc = rtsFalse; // we already GC'd
1329 /* everybody back, start the GC.
1330 * Could do it in this thread, or signal a condition var
1331 * to do it in another thread. Either way, we need to
1332 * broadcast on gc_pending_cond afterward.
1334 #if defined(RTS_SUPPORTS_THREADS)
1335 IF_DEBUG(scheduler,sched_belch("doing GC"));
1337 GarbageCollect(GetRoots,rtsFalse);
1338 ready_to_gc = rtsFalse;
1340 /* add a ContinueThread event to continue execution of current thread */
1341 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1343 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1345 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1353 IF_GRAN_DEBUG(unused,
1354 print_eventq(EventHd));
1356 event = get_next_event();
1359 /* ToDo: wait for next message to arrive rather than busy wait */
1362 } /* end of while(1) */
1364 IF_PAR_DEBUG(verbose,
1365 debugBelch("== Leaving schedule() after having received Finish\n"));
1368 /* ---------------------------------------------------------------------------
1369 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1370 * Used by Control.Concurrent for error checking.
 * The answer is a property of how the RTS was built, not of any thread.
1371 * ------------------------------------------------------------------------- */
1374 rtsSupportsBoundThreads(void)
1383 /* ---------------------------------------------------------------------------
1384 * isThreadBound(tso): check whether tso is bound to an OS thread.
 *
 * A TSO is considered bound exactly when its 'main' field is non-NULL.
 * The parameter is tagged USED_IN_THREADED_RTS; presumably it is unused
 * (and the result constant) in non-threaded builds -- TODO confirm.
1385 * ------------------------------------------------------------------------- */
1388 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1391 return (tso->main != NULL);
1396 /* ---------------------------------------------------------------------------
1397 * Singleton fork(). Do not copy any running threads.
 *
 * forkProcess(entry): with sched_mutex held via rts_lock(), split the
 * process. The parent just returns the pid. The child:
 *   - empties the run queue and deletes every thread on all_threads
 *     without letting them catch ThreadKilled,
 *   - wipes the main_threads list (closing each bound-thread condition
 *     in THREADED_RTS builds),
 *   - runs the Haskell action 'entry' with rts_evalStableIO, checks its
 *     status, then shuts the RTS down with hs_exit().
 * On mingw32 FORKPROCESS_PRIMOP_SUPPORTED is not defined and the
 * primop barfs.
1398 * ------------------------------------------------------------------------- */
1400 #ifndef mingw32_TARGET_OS
1401 #define FORKPROCESS_PRIMOP_SUPPORTED
1404 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1406 deleteThreadImmediately(StgTSO *tso);
1409 forkProcess(HsStablePtr *entry
1410 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1415 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1421 IF_DEBUG(scheduler,sched_belch("forking!"));
1422 rts_lock(); // This not only acquires sched_mutex, it also
1423 // makes sure that no other threads are running
// 'pid' is presumably the result of fork(): non-zero in the parent,
// zero in the child.
1427 if (pid) { /* parent */
1429 /* just return the pid */
1433 } else { /* child */
1436 // delete all threads
1437 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1439 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1442 // don't allow threads to catch the ThreadKilled exception
1443 deleteThreadImmediately(t);
1446 // wipe the main thread list
1447 while((m = main_threads) != NULL) {
1448 main_threads = m->link;
1449 # ifdef THREADED_RTS
1450 closeCondition(&m->bound_thread_cond);
1455 rc = rts_evalStableIO(entry, NULL); // run the action
1456 rts_checkSchedStatus("forkProcess",rc);
1460 hs_exit(); // clean up and exit
1463 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1464 barf("forkProcess#: primop not supported, sorry!\n");
1469 /* ---------------------------------------------------------------------------
1470 * deleteAllThreads(): kill all the live threads.
1472 * This is used when we catch a user interrupt (^C), before performing
1473 * any necessary cleanups and running finalizers.
 *
 * Walks the global all_threads list. Afterwards the blocked and sleeping
 * queues are asserted to be empty; killed threads stay on the run queue
 * (see the comment in the body).
1475 * Locks: sched_mutex held.
1476 * ------------------------------------------------------------------------- */
1479 deleteAllThreads ( void )
1482 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1483 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
// Read the successor before t is deleted; presumably deleting t may
// change its links -- TODO confirm against the delete call.
1484 next = t->global_link;
1488 // The run queue now contains a bunch of ThreadKilled threads. We
1489 // must not throw these away: the main thread(s) will be in there
1490 // somewhere, and the main scheduler loop has to deal with it.
1491 // Also, the run queue is the only thing keeping these threads from
1492 // being GC'd, and we don't want the "main thread has been GC'd" panic.
1494 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1495 ASSERT(sleeping_queue == END_TSO_QUEUE);
1498 /* startThread and insertThread are now in GranSim.c -- HWL */
1501 /* ---------------------------------------------------------------------------
1502 * Suspending & resuming Haskell threads.
1504 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1505 * its capability before calling the C function. This allows another
1506 * task to pick up the capability and carry on running Haskell
1507 * threads. It also means that if the C call blocks, it won't lock
1510 * The Haskell thread making the C call is put to sleep for the
1511 * duration of the call, on the suspended_ccalling_threads queue. We
1512 * give out a token to the task, which it can use to resume the thread
1513 * on return from the C function.
 *
 * suspendThread(reg): reg points into the current Capability. The
 * current TSO is pushed onto suspended_ccalling_threads, the capability
 * is released, and the TSO's id is handed back as the token for
 * resumeThread. errno is saved on entry and restored on exit so the RTS
 * bookkeeping does not disturb it.
1514 * ------------------------------------------------------------------------- */
1517 suspendThread( StgRegTable *reg )
1521 int saved_errno = errno;
1523 /* assume that *reg is a pointer to the StgRegTable part
1526 cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1528 ACQUIRE_LOCK(&sched_mutex);
1531 sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1533 // XXX this might not be necessary --SDM
1534 cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1536 threadPaused(cap->r.rCurrentTSO);
1537 cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1538 suspended_ccalling_threads = cap->r.rCurrentTSO;
// A NULL blocked_exceptions selects BlockedOnCCall and installs an empty
// queue (END_TSO_QUEUE). resumeThread wakes whatever is queued there for
// BlockedOnCCall threads. Otherwise the thread is marked
// BlockedOnCCall_NoUnblockExc.
1540 if(cap->r.rCurrentTSO->blocked_exceptions == NULL) {
1541 cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1542 cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1544 cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1547 /* Use the thread ID as the token; it should be unique */
1548 tok = cap->r.rCurrentTSO->id;
1550 /* Hand back capability */
1551 releaseCapability(cap);
1553 #if defined(RTS_SUPPORTS_THREADS)
1554 /* Preparing to leave the RTS, so ensure there's a native thread/task
1555 waiting to take over.
1557 IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1560 RELEASE_LOCK(&sched_mutex);
1562 errno = saved_errno;
// resumeThread(tok): re-enter the RTS after a safe C call. tok is the
// thread id that suspendThread returned. The function:
//   - reacquires a capability,
//   - unlinks the matching TSO from suspended_ccalling_threads with a
//     linear search (barf if it is missing),
//   - wakes any queued blocked_exceptions for BlockedOnCCall threads,
//   - makes the TSO the capability's current thread.
// errno is preserved across the call.
1567 resumeThread( StgInt tok )
1569 StgTSO *tso, **prev;
1571 int saved_errno = errno;
1573 #if defined(RTS_SUPPORTS_THREADS)
1574 /* Wait for permission to re-enter the RTS with the result. */
1575 ACQUIRE_LOCK(&sched_mutex);
1576 waitForReturnCapability(&sched_mutex, &cap);
1578 IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1580 grabCapability(&cap);
1583 /* Remove the thread off of the suspended list */
1584 prev = &suspended_ccalling_threads;
1585 for (tso = suspended_ccalling_threads;
1586 tso != END_TSO_QUEUE;
1587 prev = &tso->link, tso = tso->link) {
1588 if (tso->id == (StgThreadID)tok) {
1593 if (tso == END_TSO_QUEUE) {
1594 barf("resumeThread: thread not found");
1596 tso->link = END_TSO_QUEUE;
// Only BlockedOnCCall threads had an exception queue installed by
// suspendThread; wake it and reset the field to NULL.
1598 if(tso->why_blocked == BlockedOnCCall) {
1599 awakenBlockedQueueNoLock(tso->blocked_exceptions);
1600 tso->blocked_exceptions = NULL;
1603 /* Reset blocking status */
1604 tso->why_blocked = NotBlocked;
1606 cap->r.rCurrentTSO = tso;
1607 RELEASE_LOCK(&sched_mutex);
1608 errno = saved_errno;
1613 /* ---------------------------------------------------------------------------
1615 * ------------------------------------------------------------------------ */
// Forward declaration of a file-local (static) helper defined later in
// this file.
1616 static void unblockThread(StgTSO *tso);
1618 /* ---------------------------------------------------------------------------
1619 * Comparing Thread ids.
1621 * This is used from STG land in the implementation of the
1622 * instances of Eq/Ord for ThreadIds.
 *
 * Returns -1 if tso1's id is smaller than tso2's and 1 if it is larger;
 * presumably 0 when the ids are equal -- TODO confirm.
1623 * ------------------------------------------------------------------------ */
1626 cmp_thread(StgPtr tso1, StgPtr tso2)
1628 StgThreadID id1 = ((StgTSO *)tso1)->id;
1629 StgThreadID id2 = ((StgTSO *)tso2)->id;
1631 if (id1 < id2) return (-1);
1632 if (id1 > id2) return 1;
1636 /* ---------------------------------------------------------------------------
1637 * Fetching the ThreadID from an StgTSO.
1639 * This is used in the implementation of Show for ThreadIds.
 * The argument arrives as an untyped StgPtr and is cast to StgTSO* here.
1640 * ------------------------------------------------------------------------ */
1642 rts_getThreadId(StgPtr tso)
1644 return ((StgTSO *)tso)->id;
// labelThread(tso, label): attach a private heap copy of the C string
// 'label' to the thread whose id is tso->id. The caller keeps ownership
// of 'label'; the copy is handed to updateThreadLabel.
1649 labelThread(StgPtr tso, char *label)
1654 /* Caveat: Once set, you can only set the thread name to "" */
1655 len = strlen(label)+1;
1656 buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
// len includes the terminating NUL, so strncpy copies the terminator too.
1657 strncpy(buf,label,len);
1658 /* Update will free the old memory for us */
1659 updateThreadLabel(((StgTSO *)tso)->id,buf);
1663 /* ---------------------------------------------------------------------------
1664 Create a new thread.
1666 The new thread starts with the given stack size. Before the
1667 scheduler can run, however, this thread needs to have a closure
1668 (and possibly some arguments) pushed on its stack. See
1669 pushClosure() in Schedule.h.
1671 createGenThread() and createIOThread() (in SchedAPI.h) are
1672 convenient packaged versions of this function.

   'size' is the total TSO size in words (TSO struct plus stack). Sizes
   below MIN_STACK_WORDS + TSO_STRUCT_SIZEW are rounded up to that
   minimum. The new TSO:
     - gets the next id from next_thread_id,
     - gets a stop frame on top of its stack,
     - is linked onto the global all_threads list.
   In the parallel sections, END_TSO_QUEUE is returned when
   advisory_thread_count has reached RtsFlags.ParFlags.maxThreads.
1674 currently pri (priority) is only used in a GRAN setup -- HWL
1675 ------------------------------------------------------------------------ */
1677 /* currently pri (priority) is only used in a GRAN setup -- HWL */
1679 createThread(nat size, StgInt pri)
1682 createThread(nat size)
1689 /* First check whether we should create a thread at all */
1691 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1692 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1694 debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
1695 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1696 return END_TSO_QUEUE;
1702 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1705 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1707 /* catch ridiculously small stack sizes */
1708 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1709 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1712 stack_size = size - TSO_STRUCT_SIZEW;
1714 tso = (StgTSO *)allocate(size);
1715 TICK_ALLOC_TSO(stack_size, 0);
1717 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1719 SET_GRAN_HDR(tso, ThisPE);
1722 // Always start with the compiled code evaluator
1723 tso->what_next = ThreadRunGHC;
1725 tso->id = next_thread_id++;
1726 tso->why_blocked = NotBlocked;
1727 tso->blocked_exceptions = NULL;
1729 tso->saved_errno = 0;
1732 tso->stack_size = stack_size;
1733 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
// sp starts one past the last stack word; frames are pushed by
// decrementing sp (see the stop frame below).
1735 tso->sp = (P_)&(tso->stack) + stack_size;
1738 tso->prof.CCCS = CCS_MAIN;
1741 /* put a stop frame on the stack */
1742 tso->sp -= sizeofW(StgStopFrame);
1743 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1744 tso->link = END_TSO_QUEUE;
1748 /* uses more flexible routine in GranSim */
1749 insertThread(tso, CurrentProc);
1751 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1757 if (RtsFlags.GranFlags.GranSimStats.Full)
1758 DumpGranEvent(GR_START,tso);
1760 if (RtsFlags.ParFlags.ParStats.Full)
1761 DumpGranEvent(GR_STARTQ,tso);
1762 /* HACk to avoid SCHEDULE
1766 /* Link the new thread on the global thread list.
1768 tso->global_link = all_threads;
1772 tso->dist.priority = MandatoryPriority; //by default that is...
1776 tso->gran.pri = pri;
1778 tso->gran.magic = TSO_MAGIC; // debugging only
1780 tso->gran.sparkname = 0;
1781 tso->gran.startedat = CURRENT_TIME;
1782 tso->gran.exported = 0;
1783 tso->gran.basicblocks = 0;
1784 tso->gran.allocs = 0;
1785 tso->gran.exectime = 0;
1786 tso->gran.fetchtime = 0;
1787 tso->gran.fetchcount = 0;
1788 tso->gran.blocktime = 0;
1789 tso->gran.blockcount = 0;
1790 tso->gran.blockedat = 0;
1791 tso->gran.globalsparks = 0;
1792 tso->gran.localsparks = 0;
1793 if (RtsFlags.GranFlags.Light)
1794 tso->gran.clock = Now; /* local clock */
1796 tso->gran.clock = 0;
1798 IF_DEBUG(gran,printTSO(tso));
1801 tso->par.magic = TSO_MAGIC; // debugging only
1803 tso->par.sparkname = 0;
1804 tso->par.startedat = CURRENT_TIME;
1805 tso->par.exported = 0;
1806 tso->par.basicblocks = 0;
1807 tso->par.allocs = 0;
1808 tso->par.exectime = 0;
1809 tso->par.fetchtime = 0;
1810 tso->par.fetchcount = 0;
1811 tso->par.blocktime = 0;
1812 tso->par.blockcount = 0;
1813 tso->par.blockedat = 0;
1814 tso->par.globalsparks = 0;
1815 tso->par.localsparks = 0;
1819 globalGranStats.tot_threads_created++;
1820 globalGranStats.threads_created_on_PE[CurrentProc]++;
1821 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1822 globalGranStats.tot_sq_probes++;
1824 // collect parallel global statistics (currently done together with GC stats)
1825 if (RtsFlags.ParFlags.ParStats.Global &&
1826 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1827 //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
1828 globalParStats.tot_threads_created++;
1834 sched_belch("==__ schedule: Created TSO %d (%p);",
1835 CurrentProc, tso, tso->id));
// NOTE(review): the format string above has two conversions (%d, %p) but
// receives three arguments. CurrentProc is printed where the TSO id is
// labelled, and tso->id is ignored. The message below passes (long)tso->id
// to a %d conversion. Both look like format/argument mismatches -- confirm.
1837 IF_PAR_DEBUG(verbose,
1838 sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
1839 (long)tso->id, tso, advisory_thread_count));
1841 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1842 (long)tso->id, (long)tso->stack_size));
1849 all parallel thread creation calls should fall through the following routine.
1852 createSparkThread(rtsSpark spark)
// Turns a spark into a new advisory thread:
//   - create a TSO with the default initial stack size,
//   - push the spark closure onto its stack,
//   - push the TSO onto the run queue,
//   - bump advisory_thread_count.
1854 ASSERT(spark != (rtsSpark)NULL);
1855 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
// NOTE(review): createThread only logs and returns END_TSO_QUEUE for the
// same condition. If barf() does not return, the 'return' below is
// unreachable -- confirm which behaviour is intended.
1857 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1858 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1859 return END_TSO_QUEUE;
1863 tso = createThread(RtsFlags.GcFlags.initialStkSize);
1864 if (tso==END_TSO_QUEUE)
1865 barf("createSparkThread: Cannot create TSO");
// NOTE(review): createThread initialises tso->dist.priority, whereas this
// writes tso->priority -- confirm the field name for this build.
1867 tso->priority = AdvisoryPriority;
1869 pushClosure(tso,spark);
1870 PUSH_ON_RUN_QUEUE(tso);
1871 advisory_thread_count++;
1878 Turn a spark into a thread.
1879 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1883 activateSpark (rtsSpark spark)
1887 tso = createSparkThread(spark);
// createSparkThread has already pushed the new TSO onto the run queue;
// what follows is statistics and debug output only.
1888 if (RtsFlags.ParFlags.ParStats.Full) {
1889 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1890 IF_PAR_DEBUG(verbose,
1891 debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
1892 (StgClosure *)spark, info_type((StgClosure *)spark)));
1894 // ToDo: fwd info on local/global spark to thread -- HWL
1895 // tso->gran.exported = spark->exported;
1896 // tso->gran.locked = !spark->global;
1897 // tso->gran.sparkname = spark->name;
// Forward declaration: waitThread_ (defined later in this file) runs
// schedule() for main thread m and reports its final SchedulerStatus.
1903 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1904 Capability *initialCapability
1908 /* ---------------------------------------------------------------------------
1911 * scheduleThread puts a thread on the end of the runnable queue.
1912 * This will usually be done immediately after a thread is created.
1913 * The caller of scheduleThread must create the thread using e.g.
1914 * createThread and push an appropriate closure
1915 * on this thread's stack before the scheduler is invoked.
 *
 * scheduleThread_ does no locking of its own. scheduleThread wraps it
 * in sched_mutex.
1916 * ------------------------------------------------------------------------ */
1918 static void scheduleThread_ (StgTSO* tso);
1921 scheduleThread_(StgTSO *tso)
1923 // The thread goes at the *end* of the run-queue, to avoid possible
1924 // starvation of any threads already on the queue.
1925 APPEND_TO_RUN_QUEUE(tso);
1930 scheduleThread(StgTSO* tso)
1932 ACQUIRE_LOCK(&sched_mutex);
1933 scheduleThread_(tso);
1934 RELEASE_LOCK(&sched_mutex);
1937 #if defined(RTS_SUPPORTS_THREADS)
// One-entry cache for a bound-thread condition variable.
// bound_cond_cache_full says whether bound_cond_cache holds a usable
// condition. scheduleWaitThread takes from the cache; waitThread_
// refills it.
1938 static Condition bound_cond_cache;
1939 static int bound_cond_cache_full = 0;
// scheduleWaitThread(tso, ret, initialCapability):
//   - registers tso as a new main thread (a freshly allocated
//     StgMainThread pushed on the front of main_threads),
//   - appends tso to the run queue,
//   - runs the scheduler via waitThread_ and returns its status.
// 'ret' is an [out] parameter, presumably for the thread's result --
// TODO confirm against the elided initialisation of m.
1944 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1945 Capability *initialCapability)
1947 // Precondition: sched_mutex must be held
1950 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1955 m->link = main_threads;
1957 if (main_threads != NULL) {
1958 main_threads->prev = m;
1962 #if defined(RTS_SUPPORTS_THREADS)
1963 // Allocating a new condition for each thread is expensive, so we
1964 // cache one. This is a pretty feeble hack, but it helps speed up
1965 // consecutive call-ins quite a bit.
1966 if (bound_cond_cache_full) {
1967 m->bound_thread_cond = bound_cond_cache;
1968 bound_cond_cache_full = 0;
1970 initCondition(&m->bound_thread_cond);
1974 /* Put the thread on the main-threads list prior to scheduling the TSO.
1975 Failure to do so introduces a race condition in the MT case (as
1976 identified by Wolfgang Thaller), whereby the new task/OS thread
1977 created by scheduleThread_() would complete prior to the thread
1978 that spawned it managed to put 'itself' on the main-threads list.
1979 The upshot of it all being that the worker thread wouldn't get to
1980 signal the completion of its work item for the main thread to
1981 see (==> it got stuck waiting.) -- sof 6/02.
1983 IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
1985 APPEND_TO_RUN_QUEUE(tso);
1986 // NB. Don't call threadRunnable() here, because the thread is
1987 // bound and only runnable by *this* OS thread, so waking up other
1988 // workers will just slow things down.
1990 return waitThread_(m, initialCapability);
1993 /* ---------------------------------------------------------------------------
1996 * Initialise the scheduler. This resets all the queues - if the
1997 * queues contained any threads, they'll be garbage collected at the
 *
 * Per-PE queue arrays (GranSim) are reset for indices 0..MAX_PROC
 * inclusive. The function also:
 *   - derives ctxtSwitchTicks from ctxtSwitchTime,
 *   - in threaded builds, initialises sched_mutex and term_mutex and
 *     starts the task manager.
2000 * ------------------------------------------------------------------------ */
2008 for (i=0; i<=MAX_PROC; i++) {
2009 run_queue_hds[i] = END_TSO_QUEUE;
2010 run_queue_tls[i] = END_TSO_QUEUE;
2011 blocked_queue_hds[i] = END_TSO_QUEUE;
2012 blocked_queue_tls[i] = END_TSO_QUEUE;
2013 ccalling_threadss[i] = END_TSO_QUEUE;
2014 sleeping_queue = END_TSO_QUEUE;
// NOTE(review): sleeping_queue is a single global, not per-PE, so
// resetting it on every iteration is redundant (harmless).
2017 run_queue_hd = END_TSO_QUEUE;
2018 run_queue_tl = END_TSO_QUEUE;
2019 blocked_queue_hd = END_TSO_QUEUE;
2020 blocked_queue_tl = END_TSO_QUEUE;
2021 sleeping_queue = END_TSO_QUEUE;
2024 suspended_ccalling_threads = END_TSO_QUEUE;
2026 main_threads = NULL;
2027 all_threads = END_TSO_QUEUE;
2032 RtsFlags.ConcFlags.ctxtSwitchTicks =
2033 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2035 #if defined(RTS_SUPPORTS_THREADS)
2036 /* Initialise the mutex and condition variables used by
2038 initMutex(&sched_mutex);
2039 initMutex(&term_mutex);
2042 ACQUIRE_LOCK(&sched_mutex);
2044 /* A capability holds the state a native thread needs in
2045 * order to execute STG code. At least one capability is
2046 * floating around (only SMP builds have more than one).
2050 #if defined(RTS_SUPPORTS_THREADS)
2051 /* start our haskell execution tasks */
2052 startTaskManager(0,taskStart);
2055 #if /* defined(SMP) ||*/ defined(PAR)
2059 RELEASE_LOCK(&sched_mutex);
// exitScheduler: flag the scheduler as shutting down by setting
// shutting_down_scheduler. Threaded builds have an additional
// RTS_SUPPORTS_THREADS section.
2063 exitScheduler( void )
2065 #if defined(RTS_SUPPORTS_THREADS)
2068 shutting_down_scheduler = rtsTrue;
2071 /* ----------------------------------------------------------------------------
2072 Managing the per-task allocation areas.
2074 Each capability comes with an allocation area. These are
2075 fixed-length block lists into which allocation can be done.
2077 ToDo: no support for two-space collection at the moment???
2078 ------------------------------------------------------------------------- */
// NOTE(review): the "per-task allocation areas" comment above does not
// describe waitThread_ and looks stale.
//
// waitThread_(m, initialCapability): runs schedule() for main thread m.
// GranSim first sets CurrentTSO, CurrentProc and the main PE's status;
// the two schedule() calls are presumably alternative #if branches.
// Afterwards m->stat is asserted to be set. In threaded builds, m's
// condition variable is returned to the one-entry cache if it is empty,
// otherwise closed.
2082 waitThread_(StgMainThread* m, Capability *initialCapability)
2084 SchedulerStatus stat;
2086 // Precondition: sched_mutex must be held.
2087 IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2090 /* GranSim specific init */
2091 CurrentTSO = m->tso; // the TSO to run
2092 procStatus[MainProc] = Busy; // status of main PE
2093 CurrentProc = MainProc; // PE to run it on
2094 schedule(m,initialCapability);
2096 schedule(m,initialCapability);
2097 ASSERT(m->stat != NoStatus);
2102 #if defined(RTS_SUPPORTS_THREADS)
2103 // Free the condition variable, returning it to the cache if possible.
2104 if (!bound_cond_cache_full) {
2105 bound_cond_cache = m->bound_thread_cond;
2106 bound_cond_cache_full = 1;
2108 closeCondition(&m->bound_thread_cond);
2112 IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2115 // Postcondition: sched_mutex still held
2119 /* ---------------------------------------------------------------------------
2120 Where are the roots that we know about?
2122 - all the threads on the runnable queue
2123 - all the threads on the blocked queue
2124 - all the threads on the sleeping queue
2125 - all the threads currently executing a _ccall_GC
2126 - all the "main threads"
2128 ------------------------------------------------------------------------ */
2130 /* This has to be protected either by the scheduler monitor, or by the
2131 garbage collection monitor (probably the latter).
2136 GetRoots( evac_fn evac )
2141 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2142 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2143 evac((StgClosure **)&run_queue_hds[i]);
2144 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2145 evac((StgClosure **)&run_queue_tls[i]);
2147 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2148 evac((StgClosure **)&blocked_queue_hds[i]);
2149 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2150 evac((StgClosure **)&blocked_queue_tls[i]);
2151 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2152 evac((StgClosure **)&ccalling_threads[i]);
// NOTE(review): the guard above tests ccalling_threadss[i] but evacuates
// &ccalling_threads[i] (no trailing 's'). initScheduler initialises
// ccalling_threadss, so this looks like a typo that would leave the
// tested slot un-evacuated -- confirm which array is intended.
2159 if (run_queue_hd != END_TSO_QUEUE) {
2160 ASSERT(run_queue_tl != END_TSO_QUEUE);
2161 evac((StgClosure **)&run_queue_hd);
2162 evac((StgClosure **)&run_queue_tl);
2165 if (blocked_queue_hd != END_TSO_QUEUE) {
2166 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2167 evac((StgClosure **)&blocked_queue_hd);
2168 evac((StgClosure **)&blocked_queue_tl);
2171 if (sleeping_queue != END_TSO_QUEUE) {
2172 evac((StgClosure **)&sleeping_queue);
2176 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2177 evac((StgClosure **)&suspended_ccalling_threads);
2180 #if defined(PAR) || defined(GRAN)
2181 markSparkQueue(evac);
2184 #if defined(RTS_USER_SIGNALS)
2185 // mark the signal handlers (signals should be already blocked)
2186 markSignalHandlers(evac);
2190 /* -----------------------------------------------------------------------------
2193 This is the interface to the garbage collector from Haskell land.
2194 We provide this so that external C code can allocate and garbage
2195 collect when called from Haskell via _ccall_GC.
2197 It might be useful to provide an interface whereby the programmer
2198 can specify more roots (ToDo).
2200 This needs to be protected by the GC condition variable above. KH.
2201 -------------------------------------------------------------------------- */
// NOTE(review): performGCWithRoots in this file already lets the caller
// supply extra roots, so the ToDo above may be outdated.
// Extra-root callback stored by performGCWithRoots and invoked by
// AllRoots. It is not reset after the collection.
2203 static void (*extra_roots)(evac_fn);
2208 /* GarbageCollect must run with sched_mutex held, so take it here */
2209 ACQUIRE_LOCK(&sched_mutex);
2210 GarbageCollect(GetRoots,rtsFalse);
2211 RELEASE_LOCK(&sched_mutex);
// performMajorGC: like performGC but passes rtsTrue to GarbageCollect,
// presumably requesting a major collection -- TODO confirm.
2215 performMajorGC(void)
2217 ACQUIRE_LOCK(&sched_mutex);
2218 GarbageCollect(GetRoots,rtsTrue);
2219 RELEASE_LOCK(&sched_mutex);
// AllRoots: root enumerator that reports the scheduler's roots, then the
// roots from the user callback stored in extra_roots.
2223 AllRoots(evac_fn evac)
2225 GetRoots(evac); // the scheduler's roots
2226 extra_roots(evac); // the user's roots
// performGCWithRoots: under sched_mutex, store get_roots in the
// file-static extra_roots and collect using AllRoots.
2230 performGCWithRoots(void (*get_roots)(evac_fn))
2232 ACQUIRE_LOCK(&sched_mutex);
2233 extra_roots = get_roots;
2234 GarbageCollect(AllRoots,rtsFalse);
2235 RELEASE_LOCK(&sched_mutex);
2238 /* -----------------------------------------------------------------------------
2241 If the thread has reached its maximum stack size, then raise the
2242 StackOverflow exception in the offending thread. Otherwise
2243 relocate the TSO into a larger chunk of memory and adjust its stack
2245 -------------------------------------------------------------------------- */
// On relocation:
//   - the TSO header and the live part of the old stack are copied into
//     a newly allocated TSO 'dest', sized up to a whole number of blocks
//     (MBLOCK-rounded);
//   - the old TSO is marked ThreadRelocated;
//   - the old TSO's sp is set just past the end of its stack.
2248 threadStackOverflow(StgTSO *tso)
2250 nat new_stack_size, new_tso_size, stack_words;
2254 IF_DEBUG(sanity,checkTSO(tso));
2255 if (tso->stack_size >= tso->max_stack_size) {
2258 debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2259 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2260 /* If we're debugging, just print out the top of the stack */
2261 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2264 /* Send this thread the StackOverflow exception */
2265 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2269 /* Try to double the current stack size. If that takes us over the
2270 * maximum stack size for this thread, then use the maximum instead.
2271 * Finally round up so the TSO ends up as a whole number of blocks.
2273 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2274 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2275 TSO_STRUCT_SIZE)/sizeof(W_);
2276 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2277 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2279 IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2281 dest = (StgTSO *)allocate(new_tso_size);
2282 TICK_ALLOC_TSO(new_stack_size,0);
2284 /* copy the TSO block and the old stack into the new area */
2285 memcpy(dest,tso,TSO_STRUCT_SIZE);
2286 stack_words = tso->stack + tso->stack_size - tso->sp;
2287 new_sp = (P_)dest + new_tso_size - stack_words;
2288 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2290 /* relocate the stack pointers... */
2292 dest->stack_size = new_stack_size;
2294 /* Mark the old TSO as relocated. We have to check for relocated
2295 * TSOs in the garbage collector and any primops that deal with TSOs.
2297 * It's important to set the sp value to just beyond the end
2298 * of the stack, so we don't attempt to scavenge any part of the
2301 tso->what_next = ThreadRelocated;
2303 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2304 tso->why_blocked = NotBlocked;
2305 dest->mut_link = NULL;
// NOTE(review): this debug block runs after relocation, so 'tso' is the
// old, relocated TSO. "now at %p" prints the old address rather than
// dest, stack_size is the old size, and printStackChunk starts from an sp
// that now points past the end of the old stack. It also passes tso->id
// to %d and a nat to %ld. Printing dest was probably intended -- confirm.
2307 IF_PAR_DEBUG(verbose,
2308 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2309 tso->id, tso, tso->stack_size);
2310 /* If we're debugging, just print out the top of the stack */
2311 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2314 IF_DEBUG(sanity,checkTSO(tso));
2316 IF_DEBUG(scheduler,printTSO(dest));
2322 /* ---------------------------------------------------------------------------
2323 Wake up a queue that was blocked on some resource.
2324 ------------------------------------------------------------------------ */
// Two definitions of unblockCount appear below; presumably they belong
// to different #if branches, one being a no-op stub -- TODO confirm.
// The full version only acts when RtsFlags.ParFlags.ParStats.Full is
// set. It then:
//   - logs a GR_RESUMEQ event,
//   - requests a schedule log entry if the run queue is empty,
//   - adds the time since par.blockedat to fetchtime or blocktime,
//     depending on the closure type of 'node'.
2328 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2333 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2335 /* write RESUME events to log file and
2336 update blocked and fetch time (depending on type of the orig closure) */
2337 if (RtsFlags.ParFlags.ParStats.Full) {
2338 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2339 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2340 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2341 if (EMPTY_RUN_QUEUE())
2342 emitSchedule = rtsTrue;
2344 switch (get_itbl(node)->type) {
2346 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2351 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
// NOTE(review): this message names {unblockOneLocked} although it is
// raised from unblockCount, which may mislead when debugging.
2358 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
// GranSim variant of unblockOneLocked. Instead of putting the TSO on a
// run queue directly, it charges simulated costs to
// CurrentTime[CurrentProc] and posts an event for the TSO's PE (tso_loc):
//   - local TSO:  lunblocktime;
//   - remote TSO: mpacktime + gunblocktime + latency.
// In both cases the TSO's link and block_info.closure are cleared.
2365 static StgBlockingQueueElement *
2366 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2369 PEs node_loc, tso_loc;
2371 node_loc = where_is(node); // should be lifted out of loop
2372 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2373 tso_loc = where_is((StgClosure *)tso);
2374 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2375 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2376 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2377 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2378 // insertThread(tso, node_loc);
2379 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2381 tso, node, (rtsSpark*)NULL);
2382 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2385 } else { // TSO is remote (actually should be FMBQ)
2386 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2387 RtsFlags.GranFlags.Costs.gunblocktime +
2388 RtsFlags.GranFlags.Costs.latency;
2389 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2391 tso, node, (rtsSpark*)NULL);
2392 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2395 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2397 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2398 (node_loc==tso_loc ? "Local" : "Global"),
2399 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2400 tso->block_info.closure = NULL;
2401 IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n",
2405 static StgBlockingQueueElement *
2406 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2408 StgBlockingQueueElement *next;
2410 switch (get_itbl(bqe)->type) {
2412 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2413 /* if it's a TSO just push it onto the run_queue */
2415 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2416 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
2418 unblockCount(bqe, node);
2419 /* reset blocking status after dumping event */
2420 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2424 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2426 bqe->link = (StgBlockingQueueElement *)PendingFetches;
2427 PendingFetches = (StgBlockedFetch *)bqe;
2431 /* can ignore this case in a non-debugging setup;
2432 see comments on RBHSave closures above */
2434 /* check that the closure is an RBHSave closure */
2435 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2436 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2437 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2441 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2442 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2446 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
2450 #else /* !GRAN && !PAR */
2452 unblockOneLocked(StgTSO *tso)
2456 ASSERT(get_itbl(tso)->type == TSO);
2457 ASSERT(tso->why_blocked != NotBlocked);
2458 tso->why_blocked = NotBlocked;
2460 tso->link = END_TSO_QUEUE;
2461 APPEND_TO_RUN_QUEUE(tso);
2463 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
2468 #if defined(GRAN) || defined(PAR)
2469 INLINE_ME StgBlockingQueueElement *
2470 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2472 ACQUIRE_LOCK(&sched_mutex);
2473 bqe = unblockOneLocked(bqe, node);
2474 RELEASE_LOCK(&sched_mutex);
2479 unblockOne(StgTSO *tso)
2481 ACQUIRE_LOCK(&sched_mutex);
2482 tso = unblockOneLocked(tso);
2483 RELEASE_LOCK(&sched_mutex);
2490 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2492 StgBlockingQueueElement *bqe;
2497 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
2498 node, CurrentProc, CurrentTime[CurrentProc],
2499 CurrentTSO->id, CurrentTSO));
2501 node_loc = where_is(node);
2503 ASSERT(q == END_BQ_QUEUE ||
2504 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2505 get_itbl(q)->type == CONSTR); // closure (type constructor)
2506 ASSERT(is_unique(node));
2508 /* FAKE FETCH: magically copy the node to the tso's proc;
2509 no Fetch necessary because in reality the node should not have been
2510 moved to the other PE in the first place
2512 if (CurrentProc!=node_loc) {
2514 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
2515 node, node_loc, CurrentProc, CurrentTSO->id,
2516 // CurrentTSO, where_is(CurrentTSO),
2517 node->header.gran.procs));
2518 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2520 debugBelch("## new bitmask of node %p is %#x\n",
2521 node, node->header.gran.procs));
2522 if (RtsFlags.GranFlags.GranSimStats.Global) {
2523 globalGranStats.tot_fake_fetches++;
2528 // ToDo: check: ASSERT(CurrentProc==node_loc);
2529 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2532 bqe points to the current element in the queue
2533 next points to the next element in the queue
2535 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2536 //tso_loc = where_is(tso);
2538 bqe = unblockOneLocked(bqe, node);
2541 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2542 the closure to make room for the anchor of the BQ */
2543 if (bqe!=END_BQ_QUEUE) {
2544 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2546 ASSERT((info_ptr==&RBH_Save_0_info) ||
2547 (info_ptr==&RBH_Save_1_info) ||
2548 (info_ptr==&RBH_Save_2_info));
2550 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2551 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2552 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2555 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
2556 node, info_type(node)));
2559 /* statistics gathering */
2560 if (RtsFlags.GranFlags.GranSimStats.Global) {
2561 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2562 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2563 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2564 globalGranStats.tot_awbq++; // total no. of bqs awakened
2567 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
2568 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2572 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2574 StgBlockingQueueElement *bqe;
2576 ACQUIRE_LOCK(&sched_mutex);
2578 IF_PAR_DEBUG(verbose,
2579 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
2583 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2584 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
2589 ASSERT(q == END_BQ_QUEUE ||
2590 get_itbl(q)->type == TSO ||
2591 get_itbl(q)->type == BLOCKED_FETCH ||
2592 get_itbl(q)->type == CONSTR);
2595 while (get_itbl(bqe)->type==TSO ||
2596 get_itbl(bqe)->type==BLOCKED_FETCH) {
2597 bqe = unblockOneLocked(bqe, node);
2599 RELEASE_LOCK(&sched_mutex);
2602 #else /* !GRAN && !PAR */
2605 awakenBlockedQueueNoLock(StgTSO *tso)
2607 while (tso != END_TSO_QUEUE) {
2608 tso = unblockOneLocked(tso);
2613 awakenBlockedQueue(StgTSO *tso)
2615 ACQUIRE_LOCK(&sched_mutex);
2616 while (tso != END_TSO_QUEUE) {
2617 tso = unblockOneLocked(tso);
2619 RELEASE_LOCK(&sched_mutex);
2623 /* ---------------------------------------------------------------------------
2625 - usually called inside a signal handler so it mustn't do anything fancy.
2626 ------------------------------------------------------------------------ */
2629 interruptStgRts(void)
2635 /* -----------------------------------------------------------------------------
2638 This is for use when we raise an exception in another thread, which
2640 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2641 -------------------------------------------------------------------------- */
2643 #if defined(GRAN) || defined(PAR)
2645 NB: only the type of the blocking queue differs between GranSim and GUM;
2646 the operations on the queue elements are the same --
2647 long live polymorphism!
2649 Locks: sched_mutex is held upon entry and exit.
2653 unblockThread(StgTSO *tso)
2655 StgBlockingQueueElement *t, **last;
2657 switch (tso->why_blocked) {
2660 return; /* not blocked */
2663 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2665 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2666 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2668 last = (StgBlockingQueueElement **)&mvar->head;
2669 for (t = (StgBlockingQueueElement *)mvar->head;
2671 last = &t->link, last_tso = t, t = t->link) {
2672 if (t == (StgBlockingQueueElement *)tso) {
2673 *last = (StgBlockingQueueElement *)tso->link;
2674 if (mvar->tail == tso) {
2675 mvar->tail = (StgTSO *)last_tso;
2680 barf("unblockThread (MVAR): TSO not found");
2683 case BlockedOnBlackHole:
2684 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2686 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2688 last = &bq->blocking_queue;
2689 for (t = bq->blocking_queue;
2691 last = &t->link, t = t->link) {
2692 if (t == (StgBlockingQueueElement *)tso) {
2693 *last = (StgBlockingQueueElement *)tso->link;
2697 barf("unblockThread (BLACKHOLE): TSO not found");
2700 case BlockedOnException:
2702 StgTSO *target = tso->block_info.tso;
2704 ASSERT(get_itbl(target)->type == TSO);
2706 if (target->what_next == ThreadRelocated) {
2707 target = target->link;
2708 ASSERT(get_itbl(target)->type == TSO);
2711 ASSERT(target->blocked_exceptions != NULL);
2713 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2714 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2716 last = &t->link, t = t->link) {
2717 ASSERT(get_itbl(t)->type == TSO);
2718 if (t == (StgBlockingQueueElement *)tso) {
2719 *last = (StgBlockingQueueElement *)tso->link;
2723 barf("unblockThread (Exception): TSO not found");
2727 case BlockedOnWrite:
2728 #if defined(mingw32_TARGET_OS)
2729 case BlockedOnDoProc:
2732 /* take TSO off blocked_queue */
2733 StgBlockingQueueElement *prev = NULL;
2734 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2735 prev = t, t = t->link) {
2736 if (t == (StgBlockingQueueElement *)tso) {
2738 blocked_queue_hd = (StgTSO *)t->link;
2739 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2740 blocked_queue_tl = END_TSO_QUEUE;
2743 prev->link = t->link;
2744 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2745 blocked_queue_tl = (StgTSO *)prev;
2751 barf("unblockThread (I/O): TSO not found");
2754 case BlockedOnDelay:
2756 /* take TSO off sleeping_queue */
2757 StgBlockingQueueElement *prev = NULL;
2758 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
2759 prev = t, t = t->link) {
2760 if (t == (StgBlockingQueueElement *)tso) {
2762 sleeping_queue = (StgTSO *)t->link;
2764 prev->link = t->link;
2769 barf("unblockThread (delay): TSO not found");
2773 barf("unblockThread");
2777 tso->link = END_TSO_QUEUE;
2778 tso->why_blocked = NotBlocked;
2779 tso->block_info.closure = NULL;
2780 PUSH_ON_RUN_QUEUE(tso);
2784 unblockThread(StgTSO *tso)
2788 /* To avoid locking unnecessarily. */
2789 if (tso->why_blocked == NotBlocked) {
2793 switch (tso->why_blocked) {
2796 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2798 StgTSO *last_tso = END_TSO_QUEUE;
2799 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2802 for (t = mvar->head; t != END_TSO_QUEUE;
2803 last = &t->link, last_tso = t, t = t->link) {
2806 if (mvar->tail == tso) {
2807 mvar->tail = last_tso;
2812 barf("unblockThread (MVAR): TSO not found");
2815 case BlockedOnBlackHole:
2816 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2818 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2820 last = &bq->blocking_queue;
2821 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2822 last = &t->link, t = t->link) {
2828 barf("unblockThread (BLACKHOLE): TSO not found");
2831 case BlockedOnException:
2833 StgTSO *target = tso->block_info.tso;
2835 ASSERT(get_itbl(target)->type == TSO);
2837 while (target->what_next == ThreadRelocated) {
2838 target = target->link;
2839 ASSERT(get_itbl(target)->type == TSO);
2842 ASSERT(target->blocked_exceptions != NULL);
2844 last = &target->blocked_exceptions;
2845 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2846 last = &t->link, t = t->link) {
2847 ASSERT(get_itbl(t)->type == TSO);
2853 barf("unblockThread (Exception): TSO not found");
2857 case BlockedOnWrite:
2858 #if defined(mingw32_TARGET_OS)
2859 case BlockedOnDoProc:
2862 StgTSO *prev = NULL;
2863 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2864 prev = t, t = t->link) {
2867 blocked_queue_hd = t->link;
2868 if (blocked_queue_tl == t) {
2869 blocked_queue_tl = END_TSO_QUEUE;
2872 prev->link = t->link;
2873 if (blocked_queue_tl == t) {
2874 blocked_queue_tl = prev;
2880 barf("unblockThread (I/O): TSO not found");
2883 case BlockedOnDelay:
2885 StgTSO *prev = NULL;
2886 for (t = sleeping_queue; t != END_TSO_QUEUE;
2887 prev = t, t = t->link) {
2890 sleeping_queue = t->link;
2892 prev->link = t->link;
2897 barf("unblockThread (delay): TSO not found");
2901 barf("unblockThread");
2905 tso->link = END_TSO_QUEUE;
2906 tso->why_blocked = NotBlocked;
2907 tso->block_info.closure = NULL;
2908 APPEND_TO_RUN_QUEUE(tso);
2912 /* -----------------------------------------------------------------------------
2915 * The following function implements the magic for raising an
2916 * asynchronous exception in an existing thread.
2918 * We first remove the thread from any queue on which it might be
2919 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
2921 * We strip the stack down to the innermost CATCH_FRAME, building
2922 * thunks in the heap for all the active computations, so they can
2923 * be restarted if necessary. When we reach a CATCH_FRAME, we build
2924 * an application of the handler to the exception, and push it on
2925 * the top of the stack.
2927 * How exactly do we save all the active computations? We create an
2928 * AP_STACK for every UpdateFrame on the stack. Entering one of these
2929 * AP_STACKs pushes everything from the corresponding update frame
2930 * upwards onto the stack. (Actually, it pushes everything up to the
2931 * next update frame plus a pointer to the next AP_STACK object.)
2932 * Entering the next AP_STACK object pushes more onto the stack until we
2933 * reach the last AP_STACK object - at which point the stack should look
2934 * exactly as it did when we killed the TSO and we can continue
2935 * execution by entering the closure on top of the stack.
2937 * We can also kill a thread entirely - this happens if either (a) the
2938 * exception passed to raiseAsync is NULL, or (b) there's no
2939 * CATCH_FRAME on the stack. In either case, we strip the entire
2940 * stack and replace the thread with a zombie.
2942 * Locks: sched_mutex is held upon both entry and exit.
2944 * -------------------------------------------------------------------------- */
2947 deleteThread(StgTSO *tso)
2949 raiseAsync(tso,NULL);
2952 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2954 deleteThreadImmediately(StgTSO *tso)
2955 { // for forkProcess only:
2956 // delete thread without giving it a chance to catch the KillThread exception
2958 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2962 if (tso->why_blocked != BlockedOnCCall &&
2963 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2967 tso->what_next = ThreadKilled;
2972 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
2974 /* When raising async exs from contexts where sched_mutex isn't held;
2975 use raiseAsyncWithLock(). */
2976 ACQUIRE_LOCK(&sched_mutex);
2977 raiseAsync(tso,exception);
2978 RELEASE_LOCK(&sched_mutex);
2982 raiseAsync(StgTSO *tso, StgClosure *exception)
2984 StgRetInfoTable *info;
2987 // Thread already dead?
2988 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2993 sched_belch("raising exception in thread %ld.", (long)tso->id));
2995 // Remove it from any blocking queues
3000 // The stack freezing code assumes there's a closure pointer on
3001 // the top of the stack, so we have to arrange that this is the case...
3003 if (sp[0] == (W_)&stg_enter_info) {
3007 sp[0] = (W_)&stg_dummy_ret_closure;
3013 // 1. Let the top of the stack be the "current closure"
3015 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3018 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3019 // current closure applied to the chunk of stack up to (but not
3020 // including) the update frame. This closure becomes the "current
3021 // closure". Go back to step 2.
3023 // 4. If it's a CATCH_FRAME, then leave the exception handler on
3024 // top of the stack applied to the exception.
3026 // 5. If it's a STOP_FRAME, then kill the thread.
3031 info = get_ret_itbl((StgClosure *)frame);
3033 while (info->i.type != UPDATE_FRAME
3034 && (info->i.type != CATCH_FRAME || exception == NULL)
3035 && info->i.type != STOP_FRAME) {
3036 frame += stack_frame_sizeW((StgClosure *)frame);
3037 info = get_ret_itbl((StgClosure *)frame);
3040 switch (info->i.type) {
3043 // If we find a CATCH_FRAME, and we've got an exception to raise,
3044 // then build the THUNK raise(exception), and leave it on
3045 // top of the CATCH_FRAME ready to enter.
3049 StgCatchFrame *cf = (StgCatchFrame *)frame;
3053 // we've got an exception to raise, so let's pass it to the
3054 // handler in this frame.
3056 raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3057 TICK_ALLOC_SE_THK(1,0);
3058 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3059 raise->payload[0] = exception;
3061 // throw away the stack from Sp up to the CATCH_FRAME.
3065 /* Ensure that async excpetions are blocked now, so we don't get
3066 * a surprise exception before we get around to executing the
3069 if (tso->blocked_exceptions == NULL) {
3070 tso->blocked_exceptions = END_TSO_QUEUE;
3073 /* Put the newly-built THUNK on top of the stack, ready to execute
3074 * when the thread restarts.
3077 sp[-1] = (W_)&stg_enter_info;
3079 tso->what_next = ThreadRunGHC;
3080 IF_DEBUG(sanity, checkTSO(tso));
3089 // First build an AP_STACK consisting of the stack chunk above the
3090 // current update frame, with the top word on the stack as the
3093 words = frame - sp - 1;
3094 ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3097 ap->fun = (StgClosure *)sp[0];
3099 for(i=0; i < (nat)words; ++i) {
3100 ap->payload[i] = (StgClosure *)*sp++;
3103 SET_HDR(ap,&stg_AP_STACK_info,
3104 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
3105 TICK_ALLOC_UP_THK(words+1,0);
3108 debugBelch("sched: Updating ");
3109 printPtr((P_)((StgUpdateFrame *)frame)->updatee);
3110 debugBelch(" with ");
3111 printObj((StgClosure *)ap);
3114 // Replace the updatee with an indirection - happily
3115 // this will also wake up any threads currently
3116 // waiting on the result.
3118 // Warning: if we're in a loop, more than one update frame on
3119 // the stack may point to the same object. Be careful not to
3120 // overwrite an IND_OLDGEN in this case, because we'll screw
3121 // up the mutable lists. To be on the safe side, don't
3122 // overwrite any kind of indirection at all. See also
3123 // threadSqueezeStack in GC.c, where we have to make a similar
3126 if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3127 // revert the black hole
3128 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3131 sp += sizeofW(StgUpdateFrame) - 1;
3132 sp[0] = (W_)ap; // push onto stack
3137 // We've stripped the entire stack, the thread is now dead.
3138 sp += sizeofW(StgStopFrame);
3139 tso->what_next = ThreadKilled;
3150 /* -----------------------------------------------------------------------------
3151 raiseExceptionHelper
3153 This function is called by the raise# primitive, just so that we can
3154 move some of the tricky bits of raising an exception from C-- into
3155 C. Who knows, it might be a useful reusable thing here too.
3156 -------------------------------------------------------------------------- */
3159 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3161 StgClosure *raise_closure = NULL;
3163 StgRetInfoTable *info;
3165 // This closure represents the expression 'raise# E' where E
3166 // is the exception raise. It is used to overwrite all the
3167 // thunks which are currently under evaluataion.
3171 // LDV profiling: stg_raise_info has THUNK as its closure
3172 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3173 // payload, MIN_UPD_SIZE is more approprate than 1. It seems that
3174 // 1 does not cause any problem unless profiling is performed.
3175 // However, when LDV profiling goes on, we need to linearly scan
3176 // small object pool, where raise_closure is stored, so we should
3177 // use MIN_UPD_SIZE.
3179 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3180 // sizeofW(StgClosure)+1);
3184 // Walk up the stack, looking for the catch frame. On the way,
3185 // we update any closures pointed to from update frames with the
3186 // raise closure that we just built.
3190 info = get_ret_itbl((StgClosure *)p);
3191 next = p + stack_frame_sizeW((StgClosure *)p);
3192 switch (info->i.type) {
3195 // Only create raise_closure if we need to.
3196 if (raise_closure == NULL) {
3198 (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3199 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3200 raise_closure->payload[0] = exception;
3202 UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3221 /* -----------------------------------------------------------------------------
3222 resurrectThreads is called after garbage collection on the list of
3223 threads found to be garbage. Each of these threads will be woken
3224 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3225 on an MVar, or NonTermination if the thread was blocked on a Black
3228 Locks: sched_mutex isn't held upon entry nor exit.
3229 -------------------------------------------------------------------------- */
3232 resurrectThreads( StgTSO *threads )
3236 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3237 next = tso->global_link;
3238 tso->global_link = all_threads;
3240 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3242 switch (tso->why_blocked) {
3244 case BlockedOnException:
3245 /* Called by GC - sched_mutex lock is currently held. */
3246 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3248 case BlockedOnBlackHole:
3249 raiseAsync(tso,(StgClosure *)NonTermination_closure);
3252 /* This might happen if the thread was blocked on a black hole
3253 * belonging to a thread that we've just woken up (raiseAsync
3254 * can wake up threads, remember...).
3258 barf("resurrectThreads: thread blocked in a strange way");
3263 /* ----------------------------------------------------------------------------
3264 * Debugging: why is a thread blocked
3265 * [Also provides useful information when debugging threaded programs
3266 * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3267 ------------------------------------------------------------------------- */
3271 printThreadBlockage(StgTSO *tso)
3273 switch (tso->why_blocked) {
3275 debugBelch("is blocked on read from fd %d", tso->block_info.fd);
3277 case BlockedOnWrite:
3278 debugBelch("is blocked on write to fd %d", tso->block_info.fd);
3280 #if defined(mingw32_TARGET_OS)
3281 case BlockedOnDoProc:
3282 debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3285 case BlockedOnDelay:
3286 debugBelch("is blocked until %d", tso->block_info.target);
3289 debugBelch("is blocked on an MVar");
3291 case BlockedOnException:
3292 debugBelch("is blocked on delivering an exception to thread %d",
3293 tso->block_info.tso->id);
3295 case BlockedOnBlackHole:
3296 debugBelch("is blocked on a black hole");
3299 debugBelch("is not blocked");
3303 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
3304 tso->block_info.closure, info_type(tso->block_info.closure));
3306 case BlockedOnGA_NoSend:
3307 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
3308 tso->block_info.closure, info_type(tso->block_info.closure));
3311 case BlockedOnCCall:
3312 debugBelch("is blocked on an external call");
3314 case BlockedOnCCall_NoUnblockExc:
3315 debugBelch("is blocked on an external call (exceptions were already blocked)");
3318 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3319 tso->why_blocked, tso->id, tso);
3325 printThreadStatus(StgTSO *tso)
3327 switch (tso->what_next) {
3329 debugBelch("has been killed");
3331 case ThreadComplete:
3332 debugBelch("has completed");
3335 printThreadBlockage(tso);
3340 printAllThreads(void)
3345 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3346 ullong_format_string(TIME_ON_PROC(CurrentProc),
3347 time_string, rtsFalse/*no commas!*/);
3349 debugBelch("all threads at [%s]:\n", time_string);
3351 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3352 ullong_format_string(CURRENT_TIME,
3353 time_string, rtsFalse/*no commas!*/);
3355 debugBelch("all threads at [%s]:\n", time_string);
3357 debugBelch("all threads:\n");
3360 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3361 debugBelch("\tthread %d @ %p ", t->id, (void *)t);
3364 void *label = lookupThreadLabel(t->id);
3365 if (label) debugBelch("[\"%s\"] ",(char *)label);
3368 printThreadStatus(t);
3376 Print a whole blocking queue attached to node (debugging only).
3380 print_bq (StgClosure *node)
3382 StgBlockingQueueElement *bqe;
3386 debugBelch("## BQ of closure %p (%s): ",
3387 node, info_type(node));
3389 /* should cover all closures that may have a blocking queue */
3390 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3391 get_itbl(node)->type == FETCH_ME_BQ ||
3392 get_itbl(node)->type == RBH ||
3393 get_itbl(node)->type == MVAR);
3395 ASSERT(node!=(StgClosure*)NULL); // sanity check
3397 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3401 Print a whole blocking queue starting with the element bqe.
3404 print_bqe (StgBlockingQueueElement *bqe)
3409 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3411 for (end = (bqe==END_BQ_QUEUE);
3412 !end; // iterate until bqe points to a CONSTR
3413 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
3414 bqe = end ? END_BQ_QUEUE : bqe->link) {
3415 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3416 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3417 /* types of closures that may appear in a blocking queue */
3418 ASSERT(get_itbl(bqe)->type == TSO ||
3419 get_itbl(bqe)->type == BLOCKED_FETCH ||
3420 get_itbl(bqe)->type == CONSTR);
3421 /* only BQs of an RBH end with an RBH_Save closure */
3422 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3424 switch (get_itbl(bqe)->type) {
3426 debugBelch(" TSO %u (%x),",
3427 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3430 debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
3431 ((StgBlockedFetch *)bqe)->node,
3432 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3433 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3434 ((StgBlockedFetch *)bqe)->ga.weight);
3437 debugBelch(" %s (IP %p),",
3438 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3439 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3440 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3441 "RBH_Save_?"), get_itbl(bqe));
3444 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3445 info_type((StgClosure *)bqe)); // , node, info_type(node));
3451 # elif defined(GRAN)
3453 print_bq (StgClosure *node)
3455 StgBlockingQueueElement *bqe;
3456 PEs node_loc, tso_loc;
3459 /* should cover all closures that may have a blocking queue */
3460 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3461 get_itbl(node)->type == FETCH_ME_BQ ||
3462 get_itbl(node)->type == RBH);
3464 ASSERT(node!=(StgClosure*)NULL); // sanity check
3465 node_loc = where_is(node);
3467 debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
3468 node, info_type(node), node_loc);
3471 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3473 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3474 !end; // iterate until bqe points to a CONSTR
3475 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3476 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3477 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3478 /* types of closures that may appear in a blocking queue */
3479 ASSERT(get_itbl(bqe)->type == TSO ||
3480 get_itbl(bqe)->type == CONSTR);
3481 /* only BQs of an RBH end with an RBH_Save closure */
3482 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3484 tso_loc = where_is((StgClosure *)bqe);
3485 switch (get_itbl(bqe)->type) {
3487 debugBelch(" TSO %d (%p) on [PE %d],",
3488 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3491 debugBelch(" %s (IP %p),",
3492 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3493 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3494 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3495 "RBH_Save_?"), get_itbl(bqe));
3498 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3499 info_type((StgClosure *)bqe), node, info_type(node));
3507 Nice and easy: only TSOs on the blocking queue
3510 print_bq (StgClosure *node)
3514 ASSERT(node!=(StgClosure*)NULL); // sanity check
3515 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3516 tso != END_TSO_QUEUE;
3518 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3519 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3520 debugBelch(" TSO %d (%p),", tso->id, tso);
3533 for (i=0, tso=run_queue_hd;
3534 tso != END_TSO_QUEUE;
3543 sched_belch(char *s, ...)
3547 #ifdef RTS_SUPPORTS_THREADS
3548 debugBelch("sched (task %p): ", osThreadId());
3552 debugBelch("sched: ");