1 /* ---------------------------------------------------------------------------
2 * $Id: Schedule.c,v 1.137 2002/04/13 05:33:02 sof Exp $
4 * (c) The GHC Team, 1998-2000
8 * Different GHC ways use this scheduler quite differently (see comments below)
9 * Here is the global picture:
11 * WAY Name CPP flag What's it for
12 * --------------------------------------
13 * mp GUM PAR Parallel execution on a distributed memory machine
14 * s SMP SMP Parallel execution on a shared memory machine
15 * mg GranSim GRAN Simulation of parallel execution
16 * md GUM/GdH DIST Distributed execution (based on GUM)
18 * --------------------------------------------------------------------------*/
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
24 * Version with scheduler monitor support for SMPs (WAY=s):
26 This design provides a high-level API to create and schedule threads etc.
27 as documented in the SMP design document.
29 It uses a monitor design controlled by a single mutex to exercise control
30 over accesses to shared data structures, and builds on the Posix threads
33 The majority of state is shared. In order to keep essential per-task state,
34 there is a Capability structure, which contains all the information
35 needed to run a thread: its STG registers, a pointer to its TSO, a
36 nursery etc. During STG execution, a pointer to the capability is
37 kept in a register (BaseReg).
39 In a non-SMP build, there is one global capability, namely MainRegTable.
43 * Version with support for distributed memory parallelism aka GUM (WAY=mp):
45 The main scheduling loop in GUM iterates until a finish message is received.
46 In that case a global flag @receivedFinish@ is set and this instance of
47 the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48 for the handling of incoming messages, such as PP_FINISH.
49 Note that in the parallel case we have a system manager that coordinates
50 different PEs, each of which are running one instance of the RTS.
51 See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52 From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
54 * Version with support for simulating parallel execution aka GranSim (WAY=mg):
56 The main scheduling code in GranSim is quite different from that in std
57 (concurrent) Haskell: while concurrent Haskell just iterates over the
58 threads in the runnable queue, GranSim is event driven, i.e. it iterates
59 over the events in the global event queue. -- HWL
64 //* Variables and Data structures::
65 //* Main scheduling loop::
66 //* Suspend and Resume::
//* Garbage Collection Routines::
69 //* Blocking Queue Routines::
70 //* Exception Handling Routines::
71 //* Debugging Routines::
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
78 #include "PosixSource.h"
85 #include "StgStartup.h"
88 #include "StgMiscClosures.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
117 #ifdef HAVE_SYS_TYPES_H
118 #include <sys/types.h>
126 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
127 //@subsection Variables and Data structures
/* Main thread queue.
 * Linked list (via the link field) of StgMainThread records; the
 * scheduler scans it to wake up clients whose computations have
 * finished (see the completion scan in schedule() below).
 * Locks required: sched_mutex.
 */
StgMainThread *main_threads;
135 * Locks required: sched_mutex.
StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */

/*
  In GranSim we have a runnable and a blocked queue for each processor.
  In order to minimise code changes new arrays run_queue_hds/tls
  are created. run_queue_hd is then a short cut (macro) for
  run_queue_hds[CurrentProc] (see GranSim.h).
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC]; /* presumably per-PE threads in C calls
                                        -- cf. suspended_ccalling_threads;
                                        confirm against full file */
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating). */
/* Thread queues for the std RTS build.
 * Locks required: sched_mutex.  NOTE(review): assumed by analogy with
 * the other queue declarations in this file -- confirm.
 */
StgTSO *run_queue_hd, *run_queue_tl;
StgTSO *blocked_queue_hd, *blocked_queue_tl;
StgTSO *sleeping_queue; /* perhaps replace with a hash table? */

/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 */

/* When a thread performs a safe C call (_ccall_GC, using old
 * terminology), it gets put on the suspended_ccalling_threads
 * list. Used by the garbage collector.
 */
static StgTSO *suspended_ccalling_threads;

/* Enlarge the stack of a thread that has overflowed it; may relocate
 * the TSO, in which case the new TSO is returned (see the
 * StackOverflow case in the scheduler below, which patches up the
 * main-thread list after the move). */
static StgTSO *threadStackOverflow(StgTSO *tso);
/* KH: The following two flags are shared memory locations. There is no need
   to lock them, since they are only unset at the end of a scheduler
   pass.  NOTE(review): the tail of this sentence was truncated in this
   copy of the file -- confirm against the original. */

/* flag set by signal handler to precipitate a context switch */
//@cindex context_switch

/* if this flag is set as well, give up execution */
//@cindex interrupted

/* Next thread ID to allocate.
 * Locks required: sched_mutex
 */
//@cindex next_thread_id
StgThreadID next_thread_id = 1;
/* Pointers to the state of the current thread.
 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
 */
/* The smallest stack size that makes any sense is:
 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
 * + 1 (the realworld token for an IO thread)
 * + 1 (the closure to enter)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
/* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 * exists - earlier gccs apparently didn't.
 */
/* Forward declarations of scheduler-local functions. */
void addToBlockedQueue ( StgTSO *tso );
static void schedule ( void );
void interruptStgRts ( void );
/* NOTE(review): two createThread_ prototypes follow; the CPP
 * conditional that selects between them (GranSim vs. std build) is not
 * visible in this chunk of the file.  The first, with the extra
 * StgInt pri argument, is presumably the GranSim (thread priority)
 * variant -- confirm against the full file. */
static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri );
static StgTSO * createThread_ ( nat size, rtsBool have_lock );
static void detectBlackHoles ( void );
static void sched_belch(char *s, ...);
#if defined(RTS_SUPPORTS_THREADS)
/* ToDo: carefully document the invariants that go together
 * with these synchronisation objects.
 */
/* sched_mutex protects the scheduler's shared state; it is held around
 * the body of the scheduler loop (see ACQUIRE_LOCK/RELEASE_LOCK in
 * schedule()). */
Mutex sched_mutex = INIT_MUTEX_VAR;
Mutex term_mutex = INIT_MUTEX_VAR;
/* Signalled when a pending GC has completed; waited on (with
 * sched_mutex) in the scheduler loop. */
static Condition gc_pending_cond = INIT_COND_VAR;
#endif /* RTS_SUPPORTS_THREADS */
/* Time the last thread yielded; used by the parallel profiling code
 * when emitting GR_DESCHEDULE events for threads that did not block
 * (see the "HACK 675" note in schedule()). */
rtsTime TimeOfLastYield;
/* When set, force a GR_SCHEDULE event to be written for the next
 * thread we run, even if it is the same thread as before (parallel
 * profiling). */
rtsBool emitSchedule = rtsTrue;
265 char *whatNext_strs[] = {
273 char *threadReturnCode_strs[] = {
274 "HeapOverflow", /* might also be StackOverflow */
/* Spark handling (parallel builds): turn a spark into a runnable
 * thread.  createSparkThread is called from the scheduler's spark
 * top-up; activateSpark is used in the GUM (PAR) path -- see the uses
 * in schedule() below. */
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);
288 * The thread state for the main thread.
289 // ToDo: check whether not needed any more
293 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
294 static void taskStart(void);
305 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
306 //@subsection Main scheduling loop
308 /* ---------------------------------------------------------------------------
309 Main scheduling loop.
311 We use round-robin scheduling, each thread returning to the
312 scheduler loop when one of these conditions is detected:
315 * timer expires (thread yields)
320 Locking notes: we acquire the scheduler lock once at the beginning
321 of the scheduler loop, and release it when
323 * running a thread, or
324 * waiting for work, or
325 * waiting for a GC to complete.
328 In a GranSim setup this loop iterates over the global event queue.
329 This revolves around the global event queue, which determines what
330 to do next. Therefore, it's more complicated than either the
331 concurrent or the parallel (GUM) setup.
334 GUM iterates over incoming messages.
335 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
336 and sends out a fish whenever it has nothing to do; in-between
337 doing the actual reductions (shared code below) it processes the
338 incoming messages and deals with delayed operations
339 (see PendingFetches).
340 This is not the ugliest code you could imagine, but it's bloody close.
342 ------------------------------------------------------------------------ */
349 StgThreadReturnCode ret;
357 rtsBool receivedFinish = rtsFalse;
359 nat tp_size, sp_size; // stats only
362 rtsBool was_interrupted = rtsFalse;
364 ACQUIRE_LOCK(&sched_mutex);
366 #if defined(RTS_SUPPORTS_THREADS)
367 waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
369 /* simply initialise it in the non-threaded case */
370 grabCapability(&cap);
374 /* set up first event to get things going */
375 /* ToDo: assign costs for system setup and init MainTSO ! */
376 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
378 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
381 fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
382 G_TSO(CurrentTSO, 5));
384 if (RtsFlags.GranFlags.Light) {
385 /* Save current time; GranSim Light only */
386 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
389 event = get_next_event();
391 while (event!=(rtsEvent*)NULL) {
392 /* Choose the processor with the next event */
393 CurrentProc = event->proc;
394 CurrentTSO = event->tso;
398 while (!receivedFinish) { /* set by processMessages */
399 /* when receiving PP_FINISH message */
406 IF_DEBUG(scheduler, printAllThreads());
408 #if defined(RTS_SUPPORTS_THREADS)
409 /* Check to see whether there are any worker threads
410 waiting to deposit external call results. If so,
411 yield our capability */
412 yieldToReturningWorker(&sched_mutex, cap);
415 /* If we're interrupted (the user pressed ^C, or some other
416 * termination condition occurred), kill all the currently running
420 IF_DEBUG(scheduler, sched_belch("interrupted"));
422 interrupted = rtsFalse;
423 was_interrupted = rtsTrue;
426 /* Go through the list of main threads and wake up any
427 * clients whose computations have finished. ToDo: this
428 * should be done more efficiently without a linear scan
429 * of the main threads list, somehow...
431 #if defined(RTS_SUPPORTS_THREADS)
433 StgMainThread *m, **prev;
434 prev = &main_threads;
435 for (m = main_threads; m != NULL; m = m->link) {
436 switch (m->tso->what_next) {
439 *(m->ret) = (StgClosure *)m->tso->sp[0];
443 broadcastCondition(&m->wakeup);
449 if (m->ret) *(m->ret) = NULL;
451 if (was_interrupted) {
452 m->stat = Interrupted;
456 broadcastCondition(&m->wakeup);
467 #else /* not threaded */
470 /* in GUM do this only on the Main PE */
473 /* If our main thread has finished or been killed, return.
476 StgMainThread *m = main_threads;
477 if (m->tso->what_next == ThreadComplete
478 || m->tso->what_next == ThreadKilled) {
482 main_threads = main_threads->link;
483 if (m->tso->what_next == ThreadComplete) {
484 /* we finished successfully, fill in the return value */
485 if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
489 if (m->ret) { *(m->ret) = NULL; };
490 if (was_interrupted) {
491 m->stat = Interrupted;
501 /* Top up the run queue from our spark pool. We try to make the
502 * number of threads in the run queue equal to the number of
505 * Disable spark support in SMP for now, non-essential & requires
506 * a little bit of work to make it compile cleanly. -- sof 1/02.
508 #if 0 /* defined(SMP) */
510 nat n = getFreeCapabilities();
511 StgTSO *tso = run_queue_hd;
513 /* Count the run queue */
514 while (n > 0 && tso != END_TSO_QUEUE) {
521 spark = findSpark(rtsFalse);
523 break; /* no more sparks in the pool */
525 /* I'd prefer this to be done in activateSpark -- HWL */
526 /* tricky - it needs to hold the scheduler lock and
527 * not try to re-acquire it -- SDM */
528 createSparkThread(spark);
530 sched_belch("==^^ turning spark of closure %p into a thread",
531 (StgClosure *)spark));
534 /* We need to wake up the other tasks if we just created some
537 if (getFreeCapabilities() - n > 1) {
538 signalCondition( &thread_ready_cond );
543 /* check for signals each time around the scheduler */
544 #ifndef mingw32_TARGET_OS
545 if (signals_pending()) {
546 startSignalHandlers();
550 /* Check whether any waiting threads need to be woken up. If the
551 * run queue is empty, and there are no other tasks running, we
552 * can wait indefinitely for something to happen.
553 * ToDo: what if another client comes along & requests another
556 if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
557 awaitEvent( EMPTY_RUN_QUEUE()
559 && allFreeCapabilities()
563 /* we can be interrupted while waiting for I/O... */
564 if (interrupted) continue;
567 * Detect deadlock: when we have no threads to run, there are no
568 * threads waiting on I/O or sleeping, and all the other tasks are
569 * waiting for work, we must have a deadlock of some description.
571 * We first try to find threads blocked on themselves (ie. black
572 * holes), and generate NonTermination exceptions where necessary.
574 * If no threads are black holed, we have a deadlock situation, so
575 * inform all the main threads.
578 if ( EMPTY_THREAD_QUEUES()
579 #if defined(RTS_SUPPORTS_THREADS)
580 && EMPTY_QUEUE(suspended_ccalling_threads)
583 && allFreeCapabilities()
587 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
588 #if defined(THREADED_RTS)
589 /* and SMP mode ..? */
590 releaseCapability(cap);
592 // Garbage collection can release some new threads due to
593 // either (a) finalizers or (b) threads resurrected because
594 // they are about to be send BlockedOnDeadMVar. Any threads
595 // thus released will be immediately runnable.
596 GarbageCollect(GetRoots,rtsTrue);
598 if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
601 sched_belch("still deadlocked, checking for black holes..."));
604 if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
606 #ifndef mingw32_TARGET_OS
607 /* If we have user-installed signal handlers, then wait
608 * for signals to arrive rather then bombing out with a
611 if ( anyUserHandlers() ) {
613 sched_belch("still deadlocked, waiting for signals..."));
617 // we might be interrupted...
618 if (interrupted) { continue; }
620 if (signals_pending()) {
621 startSignalHandlers();
623 ASSERT(!EMPTY_RUN_QUEUE());
628 /* Probably a real deadlock. Send the current main thread the
629 * Deadlock exception (or in the SMP build, send *all* main
630 * threads the deadlock exception, since none of them can make
635 #if defined(RTS_SUPPORTS_THREADS)
636 for (m = main_threads; m != NULL; m = m->link) {
637 switch (m->tso->why_blocked) {
638 case BlockedOnBlackHole:
639 raiseAsyncWithLock(m->tso, (StgClosure *)NonTermination_closure);
641 case BlockedOnException:
643 raiseAsyncWithLock(m->tso, (StgClosure *)Deadlock_closure);
646 barf("deadlock: main thread blocked in a strange way");
651 switch (m->tso->why_blocked) {
652 case BlockedOnBlackHole:
653 raiseAsyncWithLock(m->tso, (StgClosure *)NonTermination_closure);
655 case BlockedOnException:
657 raiseAsyncWithLock(m->tso, (StgClosure *)Deadlock_closure);
660 barf("deadlock: main thread blocked in a strange way");
665 #if defined(RTS_SUPPORTS_THREADS)
666 /* ToDo: revisit conditions (and mechanism) for shutting
667 down a multi-threaded world */
668 IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
669 shutdownHaskellAndExit(0);
675 /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
679 /* If there's a GC pending, don't do anything until it has
683 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
684 waitCondition( &gc_pending_cond, &sched_mutex );
688 #if defined(RTS_SUPPORTS_THREADS)
689 /* block until we've got a thread on the run queue and a free
693 if ( EMPTY_RUN_QUEUE() ) {
694 /* Give up our capability */
695 releaseCapability(cap);
696 IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
697 waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
698 IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
700 while ( EMPTY_RUN_QUEUE() ) {
701 waitForWorkCapability(&sched_mutex, &cap);
702 IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
709 if (RtsFlags.GranFlags.Light)
710 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
712 /* adjust time based on time-stamp */
713 if (event->time > CurrentTime[CurrentProc] &&
714 event->evttype != ContinueThread)
715 CurrentTime[CurrentProc] = event->time;
717 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
718 if (!RtsFlags.GranFlags.Light)
721 IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
723 /* main event dispatcher in GranSim */
724 switch (event->evttype) {
725 /* Should just be continuing execution */
727 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
728 /* ToDo: check assertion
729 ASSERT(run_queue_hd != (StgTSO*)NULL &&
730 run_queue_hd != END_TSO_QUEUE);
732 /* Ignore ContinueThreads for fetching threads (if synchr comm) */
733 if (!RtsFlags.GranFlags.DoAsyncFetch &&
734 procStatus[CurrentProc]==Fetching) {
735 belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
736 CurrentTSO->id, CurrentTSO, CurrentProc);
739 /* Ignore ContinueThreads for completed threads */
740 if (CurrentTSO->what_next == ThreadComplete) {
741 belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)",
742 CurrentTSO->id, CurrentTSO, CurrentProc);
745 /* Ignore ContinueThreads for threads that are being migrated */
746 if (PROCS(CurrentTSO)==Nowhere) {
747 belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
748 CurrentTSO->id, CurrentTSO, CurrentProc);
751 /* The thread should be at the beginning of the run queue */
752 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
753 belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
754 CurrentTSO->id, CurrentTSO, CurrentProc);
755 break; // run the thread anyway
758 new_event(proc, proc, CurrentTime[proc],
760 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
762 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
763 break; // now actually run the thread; DaH Qu'vam yImuHbej
766 do_the_fetchnode(event);
767 goto next_thread; /* handle next event in event queue */
770 do_the_globalblock(event);
771 goto next_thread; /* handle next event in event queue */
774 do_the_fetchreply(event);
775 goto next_thread; /* handle next event in event queue */
777 case UnblockThread: /* Move from the blocked queue to the tail of */
778 do_the_unblock(event);
779 goto next_thread; /* handle next event in event queue */
781 case ResumeThread: /* Move from the blocked queue to the tail of */
782 /* the runnable queue ( i.e. Qu' SImqa'lu') */
783 event->tso->gran.blocktime +=
784 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
785 do_the_startthread(event);
786 goto next_thread; /* handle next event in event queue */
789 do_the_startthread(event);
790 goto next_thread; /* handle next event in event queue */
793 do_the_movethread(event);
794 goto next_thread; /* handle next event in event queue */
797 do_the_movespark(event);
798 goto next_thread; /* handle next event in event queue */
801 do_the_findwork(event);
802 goto next_thread; /* handle next event in event queue */
805 barf("Illegal event type %u\n", event->evttype);
808 /* This point was scheduler_loop in the old RTS */
810 IF_DEBUG(gran, belch("GRAN: after main switch"));
812 TimeOfLastEvent = CurrentTime[CurrentProc];
813 TimeOfNextEvent = get_time_of_next_event();
814 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
815 // CurrentTSO = ThreadQueueHd;
817 IF_DEBUG(gran, belch("GRAN: time of next event is: %ld",
820 if (RtsFlags.GranFlags.Light)
821 GranSimLight_leave_system(event, &ActiveTSO);
823 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
826 belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
828 /* in a GranSim setup the TSO stays on the run queue */
830 /* Take a thread from the run queue. */
831 t = POP_RUN_QUEUE(); // take_off_run_queue(t);
834 fprintf(stderr, "GRAN: About to run current thread, which is\n");
837 context_switch = 0; // turned on via GranYield, checking events and time slice
840 DumpGranEvent(GR_SCHEDULE, t));
842 procStatus[CurrentProc] = Busy;
845 if (PendingFetches != END_BF_QUEUE) {
/* ToDo: perhaps merge with spark activation above */
850 /* check whether we have local work and send requests if we have none */
851 if (EMPTY_RUN_QUEUE()) { /* no runnable threads */
852 /* :-[ no local threads => look out for local sparks */
853 /* the spark pool for the current PE */
854 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
855 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
856 pool->hd < pool->tl) {
858 * ToDo: add GC code check that we really have enough heap afterwards!!
860 * If we're here (no runnable threads) and we have pending
861 * sparks, we must have a space problem. Get enough space
862 * to turn one of those pending sparks into a
866 spark = findSpark(rtsFalse); /* get a spark */
867 if (spark != (rtsSpark) NULL) {
868 tso = activateSpark(spark); /* turn the spark into a thread */
869 IF_PAR_DEBUG(schedule,
870 belch("==== schedule: Created TSO %d (%p); %d threads active",
871 tso->id, tso, advisory_thread_count));
873 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
874 belch("==^^ failed to activate spark");
876 } /* otherwise fall through & pick-up new tso */
878 IF_PAR_DEBUG(verbose,
879 belch("==^^ no local sparks (spark pool contains only NFs: %d)",
880 spark_queue_len(pool)));
885 /* If we still have no work we need to send a FISH to get a spark
888 if (EMPTY_RUN_QUEUE()) {
889 /* =8-[ no local sparks => look for work on other PEs */
891 * We really have absolutely no work. Send out a fish
892 * (there may be some out there already), and wait for
893 * something to arrive. We clearly can't run any threads
894 * until a SCHEDULE or RESUME arrives, and so that's what
895 * we're hoping to see. (Of course, we still have to
896 * respond to other types of messages.)
898 TIME now = msTime() /*CURRENT_TIME*/;
899 IF_PAR_DEBUG(verbose,
900 belch("-- now=%ld", now));
901 IF_PAR_DEBUG(verbose,
902 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
903 (last_fish_arrived_at!=0 &&
904 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
905 belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
906 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
907 last_fish_arrived_at,
908 RtsFlags.ParFlags.fishDelay, now);
911 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
912 (last_fish_arrived_at==0 ||
913 (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
914 /* outstandingFishes is set in sendFish, processFish;
915 avoid flooding system with fishes via delay */
917 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
920 // Global statistics: count no. of fishes
921 if (RtsFlags.ParFlags.ParStats.Global &&
922 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
923 globalParStats.tot_fish_mess++;
927 receivedFinish = processMessages();
930 } else if (PacketsWaiting()) { /* Look for incoming messages */
931 receivedFinish = processMessages();
934 /* Now we are sure that we have some work available */
935 ASSERT(run_queue_hd != END_TSO_QUEUE);
937 /* Take a thread from the run queue, if we have work */
938 t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE);
939 IF_DEBUG(sanity,checkTSO(t));
941 /* ToDo: write something to the log-file
942 if (RTSflags.ParFlags.granSimStats && !sameThread)
943 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
947 /* the spark pool for the current PE */
948 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
951 belch("--=^ %d threads, %d sparks on [%#x]",
952 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
955 if (0 && RtsFlags.ParFlags.ParStats.Full &&
956 t && LastTSO && t->id != LastTSO->id &&
957 LastTSO->why_blocked == NotBlocked &&
958 LastTSO->what_next != ThreadComplete) {
959 // if previously scheduled TSO not blocked we have to record the context switch
960 DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
961 GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
964 if (RtsFlags.ParFlags.ParStats.Full &&
965 (emitSchedule /* forced emit */ ||
966 (t && LastTSO && t->id != LastTSO->id))) {
968 we are running a different TSO, so write a schedule event to log file
969 NB: If we use fair scheduling we also have to write a deschedule
970 event for LastTSO; with unfair scheduling we know that the
971 previous tso has blocked whenever we switch to another tso, so
972 we don't need it in GUM for now
974 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
975 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
976 emitSchedule = rtsFalse;
980 #else /* !GRAN && !PAR */
982 /* grab a thread from the run queue */
983 ASSERT(run_queue_hd != END_TSO_QUEUE);
985 // Sanity check the thread we're about to run. This can be
986 // expensive if there is lots of thread switching going on...
987 IF_DEBUG(sanity,checkTSO(t));
990 cap->r.rCurrentTSO = t;
992 /* context switches are now initiated by the timer signal, unless
993 * the user specified "context switch as often as possible", with
998 RtsFlags.ProfFlags.profileInterval == 0 ||
1000 (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1001 && (run_queue_hd != END_TSO_QUEUE
1002 || blocked_queue_hd != END_TSO_QUEUE
1003 || sleeping_queue != END_TSO_QUEUE)))
1008 RELEASE_LOCK(&sched_mutex);
1010 IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...",
1011 t->id, t, whatNext_strs[t->what_next]));
1014 startHeapProfTimer();
1017 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1018 /* Run the current thread
1020 switch (cap->r.rCurrentTSO->what_next) {
1022 case ThreadComplete:
1023 /* Thread already finished, return to scheduler. */
1024 ret = ThreadFinished;
1026 case ThreadEnterGHC:
1027 ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1030 ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1032 case ThreadEnterInterp:
1033 ret = interpretBCO(cap);
1036 barf("schedule: invalid what_next field");
1038 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1040 /* Costs for the scheduler are assigned to CCS_SYSTEM */
1042 stopHeapProfTimer();
1046 ACQUIRE_LOCK(&sched_mutex);
1049 IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1050 #elif !defined(GRAN) && !defined(PAR)
1051 IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1053 t = cap->r.rCurrentTSO;
1056 /* HACK 675: if the last thread didn't yield, make sure to print a
1057 SCHEDULE event to the log file when StgRunning the next thread, even
1058 if it is the same one as before */
1060 TimeOfLastYield = CURRENT_TIME;
1066 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1067 globalGranStats.tot_heapover++;
1069 globalParStats.tot_heapover++;
1072 // did the task ask for a large block?
1073 if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1074 // if so, get one and push it on the front of the nursery.
1078 blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1080 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)",
1082 whatNext_strs[t->what_next], blocks));
1084 // don't do this if it would push us over the
1085 // alloc_blocks_lim limit; we'll GC first.
1086 if (alloc_blocks + blocks < alloc_blocks_lim) {
1088 alloc_blocks += blocks;
1089 bd = allocGroup( blocks );
1091 // link the new group into the list
1092 bd->link = cap->r.rCurrentNursery;
1093 bd->u.back = cap->r.rCurrentNursery->u.back;
1094 if (cap->r.rCurrentNursery->u.back != NULL) {
1095 cap->r.rCurrentNursery->u.back->link = bd;
1097 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1098 g0s0->blocks == cap->r.rNursery);
1099 cap->r.rNursery = g0s0->blocks = bd;
1101 cap->r.rCurrentNursery->u.back = bd;
1103 // initialise it as a nursery block
1107 bd->free = bd->start;
1109 // don't forget to update the block count in g0s0.
1110 g0s0->n_blocks += blocks;
1111 ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1113 // now update the nursery to point to the new block
1114 cap->r.rCurrentNursery = bd;
1116 // we might be unlucky and have another thread get on the
1117 // run queue before us and steal the large block, but in that
1118 // case the thread will just end up requesting another large
1120 PUSH_ON_RUN_QUEUE(t);
1125 /* make all the running tasks block on a condition variable,
1126 * maybe set context_switch and wait till they all pile in,
1127 * then have them wait on a GC condition variable.
1129 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow",
1130 t->id, t, whatNext_strs[t->what_next]));
1133 ASSERT(!is_on_queue(t,CurrentProc));
1135 /* Currently we emit a DESCHEDULE event before GC in GUM.
1136 ToDo: either add separate event to distinguish SYSTEM time from rest
1137 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1138 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1139 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1140 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1141 emitSchedule = rtsTrue;
1145 ready_to_gc = rtsTrue;
1146 context_switch = 1; /* stop other threads ASAP */
1147 PUSH_ON_RUN_QUEUE(t);
1148 /* actual GC is done at the end of the while loop */
1154 DumpGranEvent(GR_DESCHEDULE, t));
1155 globalGranStats.tot_stackover++;
1158 // DumpGranEvent(GR_DESCHEDULE, t);
1159 globalParStats.tot_stackover++;
1161 IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow",
1162 t->id, t, whatNext_strs[t->what_next]));
1163 /* just adjust the stack for this thread, then pop it back
1169 /* enlarge the stack */
1170 StgTSO *new_t = threadStackOverflow(t);
1172 /* This TSO has moved, so update any pointers to it from the
1173 * main thread stack. It better not be on any other queues...
1174 * (it shouldn't be).
1176 for (m = main_threads; m != NULL; m = m->link) {
1181 threadPaused(new_t);
1182 PUSH_ON_RUN_QUEUE(new_t);
1186 case ThreadYielding:
1189 DumpGranEvent(GR_DESCHEDULE, t));
1190 globalGranStats.tot_yields++;
1193 // DumpGranEvent(GR_DESCHEDULE, t);
1194 globalParStats.tot_yields++;
1196 /* put the thread back on the run queue. Then, if we're ready to
1197 * GC, check whether this is the last task to stop. If so, wake
1198 * up the GC thread. getThread will block during a GC until the
1202 if (t->what_next == ThreadEnterInterp) {
1203 /* ToDo: or maybe a timer expired when we were in Hugs?
1204 * or maybe someone hit ctrl-C
1206 belch("--<< thread %ld (%p; %s) stopped to switch to Hugs",
1207 t->id, t, whatNext_strs[t->what_next]);
1209 belch("--<< thread %ld (%p; %s) stopped, yielding",
1210 t->id, t, whatNext_strs[t->what_next]);
1217 //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1219 ASSERT(t->link == END_TSO_QUEUE);
1221 ASSERT(!is_on_queue(t,CurrentProc));
1224 //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1225 checkThreadQsSanity(rtsTrue));
1228 if (RtsFlags.ParFlags.doFairScheduling) {
1229 /* this does round-robin scheduling; good for concurrency */
1230 APPEND_TO_RUN_QUEUE(t);
1232 /* this does unfair scheduling; good for parallelism */
1233 PUSH_ON_RUN_QUEUE(t);
1236 /* this does round-robin scheduling; good for concurrency */
1237 APPEND_TO_RUN_QUEUE(t);
1240 /* add a ContinueThread event to actually process the thread */
1241 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1243 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1245 belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1254 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
1255 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1256 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1258 // ??? needed; should emit block before
1260 DumpGranEvent(GR_DESCHEDULE, t));
1261 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1264 ASSERT(procStatus[CurrentProc]==Busy ||
1265 ((procStatus[CurrentProc]==Fetching) &&
1266 (t->block_info.closure!=(StgClosure*)NULL)));
1267 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1268 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1269 procStatus[CurrentProc]==Fetching))
1270 procStatus[CurrentProc] = Idle;
1274 belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ",
1275 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1278 if (t->block_info.closure!=(StgClosure*)NULL)
1279 print_bq(t->block_info.closure));
1281 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1284 /* whatever we schedule next, we must log that schedule */
1285 emitSchedule = rtsTrue;
1288 /* don't need to do anything. Either the thread is blocked on
1289 * I/O, in which case we'll have called addToBlockedQueue
1290 * previously, or it's blocked on an MVar or Blackhole, in which
1291 * case it'll be on the relevant queue already.
1294 fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1295 printThreadBlockage(t);
1296 fprintf(stderr, "\n"));
1298 /* Only for dumping event to log file
1299 ToDo: do I need this in GranSim, too?
1306 case ThreadFinished:
1307 /* Need to check whether this was a main thread, and if so, signal
1308 * the task that started it with the return value. If we have no
1309 * more main threads, we probably need to stop all the tasks until
1312 /* We also end up here if the thread kills itself with an
1313 * uncaught exception, see Exception.hc.
1315 IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1317 endThread(t, CurrentProc); // clean-up the thread
1319 /* For now all are advisory -- HWL */
1320 //if(t->priority==AdvisoryPriority) ??
1321 advisory_thread_count--;
1324 if(t->dist.priority==RevalPriority)
1328 if (RtsFlags.ParFlags.ParStats.Full &&
1329 !RtsFlags.ParFlags.ParStats.Suppressed)
1330 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1335 barf("schedule: invalid thread return code %d", (int)ret);
1339 if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1340 GarbageCollect(GetRoots, rtsTrue);
1342 performHeapProfile = rtsFalse;
1343 ready_to_gc = rtsFalse; // we already GC'd
1349 && allFreeCapabilities()
1352 /* everybody back, start the GC.
1353 * Could do it in this thread, or signal a condition var
1354 * to do it in another thread. Either way, we need to
1355 * broadcast on gc_pending_cond afterward.
1357 #if defined(RTS_SUPPORTS_THREADS)
1358 IF_DEBUG(scheduler,sched_belch("doing GC"));
1360 GarbageCollect(GetRoots,rtsFalse);
1361 ready_to_gc = rtsFalse;
1363 broadcastCondition(&gc_pending_cond);
1366 /* add a ContinueThread event to continue execution of current thread */
1367 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1369 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1371 fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1379 IF_GRAN_DEBUG(unused,
1380 print_eventq(EventHd));
1382 event = get_next_event();
1385 /* ToDo: wait for next message to arrive rather than busy wait */
1388 } /* end of while(1) */
1390 IF_PAR_DEBUG(verbose,
1391 belch("== Leaving schedule() after having received Finish"));
1394 /* ---------------------------------------------------------------------------
1395 * Singleton fork(). Do not copy any running threads.
1396 * ------------------------------------------------------------------------- */
/* forkProcess: Unix fork() of the RTS, keeping only the thread `tso` alive in
 * the child.  Parent returns the child pid; the child deletes every other
 * thread via deleteThread() before continuing.
 * NOTE(review): listing is elided here — the fork() call, pid declaration and
 * return statements fall in the gaps; presumably `pid = fork()` precedes the
 * test below — confirm against the full source. */
1398 StgInt forkProcess(StgTSO* tso) {
1400 #ifndef mingw32_TARGET_OS
1404 IF_DEBUG(scheduler,sched_belch("forking!"));
1407 if (pid) { /* parent */
1409 /* just return the pid */
1411 } else { /* child */
1412 /* wipe all other threads */
1414 tso->link = END_TSO_QUEUE;
1416 /* DO NOT TOUCH THE QUEUES directly because most of the code around
1417 us is picky about finding the thread still in its queue when
1418 handling the deleteThread() */
1420 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1422 if (t->id != tso->id) {
/* fork() is unimplemented on mingw32, so the primop just aborts there */
1429 barf("forkProcess#: primop not implemented for mingw32, sorry!");
1431 #endif /* mingw32 */
1434 /* ---------------------------------------------------------------------------
1435 * deleteAllThreads(): kill all the live threads.
1437 * This is used when we catch a user interrupt (^C), before performing
1438 * any necessary cleanups and running finalizers.
1439 * ------------------------------------------------------------------------- */
/* deleteAllThreads: kill every live thread, then empty all scheduler queues.
 * Used on user interrupt (^C) before cleanups/finalizers run (see header
 * comment above).  `next` is read from t->global_link BEFORE t is deleted,
 * so the walk survives the deletion of t. */
1441 void deleteAllThreads ( void )
1444 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1445 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1446 next = t->global_link;
/* after the walk, reset every queue to empty */
1449 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1450 blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1451 sleeping_queue = END_TSO_QUEUE;
1454 /* startThread and insertThread are now in GranSim.c -- HWL */
1457 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1458 //@subsection Suspend and Resume
1460 /* ---------------------------------------------------------------------------
1461 * Suspending & resuming Haskell threads.
1463 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1464 * its capability before calling the C function. This allows another
1465 * task to pick up the capability and carry on running Haskell
1466 * threads. It also means that if the C call blocks, it won't lock
1469 * The Haskell thread making the C call is put to sleep for the
1470 * duration of the call, on the suspended_ccalling_threads queue. We
1471 * give out a token to the task, which it can use to resume the thread
1472 * on return from the C function.
1473 * ------------------------------------------------------------------------- */
/* suspendThread: called when a Haskell thread makes a "safe" C call.
 * Pauses the current TSO, pushes it on suspended_ccalling_threads, hands the
 * Capability back, and returns a token (the thread id) with which
 * resumeThread() can later revive the thread.  Performed under sched_mutex. */
1476 suspendThread( StgRegTable *reg,
1478 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1486 /* assume that *reg is a pointer to the StgRegTable part
/* NOTE(review): arithmetic on a void* is a GCC extension, not ISO C —
 * relies on the StgRegTable being laid out right after the StgFunTable
 * inside the Capability; confirm Capability layout. */
1489 cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1491 ACQUIRE_LOCK(&sched_mutex);
1494 sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1496 threadPaused(cap->r.rCurrentTSO);
1497 cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1498 suspended_ccalling_threads = cap->r.rCurrentTSO;
1500 #if defined(RTS_SUPPORTS_THREADS)
1501 cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1504 /* Use the thread ID as the token; it should be unique */
1505 tok = cap->r.rCurrentTSO->id;
1507 /* Hand back capability */
1508 releaseCapability(cap);
1510 #if defined(RTS_SUPPORTS_THREADS)
1511 /* Preparing to leave the RTS, so ensure there's a native thread/task
1512 waiting to take over.
1514 ToDo: optimise this and only create a new task if there's a need
1515 for one (i.e., if there's only one Concurrent Haskell thread alive,
1516 there's no need to create a new task).
1518 IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1520 startTask(taskStart);
1524 /* Other threads _might_ be available for execution; signal this */
1526 RELEASE_LOCK(&sched_mutex);
/* resumeThread: re-enter the RTS after a "safe" C call.  `tok` is the thread
 * id handed out by suspendThread(); the matching TSO is unlinked from
 * suspended_ccalling_threads, unblocked, and installed as the current TSO of
 * a (re)acquired Capability. */
1531 resumeThread( StgInt tok,
1533 #if !defined(RTS_SUPPORTS_THREADS)
1538 StgTSO *tso, **prev;
1541 #if defined(RTS_SUPPORTS_THREADS)
1542 /* Wait for permission to re-enter the RTS with the result. */
1544 ACQUIRE_LOCK(&sched_mutex);
1545 grabReturnCapability(&sched_mutex, &cap);
1547 grabCapability(&cap);
1550 grabCapability(&cap);
/* `prev` tracks the address of the link we came through, so the match can be
 * unlinked in O(1) (the actual *prev = tso->link is in an elided line). */
1553 /* Remove the thread off of the suspended list */
1554 prev = &suspended_ccalling_threads;
1555 for (tso = suspended_ccalling_threads;
1556 tso != END_TSO_QUEUE;
1557 prev = &tso->link, tso = tso->link) {
1558 if (tso->id == (StgThreadID)tok) {
1563 if (tso == END_TSO_QUEUE) {
1564 barf("resumeThread: thread not found");
1566 tso->link = END_TSO_QUEUE;
1567 /* Reset blocking status */
1568 tso->why_blocked = NotBlocked;
1570 RELEASE_LOCK(&sched_mutex);
1572 cap->r.rCurrentTSO = tso;
1577 /* ---------------------------------------------------------------------------
1579 * ------------------------------------------------------------------------ */
1580 static void unblockThread(StgTSO *tso);
1582 /* ---------------------------------------------------------------------------
1583 * Comparing Thread ids.
1585 * This is used from STG land in the implementation of the
1586 * instances of Eq/Ord for ThreadIds.
1587 * ------------------------------------------------------------------------ */
1589 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2)
{
  /* Three-way comparison of thread ids; backs the Eq/Ord instances of
   * ThreadId on the Haskell side (see comment above).  Returns -1, 0 or 1. */
  StgThreadID a = tso1->id;
  StgThreadID b = tso2->id;
  return (a < b) ? -1 : (a > b) ? 1 : 0;
}
1599 /* ---------------------------------------------------------------------------
1600 * Fetching the ThreadID from an StgTSO.
1602 * This is used in the implementation of Show for ThreadIds.
1603 * ------------------------------------------------------------------------ */
/* rts_getThreadId: accessor used by the Show instance for ThreadId (see
 * comment above).  Body elided in this listing — presumably just
 * `return tso->id;` — confirm against the full source. */
1604 int rts_getThreadId(const StgTSO *tso)
/* labelThread: attach a debugging label to a TSO by reallocating tso->label.
 * len includes the trailing NUL, so the strncpy below always terminates. */
1610 void labelThread(StgTSO *tso, char *label)
1615 /* Caveat: Once set, you can only set the thread name to "" */
1616 len = strlen(label)+1;
/* NOTE(review): if realloc fails the old tso->label is still valid, but the
 * elided error path only prints — verify buf is NULL-checked before use. */
1617 buf = realloc(tso->label,len);
1619 fprintf(stderr,"insufficient memory for labelThread!\n");
1622 strncpy(buf,label,len);
1627 /* ---------------------------------------------------------------------------
1628 Create a new thread.
1630 The new thread starts with the given stack size. Before the
1631 scheduler can run, however, this thread needs to have a closure
1632 (and possibly some arguments) pushed on its stack. See
1633 pushClosure() in Schedule.h.
1635 createGenThread() and createIOThread() (in SchedAPI.h) are
1636 convenient packaged versions of this function.
1638 currently pri (priority) is only used in a GRAN setup -- HWL
1639 ------------------------------------------------------------------------ */
/* createThread / createThread_: public entry points for thread creation.
 * Two variants selected by (elided) #if GRAN: the GranSim build threads a
 * priority `pri` through; the default build does not.  Both delegate to
 * createThread_, passing rtsFalse for have_lock (sched_mutex not yet held). */
1640 //@cindex createThread
1642 /* currently pri (priority) is only used in a GRAN setup -- HWL */
1644 createThread(nat stack_size, StgInt pri)
1646 return createThread_(stack_size, rtsFalse, pri);
1650 createThread_(nat size, rtsBool have_lock, StgInt pri)
1654 createThread(nat stack_size)
1656 return createThread_(stack_size, rtsFalse);
1660 createThread_(nat size, rtsBool have_lock)
/* Body of createThread_: allocate and initialise a fresh TSO of `size` words
 * (TSO header + stack), assign a unique id under sched_mutex (unless the
 * caller already holds it, have_lock), push a stop frame, link it on
 * all_threads, and fill in the GRAN/PAR per-thread statistics fields. */
1667 /* First check whether we should create a thread at all */
1669 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1670 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1672 belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1673 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1674 return END_TSO_QUEUE;
1680 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1683 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1685 /* catch ridiculously small stack sizes */
1686 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1687 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1690 stack_size = size - TSO_STRUCT_SIZEW;
1692 tso = (StgTSO *)allocate(size);
1693 TICK_ALLOC_TSO(stack_size, 0);
1695 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1697 SET_GRAN_HDR(tso, ThisPE);
1699 tso->what_next = ThreadEnterGHC;
1705 /* tso->id needs to be unique. For now we use a heavyweight mutex to
1706 * protect the increment operation on next_thread_id.
1707 * In future, we could use an atomic increment instead.
1709 if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1710 tso->id = next_thread_id++;
1711 if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1713 tso->why_blocked = NotBlocked;
1714 tso->blocked_exceptions = NULL;
1716 tso->stack_size = stack_size;
1717 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
/* initial sp points one past the top of the (empty) stack */
1719 tso->sp = (P_)&(tso->stack) + stack_size;
1722 tso->prof.CCCS = CCS_MAIN;
1725 /* put a stop frame on the stack */
1726 tso->sp -= sizeofW(StgStopFrame);
1727 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1728 tso->su = (StgUpdateFrame*)tso->sp;
1732 tso->link = END_TSO_QUEUE;
1733 /* uses more flexible routine in GranSim */
1734 insertThread(tso, CurrentProc);
1736 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1742 if (RtsFlags.GranFlags.GranSimStats.Full)
1743 DumpGranEvent(GR_START,tso);
1745 if (RtsFlags.ParFlags.ParStats.Full)
1746 DumpGranEvent(GR_STARTQ,tso);
1747 /* HACk to avoid SCHEDULE
1751 /* Link the new thread on the global thread list.
1753 tso->global_link = all_threads;
1757 tso->dist.priority = MandatoryPriority; //by default that is...
1761 tso->gran.pri = pri;
1763 tso->gran.magic = TSO_MAGIC; // debugging only
1765 tso->gran.sparkname = 0;
1766 tso->gran.startedat = CURRENT_TIME;
1767 tso->gran.exported = 0;
1768 tso->gran.basicblocks = 0;
1769 tso->gran.allocs = 0;
1770 tso->gran.exectime = 0;
1771 tso->gran.fetchtime = 0;
1772 tso->gran.fetchcount = 0;
1773 tso->gran.blocktime = 0;
1774 tso->gran.blockcount = 0;
1775 tso->gran.blockedat = 0;
1776 tso->gran.globalsparks = 0;
1777 tso->gran.localsparks = 0;
1778 if (RtsFlags.GranFlags.Light)
1779 tso->gran.clock = Now; /* local clock */
1781 tso->gran.clock = 0;
1783 IF_DEBUG(gran,printTSO(tso));
1786 tso->par.magic = TSO_MAGIC; // debugging only
1788 tso->par.sparkname = 0;
1789 tso->par.startedat = CURRENT_TIME;
1790 tso->par.exported = 0;
1791 tso->par.basicblocks = 0;
1792 tso->par.allocs = 0;
1793 tso->par.exectime = 0;
1794 tso->par.fetchtime = 0;
1795 tso->par.fetchcount = 0;
1796 tso->par.blocktime = 0;
1797 tso->par.blockcount = 0;
1798 tso->par.blockedat = 0;
1799 tso->par.globalsparks = 0;
1800 tso->par.localsparks = 0;
1804 globalGranStats.tot_threads_created++;
1805 globalGranStats.threads_created_on_PE[CurrentProc]++;
1806 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1807 globalGranStats.tot_sq_probes++;
1809 // collect parallel global statistics (currently done together with GC stats)
1810 if (RtsFlags.ParFlags.ParStats.Global &&
1811 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1812 //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime());
1813 globalParStats.tot_threads_created++;
/* NOTE(review): format below has 2 conversions (%d %p) but 3 arguments
 * (CurrentProc, tso, tso->id) — %d prints the PE, %p prints the TSO, and the
 * id is silently dropped; the format string looks truncated — confirm. */
1819 belch("==__ schedule: Created TSO %d (%p);",
1820 CurrentProc, tso, tso->id));
1822 IF_PAR_DEBUG(verbose,
1823 belch("==__ schedule: Created TSO %d (%p); %d threads active",
1824 tso->id, tso, advisory_thread_count));
1826 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1827 tso->id, tso->stack_size));
1834 all parallel thread creation calls should fall through the following routine.
/* createSparkThread: central routine through which all parallel thread
 * creation goes (see comment above).  Builds a TSO, pushes the spark closure
 * on it, and enqueues it at the head of the run queue.
 * NOTE(review): createThread belches-and-returns on hitting maxThreads,
 * while this guard barf()s (aborts) and then has an unreachable return —
 * inconsistent policies; confirm which is intended. */
1837 createSparkThread(rtsSpark spark)
1839 ASSERT(spark != (rtsSpark)NULL);
1840 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
1842 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1843 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1844 return END_TSO_QUEUE;
/* rtsTrue: caller already holds sched_mutex, so createThread_ must not
 * re-acquire it */
1848 tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1849 if (tso==END_TSO_QUEUE)
1850 barf("createSparkThread: Cannot create TSO");
1852 tso->priority = AdvisoryPriority;
1854 pushClosure(tso,spark);
1855 PUSH_ON_RUN_QUEUE(tso);
1856 advisory_thread_count++;
1863 Turn a spark into a thread.
1864 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
/* activateSpark: turn a spark into a runnable thread (see comment above).
 * Thin wrapper over createSparkThread plus optional PAR-stats logging.
 * ToDo note above: not yet safe for SMP (needs sched_mutex). */
1867 //@cindex activateSpark
1869 activateSpark (rtsSpark spark)
1873 tso = createSparkThread(spark);
1874 if (RtsFlags.ParFlags.ParStats.Full) {
1875 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1876 IF_PAR_DEBUG(verbose,
1877 belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1878 (StgClosure *)spark, info_type((StgClosure *)spark)));
1880 // ToDo: fwd info on local/global spark to thread -- HWL
1881 // tso->gran.exported = spark->exported;
1882 // tso->gran.locked = !spark->global;
1883 // tso->gran.sparkname = spark->name;
1889 /* ---------------------------------------------------------------------------
1892 * scheduleThread puts a thread on the head of the runnable queue.
1893 * This will usually be done immediately after a thread is created.
1894 * The caller of scheduleThread must create the thread using e.g.
1895 * createThread and push an appropriate closure
1896 * on this thread's stack before the scheduler is invoked.
1897 * ------------------------------------------------------------------------ */
/* scheduleThread_: put `tso` at the head of the run queue, optionally
 * (createTask, threaded RTS only) spawning a native task to run it.
 * Holds sched_mutex for the duration; in SMP the thread may start running
 * as soon as the lock is released. */
1899 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
1902 scheduleThread_(StgTSO *tso
1903 , rtsBool createTask
1904 #if !defined(THREADED_RTS)
1909 ACQUIRE_LOCK(&sched_mutex);
1911 /* Put the new thread on the head of the runnable queue. The caller
1912 * better push an appropriate closure on this thread's stack
1913 * beforehand. In the SMP case, the thread may start running as
1914 * soon as we release the scheduler lock below.
1916 PUSH_ON_RUN_QUEUE(tso);
1917 #if defined(THREADED_RTS)
1918 /* If main() is scheduling a thread, don't bother creating a
1922 startTask(taskStart);
1928 IF_DEBUG(scheduler,printTSO(tso));
1930 RELEASE_LOCK(&sched_mutex);
/* scheduleThread: public wrapper — enqueue `tso` without creating a task.
 * Fix: dropped the `return` before the call; returning an expression (even a
 * void one) from a void function violates C99 6.8.6.4. */
1933 void scheduleThread(StgTSO* tso)
1935 scheduleThread_(tso, rtsFalse);
/* scheduleExtThread: as scheduleThread, but asks scheduleThread_ to create a
 * native task for the thread (external/bound entry).  Fix: dropped the
 * invalid `return` of a void expression (C99 6.8.6.4). */
1938 void scheduleExtThread(StgTSO* tso)
1940 scheduleThread_(tso, rtsTrue);
1943 /* ---------------------------------------------------------------------------
1946 * Initialise the scheduler. This resets all the queues - if the
1947 * queues contained any threads, they'll be garbage collected at the
1950 * ------------------------------------------------------------------------ */
/* term_handler: SIGTERM handler installed by initScheduler (see sigaction
 * call below).  Takes term_mutex around the (elided) shutdown-flag update.
 * NOTE(review): pthread mutex operations are not async-signal-safe; taking a
 * lock inside a signal handler can deadlock if the signal interrupts a holder
 * — worth confirming/flagging upstream. */
1954 term_handler(int sig STG_UNUSED)
1957 ACQUIRE_LOCK(&term_mutex);
1959 RELEASE_LOCK(&term_mutex);
/* Interior of initScheduler (the function header falls in an elided gap):
 * resets every scheduler queue (per-PE arrays in GRAN, single queues
 * otherwise), initialises locks/conditions for the threaded RTS, installs
 * the SIGTERM handler, and starts the task manager. */
/* NOTE(review): loop bound `i<=MAX_PROC` touches MAX_PROC+1 slots — confirm
 * the arrays are dimensioned [MAX_PROC+1]; also sleeping_queue (a scalar) is
 * redundantly re-assigned on every iteration. */
1970 for (i=0; i<=MAX_PROC; i++) {
1971 run_queue_hds[i] = END_TSO_QUEUE;
1972 run_queue_tls[i] = END_TSO_QUEUE;
1973 blocked_queue_hds[i] = END_TSO_QUEUE;
1974 blocked_queue_tls[i] = END_TSO_QUEUE;
1975 ccalling_threadss[i] = END_TSO_QUEUE;
1976 sleeping_queue = END_TSO_QUEUE;
1979 run_queue_hd = END_TSO_QUEUE;
1980 run_queue_tl = END_TSO_QUEUE;
1981 blocked_queue_hd = END_TSO_QUEUE;
1982 blocked_queue_tl = END_TSO_QUEUE;
1983 sleeping_queue = END_TSO_QUEUE;
1986 suspended_ccalling_threads = END_TSO_QUEUE;
1988 main_threads = NULL;
1989 all_threads = END_TSO_QUEUE;
1994 RtsFlags.ConcFlags.ctxtSwitchTicks =
1995 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1997 #if defined(RTS_SUPPORTS_THREADS)
1998 /* Initialise the mutex and condition variables used by
2000 initMutex(&sched_mutex);
2001 initMutex(&term_mutex);
2003 initCondition(&thread_ready_cond);
2007 initCondition(&gc_pending_cond);
2010 #if defined(RTS_SUPPORTS_THREADS)
2011 ACQUIRE_LOCK(&sched_mutex);
2014 /* Install the SIGTERM handler */
2017 struct sigaction action,oact;
2019 action.sa_handler = term_handler;
2020 sigemptyset(&action.sa_mask);
2021 action.sa_flags = 0;
2022 if (sigaction(SIGTERM, &action, &oact) != 0) {
2023 barf("can't install TERM handler");
2028 /* A capability holds the state a native thread needs in
2029 * order to execute STG code. At least one capability is
2030 * floating around (only SMP builds have more than one).
2034 #if defined(RTS_SUPPORTS_THREADS)
2035 /* start our haskell execution tasks */
2037 startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2039 startTaskManager(0,taskStart);
2043 #if /* defined(SMP) ||*/ defined(PAR)
2047 #if defined(RTS_SUPPORTS_THREADS)
2048 RELEASE_LOCK(&sched_mutex);
/* exitScheduler: scheduler shutdown — body elided in this listing;
 * presumably stops the task manager in the threaded RTS — confirm. */
2054 exitScheduler( void )
2056 #if defined(RTS_SUPPORTS_THREADS)
2061 /* -----------------------------------------------------------------------------
2062 Managing the per-task allocation areas.
2064 Each capability comes with an allocation area. These are
2065 fixed-length block lists into which allocation can be done.
2067 ToDo: no support for two-space collection at the moment???
2068 -------------------------------------------------------------------------- */
2070 /* -----------------------------------------------------------------------------
2071 * waitThread is the external interface for running a new computation
2072 * and waiting for the result.
2074 * In the non-SMP case, we create a new main thread, push it on the
2075 * main-thread stack, and invoke the scheduler to run it. The
2076 * scheduler will return when the top main thread on the stack has
2077 * completed or died, and fill in the necessary fields of the
2078 * main_thread structure.
2080 * In the SMP case, we create a main thread as before, but we then
2081 * create a new condition variable and sleep on it. When our new
2082 * main thread has completed, we'll be woken up and the status/result
2083 * will be in the main_thread struct.
2084 * -------------------------------------------------------------------------- */
/* howManyThreadsAvail: count the threads on the run, blocked and sleeping
 * queues (the counter increments and return fall in elided lines). */
2087 howManyThreadsAvail ( void )
2091 for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2093 for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2095 for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
/* finishAllThreads: repeatedly wait on every queued thread until all three
 * queues (run, blocked, sleeping) are empty.
 * Fix: the sleeping-queue loop waited on blocked_queue_hd (copy-paste from
 * the loop above) while its condition tests sleeping_queue — with an empty
 * blocked queue and a non-empty sleeping queue this never makes progress.
 * It must wait on sleeping_queue. */
2101 finishAllThreads ( void )
2104 while (run_queue_hd != END_TSO_QUEUE) {
2105 waitThread ( run_queue_hd, NULL);
2107 while (blocked_queue_hd != END_TSO_QUEUE) {
2108 waitThread ( blocked_queue_hd, NULL);
2110 while (sleeping_queue != END_TSO_QUEUE) {
2111 waitThread ( sleeping_queue, NULL);
2114 (blocked_queue_hd != END_TSO_QUEUE ||
2115 run_queue_hd != END_TSO_QUEUE ||
2116 sleeping_queue != END_TSO_QUEUE);
/* waitThread: public wrapper around waitThread_; the threaded RTS passes an
 * extra blockWaiting=rtsFalse argument (the caller enters the RTS itself). */
2120 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2122 IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2123 #if defined(THREADED_RTS)
2124 return waitThread_(tso,ret, rtsFalse);
2126 return waitThread_(tso,ret);
/* waitThread_: register `tso` as a main thread (StgMainThread record pushed
 * on main_threads), then either run the scheduler directly or, in the
 * threaded RTS with blockWaiting, sleep on the per-main-thread condition
 * variable until the scheduler fills in m->stat.  Returns the final status;
 * *ret receives the result closure (assignment elided here). */
2131 waitThread_(StgTSO *tso,
2132 /*out*/StgClosure **ret
2133 #if defined(THREADED_RTS)
2134 , rtsBool blockWaiting
2139 SchedulerStatus stat;
2141 ACQUIRE_LOCK(&sched_mutex);
2142 IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2144 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2149 #if defined(RTS_SUPPORTS_THREADS)
2150 initCondition(&m->wakeup);
2153 m->link = main_threads;
2156 IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2158 #if defined(RTS_SUPPORTS_THREADS)
2160 # if defined(THREADED_RTS)
2161 if (!blockWaiting) {
2162 /* In the threaded case, the OS thread that called main()
2163 * gets to enter the RTS directly without going via another
2166 RELEASE_LOCK(&sched_mutex);
2168 ASSERT(m->stat != NoStatus);
2172 IF_DEBUG(scheduler, sched_belch("sfoo"));
/* block until the scheduler signals completion via m->wakeup */
2174 waitCondition(&m->wakeup, &sched_mutex);
2175 } while (m->stat == NoStatus);
2178 /* GranSim specific init */
2179 CurrentTSO = m->tso; // the TSO to run
2180 procStatus[MainProc] = Busy; // status of main PE
2181 CurrentProc = MainProc; // PE to run it on
2185 RELEASE_LOCK(&sched_mutex);
2187 ASSERT(m->stat != NoStatus);
2192 #if defined(RTS_SUPPORTS_THREADS)
2193 closeCondition(&m->wakeup);
2196 IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n",
2200 #if defined(THREADED_RTS)
2203 RELEASE_LOCK(&sched_mutex);
2208 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2209 //@subsection Run queue code
2213 NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2214 unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2215 implicit global variable that has to be correct when calling these
2219 /* Put the new thread on the head of the runnable queue.
2220 * The caller of createThread better push an appropriate closure
2221 * on this thread's stack before the scheduler is invoked.
2223 static /* inline */ void
2224 add_to_run_queue(tso)
2227 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2228 tso->link = run_queue_hd;
2230 if (run_queue_tl == END_TSO_QUEUE) {
2235 /* Put the new thread at the end of the runnable queue. */
2236 static /* inline */ void
2237 push_on_run_queue(tso)
2240 ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2241 ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2242 ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2243 if (run_queue_hd == END_TSO_QUEUE) {
2246 run_queue_tl->link = tso;
2252 Should be inlined because it's used very often in schedule. The tso
2253 argument is actually only needed in GranSim, where we want to have the
2254 possibility to schedule *any* TSO on the run queue, irrespective of the
2255 actual ordering. Therefore, if tso is not the nil TSO then we traverse
2256 the run queue and dequeue the tso, adjusting the links in the queue.
/* take_off_run_queue: dequeue a thread from the run queue.
 * GranSim only: if `tso` is not END_TSO_QUEUE, find and unlink exactly that
 * TSO anywhere in the queue, patching head/tail as needed.  Standard
 * concurrent case: pop the head.  Returns the dequeued TSO. */
2258 //@cindex take_off_run_queue
2259 static /* inline */ StgTSO*
2260 take_off_run_queue(StgTSO *tso) {
/* (translated from Klingon:) Remove the thread {tso} from the queue that
 * contains it! */
2266 if tso is specified, unlink that tso from the run_queue (doesn't have
2267 to be at the beginning of the queue); GranSim only
2269 if (tso!=END_TSO_QUEUE) {
2270 /* find tso in queue */
2271 for (t=run_queue_hd, prev=END_TSO_QUEUE;
2272 t!=END_TSO_QUEUE && t!=tso;
2276 /* now actually dequeue the tso */
2277 if (prev!=END_TSO_QUEUE) {
2278 ASSERT(run_queue_hd!=t);
2279 prev->link = t->link;
2281 /* t is at beginning of thread queue */
2282 ASSERT(run_queue_hd==t);
2283 run_queue_hd = t->link;
2285 /* t is at end of thread queue */
2286 if (t->link==END_TSO_QUEUE) {
2287 ASSERT(t==run_queue_tl);
2288 run_queue_tl = prev;
2290 ASSERT(run_queue_tl!=t);
2292 t->link = END_TSO_QUEUE;
2294 /* take tso from the beginning of the queue; std concurrent code */
2296 if (t != END_TSO_QUEUE) {
2297 run_queue_hd = t->link;
2298 t->link = END_TSO_QUEUE;
2299 if (run_queue_hd == END_TSO_QUEUE) {
2300 run_queue_tl = END_TSO_QUEUE;
2309 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2310 //@subsection Garbage Collextion Routines
2312 /* ---------------------------------------------------------------------------
2313 Where are the roots that we know about?
2315 - all the threads on the runnable queue
2316 - all the threads on the blocked queue
2317 - all the threads on the sleeping queue
2318 - all the threads currently executing a _ccall_GC
2319 - all the "main threads"
2321 ------------------------------------------------------------------------ */
2323 /* This has to be protected either by the scheduler monitor, or by the
2324 garbage collection monitor (probably the latter).
/* GetRoots: evacuate every GC root the scheduler knows about — the run,
 * blocked and sleeping queues (per-PE arrays under GRAN), the suspended
 * ccalling threads, and (PAR/GRAN) the spark queue.
 * Fix: the ccalling-threads guard tests ccalling_threadss[i] (the array
 * initialised in initScheduler) but then evacuated &ccalling_threads[i] — a
 * different identifier; the evac now names the same array as the guard, so
 * the intended root is actually marked. */
2329 GetRoots(evac_fn evac)
2334 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2335 if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2336 evac((StgClosure **)&run_queue_hds[i]);
2337 if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2338 evac((StgClosure **)&run_queue_tls[i]);
2340 if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2341 evac((StgClosure **)&blocked_queue_hds[i]);
2342 if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2343 evac((StgClosure **)&blocked_queue_tls[i]);
2344 if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2345 evac((StgClosure **)&ccalling_threadss[i]);
2352 if (run_queue_hd != END_TSO_QUEUE) {
2353 ASSERT(run_queue_tl != END_TSO_QUEUE);
2354 evac((StgClosure **)&run_queue_hd);
2355 evac((StgClosure **)&run_queue_tl);
2358 if (blocked_queue_hd != END_TSO_QUEUE) {
2359 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2360 evac((StgClosure **)&blocked_queue_hd);
2361 evac((StgClosure **)&blocked_queue_tl);
2364 if (sleeping_queue != END_TSO_QUEUE) {
2365 evac((StgClosure **)&sleeping_queue);
2369 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2370 evac((StgClosure **)&suspended_ccalling_threads);
2373 #if defined(PAR) || defined(GRAN)
2374 markSparkQueue(evac);
2378 /* -----------------------------------------------------------------------------
2381 This is the interface to the garbage collector from Haskell land.
2382 We provide this so that external C code can allocate and garbage
2383 collect when called from Haskell via _ccall_GC.
2385 It might be useful to provide an interface whereby the programmer
2386 can specify more roots (ToDo).
2388 This needs to be protected by the GC condition variable above. KH.
2389 -------------------------------------------------------------------------- */
/* extra_roots: file-scope hook holding the user's root-marking callback for
 * performGCWithRoots (read by AllRoots below). */
2391 void (*extra_roots)(evac_fn);
/* performGC: minor collection entry point for external C code (_ccall_GC).
 * NOTE(review): the comment below says the caller must already hold the lock,
 * yet the next line acquires it — with a non-recursive mutex one of the two
 * must be wrong; confirm the locking contract. */
2396 /* Obligated to hold this lock upon entry */
2397 ACQUIRE_LOCK(&sched_mutex);
2398 GarbageCollect(GetRoots,rtsFalse);
2399 RELEASE_LOCK(&sched_mutex);
/* performMajorGC: as performGC, but forces a major (rtsTrue) collection. */
2403 performMajorGC(void)
2405 ACQUIRE_LOCK(&sched_mutex);
2406 GarbageCollect(GetRoots,rtsTrue);
2407 RELEASE_LOCK(&sched_mutex);
/* AllRoots: combined root-marker — scheduler roots plus the user callback
 * stashed in extra_roots by performGCWithRoots. */
2411 AllRoots(evac_fn evac)
2413 GetRoots(evac); // the scheduler's roots
2414 extra_roots(evac); // the user's roots
/* performGCWithRoots: GC with additional user-supplied roots.  The callback
 * is parked in the extra_roots global (under sched_mutex) so AllRoots can
 * invoke it during the collection. */
2418 performGCWithRoots(void (*get_roots)(evac_fn))
2420 ACQUIRE_LOCK(&sched_mutex);
2421 extra_roots = get_roots;
2422 GarbageCollect(AllRoots,rtsFalse);
2423 RELEASE_LOCK(&sched_mutex);
2426 /* -----------------------------------------------------------------------------
2429 If the thread has reached its maximum stack size, then raise the
2430 StackOverflow exception in the offending thread. Otherwise
2431 relocate the TSO into a larger chunk of memory and adjust its stack
2433 -------------------------------------------------------------------------- */
/* threadStackOverflow: grow a thread's stack.  If the TSO is already at its
 * max stack size, raise the StackOverflow exception in it instead.
 * Otherwise allocate a roughly doubled TSO, copy header + live stack into
 * the new block, relocate sp/su and the update-frame chain by `diff` words,
 * and mark the old TSO ThreadRelocated.  Returns the new TSO (return elided
 * in this listing). */
2436 threadStackOverflow(StgTSO *tso)
2438 nat new_stack_size, new_tso_size, diff, stack_words;
2442 IF_DEBUG(sanity,checkTSO(tso));
2443 if (tso->stack_size >= tso->max_stack_size) {
/* (message text is missing its closing parenthesis after "max is %ld") */
2446 belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2447 tso->id, tso, tso->stack_size, tso->max_stack_size);
2448 /* If we're debugging, just print out the top of the stack */
2449 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2452 /* Send this thread the StackOverflow exception */
2453 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2457 /* Try to double the current stack size. If that takes us over the
2458 * maximum stack size for this thread, then use the maximum instead.
2459 * Finally round up so the TSO ends up as a whole number of blocks.
2461 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2462 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2463 TSO_STRUCT_SIZE)/sizeof(W_);
2464 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2465 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2467 IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2469 dest = (StgTSO *)allocate(new_tso_size);
2470 TICK_ALLOC_TSO(new_stack_size,0);
2472 /* copy the TSO block and the old stack into the new area */
2473 memcpy(dest,tso,TSO_STRUCT_SIZE);
/* only the LIVE part of the old stack (sp .. stack end) is copied, placed at
 * the bottom of the new, larger stack area */
2474 stack_words = tso->stack + tso->stack_size - tso->sp;
2475 new_sp = (P_)dest + new_tso_size - stack_words;
2476 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2478 /* relocate the stack pointers... */
2479 diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2480 dest->su = (StgUpdateFrame *) ((P_)dest->su + diff);
2482 dest->stack_size = new_stack_size;
2484 /* and relocate the update frame list */
2485 relocate_stack(dest, diff);
2487 /* Mark the old TSO as relocated. We have to check for relocated
2488 * TSOs in the garbage collector and any primops that deal with TSOs.
2490 * It's important to set the sp and su values to just beyond the end
2491 * of the stack, so we don't attempt to scavenge any part of the
2494 tso->what_next = ThreadRelocated;
2496 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2497 tso->su = (StgUpdateFrame *)tso->sp;
2498 tso->why_blocked = NotBlocked;
2499 dest->mut_link = NULL;
/* NOTE(review): the message below reports the OLD tso's id/stack_size after
 * it was marked relocated — probably meant to report `dest` — confirm. */
2501 IF_PAR_DEBUG(verbose,
2502 belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2503 tso->id, tso, tso->stack_size);
2504 /* If we're debugging, just print out the top of the stack */
2505 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2508 IF_DEBUG(sanity,checkTSO(tso));
2510 IF_DEBUG(scheduler,printTSO(dest));
2516 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collection Routines, Main scheduling code
2517 //@subsection Blocking Queue Routines
2519 /* ---------------------------------------------------------------------------
2520 Wake up a queue that was blocked on some resource.
2521 ------------------------------------------------------------------------ */
/* unblockCount: bookkeeping when a blocked-queue element is woken.
 * NOTE(review): two conditionally-compiled signatures (GRAN and PAR ways)
 * are visible here; the CPP guards between them are elided in this view.
 * Logs a GR_RESUMEQ event and charges fetch/block time to the TSO,
 * depending on the type of the closure it was blocked on. */
2525 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2530 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2532 /* write RESUME events to log file and
2533 update blocked and fetch time (depending on type of the orig closure) */
2534 if (RtsFlags.ParFlags.ParStats.Full) {
2535 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2536 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2537 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
/* If the run queue was empty, force an emitSchedule so the log shows
 * the scheduler becoming active again. */
2538 if (EMPTY_RUN_QUEUE())
2539 emitSchedule = rtsTrue;
2541 switch (get_itbl(node)->type) {
/* FETCH_ME-style closures charge fetchtime; others charge blocktime.
 * (The case labels themselves are elided from this view.) */
2543 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2548 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2555 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
/* unblockOneLocked (GranSim version): wake one element of a blocking
 * queue and schedule an UnblockThread event for it, charging the
 * appropriate simulated communication costs.  Returns the next queue
 * element (return statement elided in this view).
 * Locks: caller holds sched_mutex (per the Locked suffix convention). */
2562 static StgBlockingQueueElement *
2563 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2566 PEs node_loc, tso_loc;
2568 node_loc = where_is(node); // should be lifted out of loop
2569 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2570 tso_loc = where_is((StgClosure *)tso);
2571 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2572 /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2573 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
/* local wakeup: only the local unblock cost is charged */
2574 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2575 // insertThread(tso, node_loc);
2576 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2578 tso, node, (rtsSpark*)NULL);
2579 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2582 } else { // TSO is remote (actually should be FMBQ)
/* remote wakeup: charge pack + global-unblock + network latency */
2583 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2584 RtsFlags.GranFlags.Costs.gunblocktime +
2585 RtsFlags.GranFlags.Costs.latency;
2586 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2588 tso, node, (rtsSpark*)NULL);
2589 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2592 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2594 fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2595 (node_loc==tso_loc ? "Local" : "Global"),
2596 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2597 tso->block_info.closure = NULL;
2598 IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
/* unblockOneLocked (GUM/PAR version): wake one blocking-queue element.
 * A TSO goes onto the run queue; a BLOCKED_FETCH goes onto the
 * PendingFetches list; an RBHSave CONSTR terminates the queue.
 * Returns the next element (return statement elided in this view). */
2602 static StgBlockingQueueElement *
2603 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2605 StgBlockingQueueElement *next;
2607 switch (get_itbl(bqe)->type) {
2609 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2610 /* if it's a TSO just push it onto the run_queue */
2612 // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2613 PUSH_ON_RUN_QUEUE((StgTSO *)bqe);
/* log the wakeup and update par stats before clearing block status */
2615 unblockCount(bqe, node);
2616 /* reset blocking status after dumping event */
2617 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2621 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2623 bqe->link = (StgBlockingQueueElement *)PendingFetches;
2624 PendingFetches = (StgBlockedFetch *)bqe;
2628 /* can ignore this case in a non-debugging setup;
2629 see comments on RBHSave closures above */
2631 /* check that the closure is an RBHSave closure */
2632 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2633 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2634 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2638 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2639 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2643 IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2647 #else /* !GRAN && !PAR */
/* unblockOneLocked (sequential/SMP version): mark the TSO runnable and
 * push it on the run queue.  Caller must hold sched_mutex.
 * Return type/value lines are elided in this view. */
2649 unblockOneLocked(StgTSO *tso)
2653 ASSERT(get_itbl(tso)->type == TSO);
2654 ASSERT(tso->why_blocked != NotBlocked);
2655 tso->why_blocked = NotBlocked;
2657 PUSH_ON_RUN_QUEUE(tso);
2659 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2664 #if defined(GRAN) || defined(PAR)
/* unblockOne (GRAN/PAR): lock-acquiring wrapper around unblockOneLocked. */
2665 inline StgBlockingQueueElement *
2666 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2668 ACQUIRE_LOCK(&sched_mutex);
2669 bqe = unblockOneLocked(bqe, node);
2670 RELEASE_LOCK(&sched_mutex);
/* unblockOne (sequential/SMP): lock-acquiring wrapper around
 * unblockOneLocked; returns the next TSO (return line elided). */
2675 unblockOne(StgTSO *tso)
2677 ACQUIRE_LOCK(&sched_mutex);
2678 tso = unblockOneLocked(tso);
2679 RELEASE_LOCK(&sched_mutex);
/* awakenBlockedQueue (GranSim version): wake every TSO on the blocking
 * queue of `node`.  Handles the "fake fetch" case where the node is not
 * on CurrentProc, and restores the RBHSave info if the queue belongs to
 * a revertible black hole (RBH).  Also gathers GranSim global stats. */
2686 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2688 StgBlockingQueueElement *bqe;
2693 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2694 node, CurrentProc, CurrentTime[CurrentProc],
2695 CurrentTSO->id, CurrentTSO));
2697 node_loc = where_is(node);
2699 ASSERT(q == END_BQ_QUEUE ||
2700 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2701 get_itbl(q)->type == CONSTR); // closure (type constructor)
2702 ASSERT(is_unique(node));
2704 /* FAKE FETCH: magically copy the node to the tso's proc;
2705 no Fetch necessary because in reality the node should not have been
2706 moved to the other PE in the first place
2708 if (CurrentProc!=node_loc) {
2710 belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2711 node, node_loc, CurrentProc, CurrentTSO->id,
2712 // CurrentTSO, where_is(CurrentTSO),
2713 node->header.gran.procs));
/* add CurrentProc to the node's PE bitmask, simulating the copy */
2714 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2716 belch("## new bitmask of node %p is %#x",
2717 node, node->header.gran.procs));
2718 if (RtsFlags.GranFlags.GranSimStats.Global) {
2719 globalGranStats.tot_fake_fetches++;
2724 // ToDo: check: ASSERT(CurrentProc==node_loc);
/* walk the queue until we hit the terminating CONSTR (RBHSave) */
2725 while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2728 bqe points to the current element in the queue
2729 next points to the next element in the queue
2731 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2732 //tso_loc = where_is(tso);
2734 bqe = unblockOneLocked(bqe, node);
2737 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2738 the closure to make room for the anchor of the BQ */
2739 if (bqe!=END_BQ_QUEUE) {
2740 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2742 ASSERT((info_ptr==&RBH_Save_0_info) ||
2743 (info_ptr==&RBH_Save_1_info) ||
2744 (info_ptr==&RBH_Save_2_info));
2746 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2747 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2748 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2751 belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2752 node, info_type(node)));
2755 /* statistics gathering */
2756 if (RtsFlags.GranFlags.GranSimStats.Global) {
2757 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2758 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2759 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2760 globalGranStats.tot_awbq++; // total no. of bqs awakened
2763 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2764 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
/* awakenBlockedQueue (GUM/PAR version): wake every TSO/BLOCKED_FETCH on
 * the blocking queue of `node`, under sched_mutex.  A CONSTR at the
 * head means there is nothing to unblock (see RFP note below). */
2768 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2770 StgBlockingQueueElement *bqe;
2772 ACQUIRE_LOCK(&sched_mutex);
2774 IF_PAR_DEBUG(verbose,
2775 belch("##-_ AwBQ for node %p on [%x]: ",
2779 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2780 IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
/* NOTE(review): early return here presumably releases sched_mutex on
 * an elided line -- confirm against the full source. */
2785 ASSERT(q == END_BQ_QUEUE ||
2786 get_itbl(q)->type == TSO ||
2787 get_itbl(q)->type == BLOCKED_FETCH ||
2788 get_itbl(q)->type == CONSTR);
2791 while (get_itbl(bqe)->type==TSO ||
2792 get_itbl(bqe)->type==BLOCKED_FETCH) {
2793 bqe = unblockOneLocked(bqe, node);
2795 RELEASE_LOCK(&sched_mutex);
2798 #else /* !GRAN && !PAR */
/* awakenBlockedQueue (sequential/SMP): wake every TSO on the queue,
 * under sched_mutex. */
2800 awakenBlockedQueue(StgTSO *tso)
2802 ACQUIRE_LOCK(&sched_mutex);
2803 while (tso != END_TSO_QUEUE) {
2804 tso = unblockOneLocked(tso);
2806 RELEASE_LOCK(&sched_mutex);
2810 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2811 //@subsection Exception Handling Routines
2813 /* ---------------------------------------------------------------------------
2815 - usually called inside a signal handler so it mustn't do anything fancy.
2816 ------------------------------------------------------------------------ */
2819 interruptStgRts(void)
2825 /* -----------------------------------------------------------------------------
2828 This is for use when we raise an exception in another thread, which
2830 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2831 -------------------------------------------------------------------------- */
2833 #if defined(GRAN) || defined(PAR)
2835 NB: only the type of the blocking queue is different in GranSim and GUM
2836 the operations on the queue-elements are the same
2837 long live polymorphism!
2839 Locks: sched_mutex is held upon entry and exit.
/* unblockThread (GRAN/PAR version): remove `tso` from whatever queue it
 * is blocked on (MVar, black hole BQ, another thread's
 * blocked_exceptions, the I/O blocked_queue, or sleeping_queue), then
 * mark it runnable and push it on the run queue.
 * Queue elements here are StgBlockingQueueElement, hence the casts.
 * Per the header comment above: sched_mutex is held on entry and exit,
 * yet this body also ACQUIRE/RELEASEs it -- view is elided, so the
 * exact locking discipline should be confirmed against the full file. */
2843 unblockThread(StgTSO *tso)
2845 StgBlockingQueueElement *t, **last;
2847 ACQUIRE_LOCK(&sched_mutex);
2848 switch (tso->why_blocked) {
2851 return; /* not blocked */
/* --- BlockedOnMVar: unlink from the MVar's head/tail list --- */
2854 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2856 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2857 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2859 last = (StgBlockingQueueElement **)&mvar->head;
2860 for (t = (StgBlockingQueueElement *)mvar->head;
2862 last = &t->link, last_tso = t, t = t->link) {
2863 if (t == (StgBlockingQueueElement *)tso) {
2864 *last = (StgBlockingQueueElement *)tso->link;
2865 if (mvar->tail == tso) {
2866 mvar->tail = (StgTSO *)last_tso;
2871 barf("unblockThread (MVAR): TSO not found");
2874 case BlockedOnBlackHole:
/* --- unlink from the BLACKHOLE_BQ's blocking queue --- */
2875 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2877 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2879 last = &bq->blocking_queue;
2880 for (t = bq->blocking_queue;
2882 last = &t->link, t = t->link) {
2883 if (t == (StgBlockingQueueElement *)tso) {
2884 *last = (StgBlockingQueueElement *)tso->link;
2888 barf("unblockThread (BLACKHOLE): TSO not found");
2891 case BlockedOnException:
/* --- unlink from the target thread's blocked_exceptions queue --- */
2893 StgTSO *target = tso->block_info.tso;
2895 ASSERT(get_itbl(target)->type == TSO);
/* follow a relocation if the target TSO's stack was grown */
2897 if (target->what_next == ThreadRelocated) {
2898 target = target->link;
2899 ASSERT(get_itbl(target)->type == TSO);
2902 ASSERT(target->blocked_exceptions != NULL);
2904 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2905 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2907 last = &t->link, t = t->link) {
2908 ASSERT(get_itbl(t)->type == TSO);
2909 if (t == (StgBlockingQueueElement *)tso) {
2910 *last = (StgBlockingQueueElement *)tso->link;
2914 barf("unblockThread (Exception): TSO not found");
2918 case BlockedOnWrite:
2920 /* take TSO off blocked_queue */
2921 StgBlockingQueueElement *prev = NULL;
2922 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2923 prev = t, t = t->link) {
2924 if (t == (StgBlockingQueueElement *)tso) {
2926 blocked_queue_hd = (StgTSO *)t->link;
2927 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2928 blocked_queue_tl = END_TSO_QUEUE;
2931 prev->link = t->link;
2932 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2933 blocked_queue_tl = (StgTSO *)prev;
2939 barf("unblockThread (I/O): TSO not found");
2942 case BlockedOnDelay:
2944 /* take TSO off sleeping_queue */
2945 StgBlockingQueueElement *prev = NULL;
2946 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
2947 prev = t, t = t->link) {
2948 if (t == (StgBlockingQueueElement *)tso) {
2950 sleeping_queue = (StgTSO *)t->link;
2952 prev->link = t->link;
2957 barf("unblockThread (I/O): TSO not found");
2961 barf("unblockThread");
/* common exit: clear block state and make the thread runnable */
2965 tso->link = END_TSO_QUEUE;
2966 tso->why_blocked = NotBlocked;
2967 tso->block_info.closure = NULL;
2968 PUSH_ON_RUN_QUEUE(tso);
2969 RELEASE_LOCK(&sched_mutex);
/* unblockThread (sequential/SMP version): same job as the GRAN/PAR
 * variant above but with plain StgTSO queue links -- remove `tso` from
 * whatever queue it is blocked on, then mark it runnable. */
2973 unblockThread(StgTSO *tso)
2977 /* To avoid locking unnecessarily. */
2978 if (tso->why_blocked == NotBlocked) {
2982 ACQUIRE_LOCK(&sched_mutex);
2983 switch (tso->why_blocked) {
/* --- BlockedOnMVar: unlink from the MVar's head/tail list --- */
2986 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2988 StgTSO *last_tso = END_TSO_QUEUE;
2989 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2992 for (t = mvar->head; t != END_TSO_QUEUE;
2993 last = &t->link, last_tso = t, t = t->link) {
2996 if (mvar->tail == tso) {
2997 mvar->tail = last_tso;
3002 barf("unblockThread (MVAR): TSO not found");
3005 case BlockedOnBlackHole:
/* --- unlink from the BLACKHOLE_BQ's blocking queue --- */
3006 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3008 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3010 last = &bq->blocking_queue;
3011 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
3012 last = &t->link, t = t->link) {
3018 barf("unblockThread (BLACKHOLE): TSO not found");
3021 case BlockedOnException:
/* --- unlink from the target's blocked_exceptions queue --- */
3023 StgTSO *target = tso->block_info.tso;
3025 ASSERT(get_itbl(target)->type == TSO);
/* follow relocations (a while-loop here, unlike the single `if` in the
 * GRAN/PAR variant above) */
3027 while (target->what_next == ThreadRelocated) {
3028 target = target->link;
3029 ASSERT(get_itbl(target)->type == TSO);
3032 ASSERT(target->blocked_exceptions != NULL);
3034 last = &target->blocked_exceptions;
3035 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
3036 last = &t->link, t = t->link) {
3037 ASSERT(get_itbl(t)->type == TSO);
3043 barf("unblockThread (Exception): TSO not found");
3047 case BlockedOnWrite:
/* --- unlink from the I/O blocked_queue (hd/tl pair) --- */
3049 StgTSO *prev = NULL;
3050 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
3051 prev = t, t = t->link) {
3054 blocked_queue_hd = t->link;
3055 if (blocked_queue_tl == t) {
3056 blocked_queue_tl = END_TSO_QUEUE;
3059 prev->link = t->link;
3060 if (blocked_queue_tl == t) {
3061 blocked_queue_tl = prev;
3067 barf("unblockThread (I/O): TSO not found");
3070 case BlockedOnDelay:
/* --- unlink from sleeping_queue --- */
3072 StgTSO *prev = NULL;
3073 for (t = sleeping_queue; t != END_TSO_QUEUE;
3074 prev = t, t = t->link) {
3077 sleeping_queue = t->link;
3079 prev->link = t->link;
3084 barf("unblockThread (I/O): TSO not found");
3088 barf("unblockThread");
/* common exit: clear block state and make the thread runnable */
3092 tso->link = END_TSO_QUEUE;
3093 tso->why_blocked = NotBlocked;
3094 tso->block_info.closure = NULL;
3095 PUSH_ON_RUN_QUEUE(tso);
3096 RELEASE_LOCK(&sched_mutex);
3100 /* -----------------------------------------------------------------------------
3103 * The following function implements the magic for raising an
3104 * asynchronous exception in an existing thread.
3106 * We first remove the thread from any queue on which it might be
3107 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
3109 * We strip the stack down to the innermost CATCH_FRAME, building
3110 * thunks in the heap for all the active computations, so they can
3111 * be restarted if necessary. When we reach a CATCH_FRAME, we build
3112 * an application of the handler to the exception, and push it on
3113 * the top of the stack.
3115 * How exactly do we save all the active computations? We create an
3116 * AP_UPD for every UpdateFrame on the stack. Entering one of these
3117 * AP_UPDs pushes everything from the corresponding update frame
3118 * upwards onto the stack. (Actually, it pushes everything up to the
3119 * next update frame plus a pointer to the next AP_UPD object.
3120 * Entering the next AP_UPD object pushes more onto the stack until we
3121 * reach the last AP_UPD object - at which point the stack should look
3122 * exactly as it did when we killed the TSO and we can continue
3123 * execution by entering the closure on top of the stack.
3125 * We can also kill a thread entirely - this happens if either (a) the
3126 * exception passed to raiseAsync is NULL, or (b) there's no
3127 * CATCH_FRAME on the stack. In either case, we strip the entire
3128 * stack and replace the thread with a zombie.
3130 * Locks: sched_mutex not held upon entry nor exit.
3132 * -------------------------------------------------------------------------- */
/* deleteThread: kill a thread outright.  raiseAsync with a NULL
 * exception strips the whole stack and zombifies the TSO (see the
 * comment block above). */
3135 deleteThread(StgTSO *tso)
3137 raiseAsync(tso,NULL);
/* raiseAsyncWithLock: raiseAsync for callers that already hold
 * sched_mutex.  raiseAsync expects the lock NOT held, so we drop it
 * around the call and re-acquire afterwards. */
3141 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3143 /* When raising async exs from contexts where sched_mutex is held;
3144 use raiseAsyncWithLock(). */
3145 RELEASE_LOCK(&sched_mutex);
3146 raiseAsync(tso,exception);
3147 ACQUIRE_LOCK(&sched_mutex);
/* raiseAsync: raise an asynchronous exception in `tso` (see the large
 * comment block above for the design).  Walks the stack from sp,
 * freezing each update frame's computation into an AP_UPD thunk, until
 * a CATCH_FRAME is found (apply the handler) or the STOP_FRAME is
 * reached (kill the thread).  exception == NULL means "just kill".
 * Locks: sched_mutex NOT held on entry or exit. */
3151 raiseAsync(StgTSO *tso, StgClosure *exception)
3153 StgUpdateFrame* su = tso->su;
3154 StgPtr sp = tso->sp;
3156 /* Thread already dead? */
3157 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3161 IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3163 /* Remove it from any blocking queues */
3166 IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3167 /* The stack freezing code assumes there's a closure pointer on
3168 * the top of the stack. This isn't always the case with compiled
3169 * code, so we have to push a dummy closure on the top which just
3170 * returns to the next return address on the stack.
3172 if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3173 *(--sp) = (W_)&stg_dummy_ret_closure;
/* number of payload words between sp and the enclosing update frame */
3177 nat words = ((P_)su - (P_)sp) - 1;
3181 /* If we find a CATCH_FRAME, and we've got an exception to raise,
3182 * then build the THUNK raise(exception), and leave it on
3183 * top of the CATCH_FRAME ready to enter.
3185 if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3187 StgCatchFrame *cf = (StgCatchFrame *)su;
3191 /* we've got an exception to raise, so let's pass it to the
3192 * handler in this frame.
3194 raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3195 TICK_ALLOC_SE_THK(1,0);
3196 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3197 raise->payload[0] = exception;
3199 /* throw away the stack from Sp up to the CATCH_FRAME.
3203 /* Ensure that async exceptions are blocked now, so we don't get
3204 * a surprise exception before we get around to executing the
3207 if (tso->blocked_exceptions == NULL) {
3208 tso->blocked_exceptions = END_TSO_QUEUE;
3211 /* Put the newly-built THUNK on top of the stack, ready to execute
3212 * when the thread restarts.
3217 tso->what_next = ThreadEnterGHC;
3218 IF_DEBUG(sanity, checkTSO(tso));
3222 /* First build an AP_UPD consisting of the stack chunk above the
3223 * current update frame, with the top word on the stack as the
3226 ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3231 ap->fun = (StgClosure *)sp[0];
3233 for(i=0; i < (nat)words; ++i) {
3234 ap->payload[i] = (StgClosure *)*sp++;
3237 switch (get_itbl(su)->type) {
/* --- UPDATE_FRAME: freeze the computation and update the updatee --- */
3241 SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */);
3242 TICK_ALLOC_UP_THK(words+1,0);
3245 fprintf(stderr, "scheduler: Updating ");
3246 printPtr((P_)su->updatee);
3247 fprintf(stderr, " with ");
3248 printObj((StgClosure *)ap);
3251 /* Replace the updatee with an indirection - happily
3252 * this will also wake up any threads currently
3253 * waiting on the result.
3255 * Warning: if we're in a loop, more than one update frame on
3256 * the stack may point to the same object. Be careful not to
3257 * overwrite an IND_OLDGEN in this case, because we'll screw
3258 * up the mutable lists. To be on the safe side, don't
3259 * overwrite any kind of indirection at all. See also
3260 * threadSqueezeStack in GC.c, where we have to make a similar
3263 if (!closure_IND(su->updatee)) {
3264 UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */
3267 sp += sizeofW(StgUpdateFrame) -1;
3268 sp[0] = (W_)ap; /* push onto stack */
/* --- CATCH_FRAME (no exception to raise): wrap in catch --- */
3274 StgCatchFrame *cf = (StgCatchFrame *)su;
3277 /* We want a PAP, not an AP_UPD. Fortunately, the
3278 * layout's the same.
3280 SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3281 TICK_ALLOC_UPD_PAP(words+1,0);
3283 /* now build o = FUN(catch,ap,handler) */
3284 o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3285 TICK_ALLOC_FUN(2,0);
3286 SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3287 o->payload[0] = (StgClosure *)ap;
3288 o->payload[1] = cf->handler;
3291 fprintf(stderr, "scheduler: Built ");
3292 printObj((StgClosure *)o);
3295 /* pop the old handler and put o on the stack */
3297 sp += sizeofW(StgCatchFrame) - 1;
/* --- SEQ_FRAME: wrap in seq --- */
3304 StgSeqFrame *sf = (StgSeqFrame *)su;
3307 SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3308 TICK_ALLOC_UPD_PAP(words+1,0);
3310 /* now build o = FUN(seq,ap) */
3311 o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3312 TICK_ALLOC_SE_THK(1,0);
3313 SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3314 o->payload[0] = (StgClosure *)ap;
3317 fprintf(stderr, "scheduler: Built ");
3318 printObj((StgClosure *)o);
3321 /* pop the old handler and put o on the stack */
3323 sp += sizeofW(StgSeqFrame) - 1;
/* --- STOP_FRAME: stack fully stripped, kill the thread --- */
3329 /* We've stripped the entire stack, the thread is now dead. */
3330 sp += sizeofW(StgStopFrame) - 1;
3331 sp[0] = (W_)exception; /* save the exception */
3332 tso->what_next = ThreadKilled;
3333 tso->su = (StgUpdateFrame *)(sp+1);
3344 /* -----------------------------------------------------------------------------
3345 resurrectThreads is called after garbage collection on the list of
3346 threads found to be garbage. Each of these threads will be woken
3347 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3348 on an MVar, or NonTermination if the thread was blocked on a Black
3351 Locks: sched_mutex isn't held upon entry nor exit.
3352 -------------------------------------------------------------------------- */
/* resurrectThreads: called after GC with the list of garbage threads.
 * Each one is re-linked onto all_threads and sent an exception
 * appropriate to what it was deadlocked on (see comment block above).
 * Locks: sched_mutex is held here (see note at the raiseAsyncWithLock
 * call), despite the "isn't held" claim in the header comment above --
 * elided lines make this ambiguous; confirm against the full source. */
3355 resurrectThreads( StgTSO *threads )
3359 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3360 next = tso->global_link;
3361 tso->global_link = all_threads;
3363 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3365 switch (tso->why_blocked) {
3367 case BlockedOnException:
3368 /* Called by GC - sched_mutex lock is currently held. */
3369 raiseAsyncWithLock(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3371 case BlockedOnBlackHole:
3372 raiseAsyncWithLock(tso,(StgClosure *)NonTermination_closure);
3375 /* This might happen if the thread was blocked on a black hole
3376 * belonging to a thread that we've just woken up (raiseAsync
3377 * can wake up threads, remember...).
3381 barf("resurrectThreads: thread blocked in a strange way");
3386 /* -----------------------------------------------------------------------------
3387 * Blackhole detection: if we reach a deadlock, test whether any
3388 * threads are blocked on themselves. Any threads which are found to
3389 * be self-blocked get sent a NonTermination exception.
3391 * This is only done in a deadlock situation in order to avoid
3392 * performance overhead in the normal case.
3394 * Locks: sched_mutex is held upon entry and exit.
3395 * -------------------------------------------------------------------------- */
/* detectBlackHoles: deadlock-time check for threads blocked on their
 * own computations.  Walks each thread's update-frame chain; if an
 * UPDATE_FRAME's updatee is the very black hole the thread is blocked
 * on, the thread is self-deadlocked and gets NonTermination.
 * Locks: sched_mutex held on entry and exit (see comment block above). */
3398 detectBlackHoles( void )
3400 StgTSO *t = all_threads;
3401 StgUpdateFrame *frame;
3402 StgClosure *blocked_on;
3404 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
/* skip forwarding TSOs left behind by stack growth */
3406 while (t->what_next == ThreadRelocated) {
3408 ASSERT(get_itbl(t)->type == TSO);
3411 if (t->why_blocked != BlockedOnBlackHole) {
3415 blocked_on = t->block_info.closure;
3417 for (frame = t->su; ; frame = frame->link) {
3418 switch (get_itbl(frame)->type) {
3421 if (frame->updatee == blocked_on) {
3422 /* We are blocking on one of our own computations, so
3423 * send this thread the NonTermination exception.
3426 sched_belch("thread %d is blocked on itself", t->id));
3427 raiseAsyncWithLock(t, (StgClosure *)NonTermination_closure);
3448 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3449 //@subsection Debugging Routines
3451 /* -----------------------------------------------------------------------------
3452 Debugging: why is a thread blocked
3453 -------------------------------------------------------------------------- */
/* printThreadBlockage: debugging -- print (to stderr, no newline) a
 * human-readable description of why `tso` is blocked. */
3458 printThreadBlockage(StgTSO *tso)
3460 switch (tso->why_blocked) {
3462 fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3464 case BlockedOnWrite:
3465 fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3467 case BlockedOnDelay:
3468 fprintf(stderr,"is blocked until %d", tso->block_info.target);
3471 fprintf(stderr,"is blocked on an MVar");
3473 case BlockedOnException:
3474 fprintf(stderr,"is blocked on delivering an exception to thread %d",
3475 tso->block_info.tso->id);
3477 case BlockedOnBlackHole:
3478 fprintf(stderr,"is blocked on a black hole");
3481 fprintf(stderr,"is not blocked");
/* PAR-only cases: blocked on a global address (FetchMe) */
3485 fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3486 tso->block_info.closure, info_type(tso->block_info.closure));
3488 case BlockedOnGA_NoSend:
3489 fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3490 tso->block_info.closure, info_type(tso->block_info.closure));
3493 #if defined(RTS_SUPPORTS_THREADS)
3494 case BlockedOnCCall:
3495 fprintf(stderr,"is blocked on an external call");
3499 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3500 tso->why_blocked, tso->id, tso);
/* printThreadStatus: debugging -- print the thread's overall status;
 * delegates to printThreadBlockage for live-but-blocked threads. */
3505 printThreadStatus(StgTSO *tso)
3507 switch (tso->what_next) {
3509 fprintf(stderr,"has been killed");
3511 case ThreadComplete:
3512 fprintf(stderr,"has completed");
3515 printThreadBlockage(tso);
/* printAllThreads: debugging -- dump id, label and status of every
 * thread on all_threads.  The three header variants (GRAN timestamp,
 * PAR timestamp, plain) are selected by elided CPP conditionals. */
3520 printAllThreads(void)
3525 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3526 ullong_format_string(TIME_ON_PROC(CurrentProc),
3527 time_string, rtsFalse/*no commas!*/);
3529 sched_belch("all threads at [%s]:", time_string);
3531 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3532 ullong_format_string(CURRENT_TIME,
3533 time_string, rtsFalse/*no commas!*/);
3535 sched_belch("all threads at [%s]:", time_string);
3537 sched_belch("all threads:");
3540 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3541 fprintf(stderr, "\tthread %d ", t->id);
3542 if (t->label) fprintf(stderr,"[\"%s\"] ",t->label);
3543 printThreadStatus(t);
3544 fprintf(stderr,"\n");
3549 Print a whole blocking queue attached to node (debugging only).
/* print_bq (GUM/PAR version): debugging -- print the whole blocking
 * queue hanging off `node`; element printing is done by print_bqe. */
3554 print_bq (StgClosure *node)
3556 StgBlockingQueueElement *bqe;
3560 fprintf(stderr,"## BQ of closure %p (%s): ",
3561 node, info_type(node));
3563 /* should cover all closures that may have a blocking queue */
3564 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3565 get_itbl(node)->type == FETCH_ME_BQ ||
3566 get_itbl(node)->type == RBH ||
3567 get_itbl(node)->type == MVAR);
3569 ASSERT(node!=(StgClosure*)NULL); // sanity check
3571 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3575 Print a whole blocking queue starting with the element bqe.
/* print_bqe: debugging -- print every element of a blocking queue
 * starting at `bqe`.  Iteration stops at END_BQ_QUEUE or at a CONSTR
 * (the RBHSave closure that terminates an RBH's queue). */
3578 print_bqe (StgBlockingQueueElement *bqe)
3583 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3585 for (end = (bqe==END_BQ_QUEUE);
3586 !end; // iterate until bqe points to a CONSTR
3587 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
3588 bqe = end ? END_BQ_QUEUE : bqe->link) {
3589 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3590 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3591 /* types of closures that may appear in a blocking queue */
3592 ASSERT(get_itbl(bqe)->type == TSO ||
3593 get_itbl(bqe)->type == BLOCKED_FETCH ||
3594 get_itbl(bqe)->type == CONSTR);
3595 /* only BQs of an RBH end with an RBH_Save closure */
3596 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3598 switch (get_itbl(bqe)->type) {
3600 fprintf(stderr," TSO %u (%x),",
3601 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
/* BLOCKED_FETCH: show the node and its global address triple */
3604 fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3605 ((StgBlockedFetch *)bqe)->node,
3606 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3607 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3608 ((StgBlockedFetch *)bqe)->ga.weight);
/* CONSTR: identify which RBH_Save variant terminates the queue */
3611 fprintf(stderr," %s (IP %p),",
3612 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3613 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3614 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3615 "RBH_Save_?"), get_itbl(bqe));
3618 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3619 info_type((StgClosure *)bqe)); // , node, info_type(node));
3623 fputc('\n', stderr);
# elif defined(GRAN)
/* print_bq (GranSim version): debugging -- print the blocking queue of
 * `node` together with the PE location of the node and of each TSO. */
3627 print_bq (StgClosure *node)
3629 StgBlockingQueueElement *bqe;
3630 PEs node_loc, tso_loc;
3633 /* should cover all closures that may have a blocking queue */
3634 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3635 get_itbl(node)->type == FETCH_ME_BQ ||
3636 get_itbl(node)->type == RBH);
3638 ASSERT(node!=(StgClosure*)NULL); // sanity check
3639 node_loc = where_is(node);
3641 fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3642 node, info_type(node), node_loc);
3645 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3647 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3648 !end; // iterate until bqe points to a CONSTR
3649 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3650 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3651 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3652 /* types of closures that may appear in a blocking queue */
3653 ASSERT(get_itbl(bqe)->type == TSO ||
3654 get_itbl(bqe)->type == CONSTR);
3655 /* only BQs of an RBH end with an RBH_Save closure */
3656 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3658 tso_loc = where_is((StgClosure *)bqe);
3659 switch (get_itbl(bqe)->type) {
3661 fprintf(stderr," TSO %d (%p) on [PE %d],",
3662 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
/* CONSTR: identify which RBH_Save variant terminates the queue */
3665 fprintf(stderr," %s (IP %p),",
3666 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3667 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3668 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3669 "RBH_Save_?"), get_itbl(bqe));
3672 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3673 info_type((StgClosure *)bqe), node, info_type(node));
3677 fputc('\n', stderr);
3681 Nice and easy: only TSOs on the blocking queue
/* print_bq (sequential/SMP version): debugging -- only TSOs can appear
 * on a blocking queue here, so just walk and print them. */
3684 print_bq (StgClosure *node)
3688 ASSERT(node!=(StgClosure*)NULL); // sanity check
3689 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3690 tso != END_TSO_QUEUE;
3692 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3693 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3694 fprintf(stderr," TSO %d (%p),", tso->id, tso);
3696 fputc('\n', stderr);
3707 for (i=0, tso=run_queue_hd;
3708 tso != END_TSO_QUEUE;
/* sched_belch: printf-style debug message prefixed with a scheduler
 * tag (task id under RTS_SUPPORTS_THREADS; plainer prefixes otherwise
 * -- the CPP guards between the variants are elided in this view). */
3717 sched_belch(char *s, ...)
3722 fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3724 fprintf(stderr, "== ");
3726 fprintf(stderr, "scheduler: ");
3728 vfprintf(stderr, s, ap);
3729 fprintf(stderr, "\n");
3735 //@node Index, , Debugging Routines, Main scheduling code
3739 //* StgMainThread:: @cindex\s-+StgMainThread
3740 //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue
3741 //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd
3742 //* blocked_queue_tl:: @cindex\s-+blocked_queue_tl
3743 //* context_switch:: @cindex\s-+context_switch
3744 //* createThread:: @cindex\s-+createThread
3745 //* gc_pending_cond:: @cindex\s-+gc_pending_cond
3746 //* initScheduler:: @cindex\s-+initScheduler
3747 //* interrupted:: @cindex\s-+interrupted
3748 //* next_thread_id:: @cindex\s-+next_thread_id
3749 //* print_bq:: @cindex\s-+print_bq
3750 //* run_queue_hd:: @cindex\s-+run_queue_hd
3751 //* run_queue_tl:: @cindex\s-+run_queue_tl
3752 //* sched_mutex:: @cindex\s-+sched_mutex
3753 //* schedule:: @cindex\s-+schedule
3754 //* take_off_run_queue:: @cindex\s-+take_off_run_queue
3755 //* term_mutex:: @cindex\s-+term_mutex