1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 1998-2004
7 * Different GHC ways use this scheduler quite differently (see comments below)
8 * Here is the global picture:
10 * WAY Name CPP flag What's it for
11 * --------------------------------------
12 * mp GUM PAR Parallel execution on a distrib. memory machine
13 * s SMP SMP Parallel execution on a shared memory machine
14 * mg GranSim GRAN Simulation of parallel execution
15 * md GUM/GdH DIST Distributed execution (based on GUM)
17 * --------------------------------------------------------------------------*/
20 * Version with support for distributed memory parallelism aka GUM (WAY=mp):
22 The main scheduling loop in GUM iterates until a finish message is received.
23 In that case a global flag @receivedFinish@ is set and this instance of
24 the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25 for the handling of incoming messages, such as PP_FINISH.
26 Note that in the parallel case we have a system manager that coordinates
different PEs, each of which runs one instance of the RTS.
28 See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29 From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
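
In outline, the GUM loop looks like this (a sketch only; the real
loop in schedule() below also activates sparks and sends out fishes):

    while (!receivedFinish) {
        ...run runnable threads, turn local sparks into threads...
        receivedFinish = processMessages();
    }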
31 * Version with support for simulating parallel execution aka GranSim (WAY=mg):
33 The main scheduling code in GranSim is quite different from that in std
34 (concurrent) Haskell: while concurrent Haskell just iterates over the
threads in the runnable queue, GranSim is event-driven, i.e. it iterates
36 over the events in the global event queue. -- HWL
39 #include "PosixSource.h"
44 #include "BlockAlloc.h"
48 #define COMPILING_SCHEDULER
50 #include "StgMiscClosures.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
61 #include "ThreadLabels.h"
62 #include "LdvProfile.h"
65 #include "Proftimer.h"
68 #if defined(GRAN) || defined(PAR)
69 # include "GranSimRts.h"
71 # include "ParallelRts.h"
72 # include "Parallel.h"
73 # include "ParallelDebug.h"
78 #include "Capability.h"
79 #include "OSThreads.h"
82 #ifdef HAVE_SYS_TYPES_H
83 #include <sys/types.h>
98 #define USED_IN_THREADED_RTS
100 #define USED_IN_THREADED_RTS STG_UNUSED
103 #ifdef RTS_SUPPORTS_THREADS
104 #define USED_WHEN_RTS_SUPPORTS_THREADS
106 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
109 /* Main thread queue.
110 * Locks required: sched_mutex.
112 StgMainThread *main_threads = NULL;
115 * Locks required: sched_mutex.
119 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
120 /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
123 In GranSim we have a runnable and a blocked queue for each processor.
In order to minimise code changes, new arrays run_queue_hds/tls
125 are created. run_queue_hd is then a short cut (macro) for
126 run_queue_hds[CurrentProc] (see GranSim.h).
129 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
130 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
131 StgTSO *ccalling_threadss[MAX_PROC];
132 /* We use the same global list of threads (all_threads) in GranSim as in
133 the std RTS (i.e. we are cheating). However, we don't use this list in
134 the GranSim specific code at the moment (so we are only potentially
139 StgTSO *run_queue_hd = NULL;
140 StgTSO *run_queue_tl = NULL;
141 StgTSO *blocked_queue_hd = NULL;
142 StgTSO *blocked_queue_tl = NULL;
143 StgTSO *sleeping_queue = NULL; /* perhaps replace with a hash table? */
147 /* Linked list of all threads.
148 * Used for detecting garbage collected threads.
150 StgTSO *all_threads = NULL;
152 /* When a thread performs a safe C call (_ccall_GC, using old
153 * terminology), it gets put on the suspended_ccalling_threads
154 * list. Used by the garbage collector.
156 static StgTSO *suspended_ccalling_threads;
158 static StgTSO *threadStackOverflow(StgTSO *tso);
160 /* KH: The following two flags are shared memory locations. There is no need
161 to lock them, since they are only unset at the end of a scheduler
165 /* flag set by signal handler to precipitate a context switch */
166 int context_switch = 0;
168 /* if this flag is set as well, give up execution */
169 rtsBool interrupted = rtsFalse;
171 /* Next thread ID to allocate.
172 * Locks required: thread_id_mutex
174 static StgThreadID next_thread_id = 1;
177 * Pointers to the state of the current thread.
178 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
179 * thread. If CurrentTSO == NULL, then we're at the scheduler level.
182 /* The smallest stack size that makes any sense is:
183 * RESERVED_STACK_WORDS (so we can get back from the stack overflow)
184 * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame)
* + 1 (the closure to enter)
* + 1 (stg_ap_v_ret)
* + 1 (spare slot req'd by stg_ap_v_ret)
189 * A thread with this stack will bomb immediately with a stack
190 * overflow, which will increase its stack size.
193 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
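
/* Note: the "+ 3" here counts the single-word slots enumerated in the
 * comment above, i.e.
 *
 *   MIN_STACK_WORDS = RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3
 *
 * createThread() below rounds any smaller stack request up to
 * MIN_STACK_WORDS + TSO_STRUCT_SIZEW.
 */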
200 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
201 * exists - earlier gccs apparently didn't.
206 static rtsBool ready_to_gc;
209 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
210 * in an MT setting, needed to signal that a worker thread shouldn't hang around
211 * in the scheduler when it is out of work.
213 static rtsBool shutting_down_scheduler = rtsFalse;
215 void addToBlockedQueue ( StgTSO *tso );
217 static void schedule ( StgMainThread *mainThread, Capability *initialCapability );
218 void interruptStgRts ( void );
220 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
221 static void detectBlackHoles ( void );
224 #if defined(RTS_SUPPORTS_THREADS)
225 /* ToDo: carefully document the invariants that go together
226 * with these synchronisation objects.
228 Mutex sched_mutex = INIT_MUTEX_VAR;
229 Mutex term_mutex = INIT_MUTEX_VAR;
231 #endif /* RTS_SUPPORTS_THREADS */
235 rtsTime TimeOfLastYield;
236 rtsBool emitSchedule = rtsTrue;
240 static char *whatNext_strs[] = {
251 StgTSO * createSparkThread(rtsSpark spark);
252 StgTSO * activateSpark (rtsSpark spark);
255 /* ----------------------------------------------------------------------------
257 * ------------------------------------------------------------------------- */
259 #if defined(RTS_SUPPORTS_THREADS)
260 static rtsBool startingWorkerThread = rtsFalse;
262 static void taskStart(void);
266 ACQUIRE_LOCK(&sched_mutex);
267 startingWorkerThread = rtsFalse;
269 RELEASE_LOCK(&sched_mutex);
273 startSchedulerTaskIfNecessary(void)
275 if(run_queue_hd != END_TSO_QUEUE
276 || blocked_queue_hd != END_TSO_QUEUE
277 || sleeping_queue != END_TSO_QUEUE)
279 if(!startingWorkerThread)
280 { // we don't want to start another worker thread
281 // just because the last one hasn't yet reached the
282 // "waiting for capability" state
283 startingWorkerThread = rtsTrue;
284 if(!startTask(taskStart))
286 startingWorkerThread = rtsFalse;
293 /* ---------------------------------------------------------------------------
294 Main scheduling loop.
296 We use round-robin scheduling, each thread returning to the
297 scheduler loop when one of these conditions is detected:
300 * timer expires (thread yields)
305 Locking notes: we acquire the scheduler lock once at the beginning
306 of the scheduler loop, and release it when
308 * running a thread, or
309 * waiting for work, or
310 * waiting for a GC to complete.
In a GranSim setup this loop revolves around the global event
queue, which determines what to do next. It is therefore more
complicated than either the concurrent or the parallel (GUM) setup.
319 GUM iterates over incoming messages.
320 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
321 and sends out a fish whenever it has nothing to do; in-between
322 doing the actual reductions (shared code below) it processes the
323 incoming messages and deals with delayed operations
324 (see PendingFetches).
325 This is not the ugliest code you could imagine, but it's bloody close.
327 ------------------------------------------------------------------------ */
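
/* In outline, one iteration of the loop below does roughly the
 * following in the standard setup (a simplified sketch, not the exact
 * control flow; the GRAN and PAR variants differ as described above):
 *
 *   ACQUIRE_LOCK(&sched_mutex);
 *   for (;;) {
 *       // check for interruption, signals, deadlock, pending GC...
 *       POP_RUN_QUEUE(t);            // pick the next runnable thread
 *       RELEASE_LOCK(&sched_mutex);
 *       ret = StgRun(...);           // run it until it comes back
 *       ACQUIRE_LOCK(&sched_mutex);
 *       switch (ret) {
 *           case HeapOverflow:   ... // allocate a block or request GC
 *           case StackOverflow:  ... // enlarge the stack, requeue
 *           case ThreadYielding: ... // requeue at the back (round-robin)
 *           case ThreadBlocked:  ... // already on some blocked queue
 *           case ThreadFinished: ... // maybe hand a result to a main thread
 *       }
 *   }
 */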
329 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
330 Capability *initialCapability )
334 StgThreadReturnCode ret;
342 rtsBool receivedFinish = rtsFalse;
344 nat tp_size, sp_size; // stats only
347 rtsBool was_interrupted = rtsFalse;
350 // Pre-condition: sched_mutex is held.
351 // We might have a capability, passed in as initialCapability.
352 cap = initialCapability;
354 #if defined(RTS_SUPPORTS_THREADS)
356 // in the threaded case, the capability is either passed in via the
357 // initialCapability parameter, or initialized inside the scheduler
361 sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
362 mainThread, initialCapability);
365 // simply initialise it in the non-threaded case
366 grabCapability(&cap);
370 /* set up first event to get things going */
371 /* ToDo: assign costs for system setup and init MainTSO ! */
372 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
374 CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
377 debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
378 G_TSO(CurrentTSO, 5));
380 if (RtsFlags.GranFlags.Light) {
381 /* Save current time; GranSim Light only */
382 CurrentTSO->gran.clock = CurrentTime[CurrentProc];
385 event = get_next_event();
387 while (event!=(rtsEvent*)NULL) {
388 /* Choose the processor with the next event */
389 CurrentProc = event->proc;
390 CurrentTSO = event->tso;
394 while (!receivedFinish) { /* set by processMessages */
395 /* when receiving PP_FINISH message */
397 #else // everything except GRAN and PAR
403 IF_DEBUG(scheduler, printAllThreads());
405 #if defined(RTS_SUPPORTS_THREADS)
406 // Yield the capability to higher-priority tasks if necessary.
409 yieldCapability(&cap);
412 // If we do not currently hold a capability, we wait for one
415 waitForCapability(&sched_mutex, &cap,
416 mainThread ? &mainThread->bound_thread_cond : NULL);
419 // We now have a capability...
423 // If we're interrupted (the user pressed ^C, or some other
424 // termination condition occurred), kill all the currently running
428 IF_DEBUG(scheduler, sched_belch("interrupted"));
429 interrupted = rtsFalse;
430 was_interrupted = rtsTrue;
431 #if defined(RTS_SUPPORTS_THREADS)
432 // In the threaded RTS, deadlock detection doesn't work,
433 // so just exit right away.
434 errorBelch("interrupted");
435 releaseCapability(cap);
436 RELEASE_LOCK(&sched_mutex);
437 shutdownHaskellAndExit(EXIT_SUCCESS);
443 #if defined(RTS_USER_SIGNALS)
444 // check for signals each time around the scheduler
445 if (signals_pending()) {
446 RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
447 startSignalHandlers();
448 ACQUIRE_LOCK(&sched_mutex);
453 // Check whether any waiting threads need to be woken up. If the
454 // run queue is empty, and there are no other tasks running, we
455 // can wait indefinitely for something to happen.
457 if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
459 #if defined(RTS_SUPPORTS_THREADS)
460 // We shouldn't be here...
461 barf("schedule: awaitEvent() in threaded RTS");
463 awaitEvent( EMPTY_RUN_QUEUE() );
465 // we can be interrupted while waiting for I/O...
466 if (interrupted) continue;
469 * Detect deadlock: when we have no threads to run, there are no
470 * threads waiting on I/O or sleeping, and all the other tasks are
471 * waiting for work, we must have a deadlock of some description.
473 * We first try to find threads blocked on themselves (ie. black
474 * holes), and generate NonTermination exceptions where necessary.
476 * If no threads are black holed, we have a deadlock situation, so
477 * inform all the main threads.
479 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
480 if ( EMPTY_THREAD_QUEUES() )
482 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
484 // Garbage collection can release some new threads due to
485 // either (a) finalizers or (b) threads resurrected because
486 // they are unreachable and will therefore be sent an
487 // exception. Any threads thus released will be immediately
489 GarbageCollect(GetRoots,rtsTrue);
490 if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
492 #if defined(RTS_USER_SIGNALS)
493 /* If we have user-installed signal handlers, then wait
* for signals to arrive rather than bombing out with a
497 if ( anyUserHandlers() ) {
499 sched_belch("still deadlocked, waiting for signals..."));
503 // we might be interrupted...
504 if (interrupted) { continue; }
506 if (signals_pending()) {
507 RELEASE_LOCK(&sched_mutex);
508 startSignalHandlers();
509 ACQUIRE_LOCK(&sched_mutex);
511 ASSERT(!EMPTY_RUN_QUEUE());
516 /* Probably a real deadlock. Send the current main thread the
517 * Deadlock exception (or in the SMP build, send *all* main
518 * threads the deadlock exception, since none of them can make
524 switch (m->tso->why_blocked) {
525 case BlockedOnBlackHole:
526 case BlockedOnException:
528 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
531 barf("deadlock: main thread blocked in a strange way");
537 #elif defined(RTS_SUPPORTS_THREADS)
538 // ToDo: add deadlock detection in threaded RTS
540 // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
543 #if defined(RTS_SUPPORTS_THREADS)
544 if ( EMPTY_RUN_QUEUE() ) {
545 continue; // nothing to do
550 if (RtsFlags.GranFlags.Light)
551 GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
553 /* adjust time based on time-stamp */
554 if (event->time > CurrentTime[CurrentProc] &&
555 event->evttype != ContinueThread)
556 CurrentTime[CurrentProc] = event->time;
558 /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
559 if (!RtsFlags.GranFlags.Light)
562 IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
564 /* main event dispatcher in GranSim */
565 switch (event->evttype) {
566 /* Should just be continuing execution */
568 IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
569 /* ToDo: check assertion
570 ASSERT(run_queue_hd != (StgTSO*)NULL &&
571 run_queue_hd != END_TSO_QUEUE);
/* Ignore ContinueThreads for fetching threads (when using synchronous communication) */
574 if (!RtsFlags.GranFlags.DoAsyncFetch &&
575 procStatus[CurrentProc]==Fetching) {
576 debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
577 CurrentTSO->id, CurrentTSO, CurrentProc);
580 /* Ignore ContinueThreads for completed threads */
581 if (CurrentTSO->what_next == ThreadComplete) {
582 debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
583 CurrentTSO->id, CurrentTSO, CurrentProc);
586 /* Ignore ContinueThreads for threads that are being migrated */
587 if (PROCS(CurrentTSO)==Nowhere) {
588 debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
589 CurrentTSO->id, CurrentTSO, CurrentProc);
592 /* The thread should be at the beginning of the run queue */
593 if (CurrentTSO!=run_queue_hds[CurrentProc]) {
594 debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
595 CurrentTSO->id, CurrentTSO, CurrentProc);
596 break; // run the thread anyway
599 new_event(proc, proc, CurrentTime[proc],
601 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
603 */ /* Catches superfluous CONTINUEs -- should be unnecessary */
604 break; // now actually run the thread; DaH Qu'vam yImuHbej
607 do_the_fetchnode(event);
608 goto next_thread; /* handle next event in event queue */
611 do_the_globalblock(event);
612 goto next_thread; /* handle next event in event queue */
615 do_the_fetchreply(event);
616 goto next_thread; /* handle next event in event queue */
618 case UnblockThread: /* Move from the blocked queue to the tail of */
619 do_the_unblock(event);
620 goto next_thread; /* handle next event in event queue */
622 case ResumeThread: /* Move from the blocked queue to the tail of */
623 /* the runnable queue ( i.e. Qu' SImqa'lu') */
624 event->tso->gran.blocktime +=
625 CurrentTime[CurrentProc] - event->tso->gran.blockedat;
626 do_the_startthread(event);
627 goto next_thread; /* handle next event in event queue */
630 do_the_startthread(event);
631 goto next_thread; /* handle next event in event queue */
634 do_the_movethread(event);
635 goto next_thread; /* handle next event in event queue */
638 do_the_movespark(event);
639 goto next_thread; /* handle next event in event queue */
642 do_the_findwork(event);
643 goto next_thread; /* handle next event in event queue */
646 barf("Illegal event type %u\n", event->evttype);
649 /* This point was scheduler_loop in the old RTS */
651 IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
653 TimeOfLastEvent = CurrentTime[CurrentProc];
654 TimeOfNextEvent = get_time_of_next_event();
655 IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
656 // CurrentTSO = ThreadQueueHd;
658 IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
661 if (RtsFlags.GranFlags.Light)
662 GranSimLight_leave_system(event, &ActiveTSO);
664 EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
667 debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
669 /* in a GranSim setup the TSO stays on the run queue */
671 /* Take a thread from the run queue. */
672 POP_RUN_QUEUE(t); // take_off_run_queue(t);
675 debugBelch("GRAN: About to run current thread, which is\n");
678 context_switch = 0; // turned on via GranYield, checking events and time slice
681 DumpGranEvent(GR_SCHEDULE, t));
683 procStatus[CurrentProc] = Busy;
686 if (PendingFetches != END_BF_QUEUE) {
/* ToDo: perhaps merge with spark activation above */
691 /* check whether we have local work and send requests if we have none */
692 if (EMPTY_RUN_QUEUE()) { /* no runnable threads */
693 /* :-[ no local threads => look out for local sparks */
694 /* the spark pool for the current PE */
695 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
696 if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
697 pool->hd < pool->tl) {
699 * ToDo: add GC code check that we really have enough heap afterwards!!
701 * If we're here (no runnable threads) and we have pending
702 * sparks, we must have a space problem. Get enough space
703 * to turn one of those pending sparks into a
707 spark = findSpark(rtsFalse); /* get a spark */
708 if (spark != (rtsSpark) NULL) {
709 tso = activateSpark(spark); /* turn the spark into a thread */
710 IF_PAR_DEBUG(schedule,
711 debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
712 tso->id, tso, advisory_thread_count));
714 if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
715 debugBelch("==^^ failed to activate spark\n");
717 } /* otherwise fall through & pick-up new tso */
719 IF_PAR_DEBUG(verbose,
720 debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
721 spark_queue_len(pool)));
726 /* If we still have no work we need to send a FISH to get a spark
729 if (EMPTY_RUN_QUEUE()) {
730 /* =8-[ no local sparks => look for work on other PEs */
732 * We really have absolutely no work. Send out a fish
733 * (there may be some out there already), and wait for
734 * something to arrive. We clearly can't run any threads
735 * until a SCHEDULE or RESUME arrives, and so that's what
736 * we're hoping to see. (Of course, we still have to
737 * respond to other types of messages.)
739 TIME now = msTime() /*CURRENT_TIME*/;
740 IF_PAR_DEBUG(verbose,
741 debugBelch("-- now=%ld\n", now));
742 IF_PAR_DEBUG(verbose,
743 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
744 (last_fish_arrived_at!=0 &&
745 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
746 debugBelch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)\n",
747 last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
748 last_fish_arrived_at,
749 RtsFlags.ParFlags.fishDelay, now);
752 if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
753 (last_fish_arrived_at==0 ||
754 (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
755 /* outstandingFishes is set in sendFish, processFish;
756 avoid flooding system with fishes via delay */
758 sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY,
761 // Global statistics: count no. of fishes
762 if (RtsFlags.ParFlags.ParStats.Global &&
763 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
764 globalParStats.tot_fish_mess++;
768 receivedFinish = processMessages();
771 } else if (PacketsWaiting()) { /* Look for incoming messages */
772 receivedFinish = processMessages();
775 /* Now we are sure that we have some work available */
776 ASSERT(run_queue_hd != END_TSO_QUEUE);
778 /* Take a thread from the run queue, if we have work */
779 POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE);
780 IF_DEBUG(sanity,checkTSO(t));
782 /* ToDo: write something to the log-file
783 if (RTSflags.ParFlags.granSimStats && !sameThread)
784 DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
788 /* the spark pool for the current PE */
789 pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
792 debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
793 run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
796 if (0 && RtsFlags.ParFlags.ParStats.Full &&
797 t && LastTSO && t->id != LastTSO->id &&
798 LastTSO->why_blocked == NotBlocked &&
799 LastTSO->what_next != ThreadComplete) {
// if the previously scheduled TSO was not blocked, we have to record the context switch
801 DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
802 GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
805 if (RtsFlags.ParFlags.ParStats.Full &&
806 (emitSchedule /* forced emit */ ||
807 (t && LastTSO && t->id != LastTSO->id))) {
809 we are running a different TSO, so write a schedule event to log file
810 NB: If we use fair scheduling we also have to write a deschedule
811 event for LastTSO; with unfair scheduling we know that the
812 previous tso has blocked whenever we switch to another tso, so
813 we don't need it in GUM for now
815 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
816 GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
817 emitSchedule = rtsFalse;
821 #else /* !GRAN && !PAR */
823 // grab a thread from the run queue
824 ASSERT(run_queue_hd != END_TSO_QUEUE);
827 // Sanity check the thread we're about to run. This can be
828 // expensive if there is lots of thread switching going on...
829 IF_DEBUG(sanity,checkTSO(t));
834 StgMainThread *m = t->main;
841 sched_belch("### Running thread %d in bound thread", t->id));
842 // yes, the Haskell thread is bound to the current native thread
847 sched_belch("### thread %d bound to another OS thread", t->id));
848 // no, bound to a different Haskell thread: pass to that thread
849 PUSH_ON_RUN_QUEUE(t);
850 passCapability(&m->bound_thread_cond);
856 if(mainThread != NULL)
857 // The thread we want to run is bound.
860 sched_belch("### this OS thread cannot run thread %d", t->id));
861 // no, the current native thread is bound to a different
862 // Haskell thread, so pass it to any worker thread
863 PUSH_ON_RUN_QUEUE(t);
864 passCapabilityToWorker();
871 cap->r.rCurrentTSO = t;
873 /* context switches are now initiated by the timer signal, unless
874 * the user specified "context switch as often as possible", with
877 if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
878 && (run_queue_hd != END_TSO_QUEUE
879 || blocked_queue_hd != END_TSO_QUEUE
880 || sleeping_queue != END_TSO_QUEUE)))
885 RELEASE_LOCK(&sched_mutex);
887 IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
888 (long)t->id, whatNext_strs[t->what_next]));
891 startHeapProfTimer();
894 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
895 /* Run the current thread
897 prev_what_next = t->what_next;
899 errno = t->saved_errno;
901 switch (prev_what_next) {
905 /* Thread already finished, return to scheduler. */
906 ret = ThreadFinished;
910 ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
913 case ThreadInterpret:
914 ret = interpretBCO(cap);
918 barf("schedule: invalid what_next field");
921 // The TSO might have moved, so find the new location:
922 t = cap->r.rCurrentTSO;
924 // And save the current errno in this thread.
925 t->saved_errno = errno;
927 /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
929 /* Costs for the scheduler are assigned to CCS_SYSTEM */
935 ACQUIRE_LOCK(&sched_mutex);
937 #ifdef RTS_SUPPORTS_THREADS
938 IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
939 #elif !defined(GRAN) && !defined(PAR)
940 IF_DEBUG(scheduler,debugBelch("sched: "););
944 /* HACK 675: if the last thread didn't yield, make sure to print a
945 SCHEDULE event to the log file when StgRunning the next thread, even
946 if it is the same one as before */
948 TimeOfLastYield = CURRENT_TIME;
954 IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
955 globalGranStats.tot_heapover++;
957 globalParStats.tot_heapover++;
960 // did the task ask for a large block?
961 if (cap->r.rHpAlloc > BLOCK_SIZE) {
962 // if so, get one and push it on the front of the nursery.
966 blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
968 IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n",
969 (long)t->id, whatNext_strs[t->what_next], blocks));
971 // don't do this if it would push us over the
972 // alloc_blocks_lim limit; we'll GC first.
973 if (alloc_blocks + blocks < alloc_blocks_lim) {
975 alloc_blocks += blocks;
976 bd = allocGroup( blocks );
978 // link the new group into the list
979 bd->link = cap->r.rCurrentNursery;
980 bd->u.back = cap->r.rCurrentNursery->u.back;
981 if (cap->r.rCurrentNursery->u.back != NULL) {
982 cap->r.rCurrentNursery->u.back->link = bd;
984 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
985 g0s0->blocks == cap->r.rNursery);
986 cap->r.rNursery = g0s0->blocks = bd;
988 cap->r.rCurrentNursery->u.back = bd;
990 // initialise it as a nursery block. We initialise the
991 // step, gen_no, and flags field of *every* sub-block in
992 // this large block, because this is easier than making
993 // sure that we always find the block head of a large
994 // block whenever we call Bdescr() (eg. evacuate() and
995 // isAlive() in the GC would both have to do this, at
999 for (x = bd; x < bd + blocks; x++) {
1006 // don't forget to update the block count in g0s0.
1007 g0s0->n_blocks += blocks;
1008 // This assert can be a killer if the app is doing lots
1009 // of large block allocations.
1010 ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1012 // now update the nursery to point to the new block
1013 cap->r.rCurrentNursery = bd;
1015 // we might be unlucky and have another thread get on the
1016 // run queue before us and steal the large block, but in that
1017 // case the thread will just end up requesting another large
1019 PUSH_ON_RUN_QUEUE(t);
1024 /* make all the running tasks block on a condition variable,
1025 * maybe set context_switch and wait till they all pile in,
1026 * then have them wait on a GC condition variable.
1028 IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n",
1029 (long)t->id, whatNext_strs[t->what_next]));
1032 ASSERT(!is_on_queue(t,CurrentProc));
1034 /* Currently we emit a DESCHEDULE event before GC in GUM.
1035 ToDo: either add separate event to distinguish SYSTEM time from rest
1036 or just nuke this DESCHEDULE (and the following SCHEDULE) */
1037 if (0 && RtsFlags.ParFlags.ParStats.Full) {
1038 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1039 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1040 emitSchedule = rtsTrue;
1044 ready_to_gc = rtsTrue;
1045 context_switch = 1; /* stop other threads ASAP */
1046 PUSH_ON_RUN_QUEUE(t);
1047 /* actual GC is done at the end of the while loop */
1053 DumpGranEvent(GR_DESCHEDULE, t));
1054 globalGranStats.tot_stackover++;
1057 // DumpGranEvent(GR_DESCHEDULE, t);
1058 globalParStats.tot_stackover++;
1060 IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n",
1061 (long)t->id, whatNext_strs[t->what_next]));
1062 /* just adjust the stack for this thread, then pop it back
1067 /* enlarge the stack */
1068 StgTSO *new_t = threadStackOverflow(t);
1070 /* This TSO has moved, so update any pointers to it from the
1071 * main thread stack. It better not be on any other queues...
1072 * (it shouldn't be).
1074 if (t->main != NULL) {
1075 t->main->tso = new_t;
1077 PUSH_ON_RUN_QUEUE(new_t);
1081 case ThreadYielding:
1082 // Reset the context switch flag. We don't do this just before
1083 // running the thread, because that would mean we would lose ticks
1084 // during GC, which can lead to unfair scheduling (a thread hogs
1085 // the CPU because the tick always arrives during GC). This way
1086 // penalises threads that do a lot of allocation, but that seems
1087 // better than the alternative.
1092 DumpGranEvent(GR_DESCHEDULE, t));
1093 globalGranStats.tot_yields++;
1096 // DumpGranEvent(GR_DESCHEDULE, t);
1097 globalParStats.tot_yields++;
1099 /* put the thread back on the run queue. Then, if we're ready to
1100 * GC, check whether this is the last task to stop. If so, wake
1101 * up the GC thread. getThread will block during a GC until the
1105 if (t->what_next != prev_what_next) {
1106 debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n",
1107 (long)t->id, whatNext_strs[t->what_next]);
1109 debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1110 (long)t->id, whatNext_strs[t->what_next]);
1115 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1117 ASSERT(t->link == END_TSO_QUEUE);
1119 // Shortcut if we're just switching evaluators: don't bother
1120 // doing stack squeezing (which can be expensive), just run the
1122 if (t->what_next != prev_what_next) {
1129 ASSERT(!is_on_queue(t,CurrentProc));
1132 //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1133 checkThreadQsSanity(rtsTrue));
1137 if (RtsFlags.ParFlags.doFairScheduling) {
1138 /* this does round-robin scheduling; good for concurrency */
1139 APPEND_TO_RUN_QUEUE(t);
1141 /* this does unfair scheduling; good for parallelism */
1142 PUSH_ON_RUN_QUEUE(t);
1145 // this does round-robin scheduling; good for concurrency
1146 APPEND_TO_RUN_QUEUE(t);
1150 /* add a ContinueThread event to actually process the thread */
1151 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1153 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1155 debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1164 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
1165 t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1166 if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1168 // ??? needed; should emit block before
1170 DumpGranEvent(GR_DESCHEDULE, t));
1171 prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1174 ASSERT(procStatus[CurrentProc]==Busy ||
1175 ((procStatus[CurrentProc]==Fetching) &&
1176 (t->block_info.closure!=(StgClosure*)NULL)));
1177 if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1178 !(!RtsFlags.GranFlags.DoAsyncFetch &&
1179 procStatus[CurrentProc]==Fetching))
1180 procStatus[CurrentProc] = Idle;
1184 debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
1185 t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1188 if (t->block_info.closure!=(StgClosure*)NULL)
1189 print_bq(t->block_info.closure));
1191 /* Send a fetch (if BlockedOnGA) and dump event to log file */
1194 /* whatever we schedule next, we must log that schedule */
1195 emitSchedule = rtsTrue;
1198 /* don't need to do anything. Either the thread is blocked on
1199 * I/O, in which case we'll have called addToBlockedQueue
1200 * previously, or it's blocked on an MVar or Blackhole, in which
1201 * case it'll be on the relevant queue already.
1204 debugBelch("--<< thread %d (%s) stopped: ",
1205 t->id, whatNext_strs[t->what_next]);
1206 printThreadBlockage(t);
1209 /* Only for dumping event to log file
1210 ToDo: do I need this in GranSim, too?
1217 case ThreadFinished:
1218 /* Need to check whether this was a main thread, and if so, signal
1219 * the task that started it with the return value. If we have no
1220 * more main threads, we probably need to stop all the tasks until
1223 /* We also end up here if the thread kills itself with an
1224 * uncaught exception, see Exception.hc.
1226 IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n",
1227 t->id, whatNext_strs[t->what_next]));
1229 endThread(t, CurrentProc); // clean-up the thread
1231 /* For now all are advisory -- HWL */
1232 //if(t->priority==AdvisoryPriority) ??
1233 advisory_thread_count--;
1236 if(t->dist.priority==RevalPriority)
1240 if (RtsFlags.ParFlags.ParStats.Full &&
1241 !RtsFlags.ParFlags.ParStats.Suppressed)
1242 DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1246 // Check whether the thread that just completed was a main
1247 // thread, and if so return with the result.
1249 // There is an assumption here that all thread completion goes
1250 // through this point; we need to make sure that if a thread
1251 // ends up in the ThreadKilled state, that it stays on the run
1252 // queue so it can be dealt with here.
1255 #if defined(RTS_SUPPORTS_THREADS)
1258 mainThread->tso == t
1262 // We are a bound thread: this must be our thread that just
1264 ASSERT(mainThread->tso == t);
1266 if (t->what_next == ThreadComplete) {
1267 if (mainThread->ret) {
1268 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1269 *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1];
1271 mainThread->stat = Success;
1273 if (mainThread->ret) {
1274 *(mainThread->ret) = NULL;
1276 if (was_interrupted) {
1277 mainThread->stat = Interrupted;
1279 mainThread->stat = Killed;
1283 removeThreadLabel((StgWord)mainThread->tso->id);
1285 if (mainThread->prev == NULL) {
1286 main_threads = mainThread->link;
1288 mainThread->prev->link = mainThread->link;
1290 if (mainThread->link != NULL) {
mainThread->link->prev = mainThread->prev;
1293 releaseCapability(cap);
1297 #ifdef RTS_SUPPORTS_THREADS
1298 ASSERT(t->main == NULL);
1300 if (t->main != NULL) {
1301 // Must be a main thread that is not the topmost one. Leave
1302 // it on the run queue until the stack has unwound to the
1303 // point where we can deal with this. Leaving it on the run
1304 // queue also ensures that the garbage collector knows about
1305 // this thread and its return value (it gets dropped from the
1306 // all_threads list so there's no other way to find it).
1307 APPEND_TO_RUN_QUEUE(t);
1313 barf("schedule: invalid thread return code %d", (int)ret);
1317 // When we have +RTS -i0 and we're heap profiling, do a census at
1318 // every GC. This lets us get repeatable runs for debugging.
1319 if (performHeapProfile ||
1320 (RtsFlags.ProfFlags.profileInterval==0 &&
1321 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1322 GarbageCollect(GetRoots, rtsTrue);
1324 performHeapProfile = rtsFalse;
1325 ready_to_gc = rtsFalse; // we already GC'd
1330 /* Kick any transactions which are invalid back to their atomically frames.
* When next scheduled they will try to commit; the commit will fail and
1332 * they will retry. */
1333 for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
1334 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1335 if (!stmValidateTransaction (t -> trec)) {
1336 StgRetInfoTable *info;
1337 StgPtr sp = t -> sp;
1339 IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1341 if (sp[0] == (W_)&stg_enter_info) {
1345 sp[0] = (W_)&stg_dummy_ret_closure;
1348 // Look up the stack for its atomically frame
1351 info = get_ret_itbl((StgClosure *)frame);
1353 while (info->i.type != ATOMICALLY_FRAME &&
1354 info->i.type != STOP_FRAME &&
1355 info->i.type != UPDATE_FRAME) {
1356 if (info -> i.type == CATCH_RETRY_FRAME) {
1357 IF_DEBUG(stm, sched_belch("Aborting transaction in catch-retry frame"));
1358 stmAbortTransaction(t -> trec);
1359 t -> trec = stmGetEnclosingTRec(t -> trec);
1361 frame += stack_frame_sizeW((StgClosure *)frame);
1362 info = get_ret_itbl((StgClosure *)frame);
if (info -> i.type != ATOMICALLY_FRAME) {
1366 barf("Could not find ATOMICALLY_FRAME for unvalidatable thread");
1369 // Cause the thread to enter its atomically frame again when
1370 // scheduled -- this will attempt stmCommitTransaction or stmReWait
// which will fail, triggering re-execution.
1373 t->what_next = ThreadRunGHC;
1378 /* everybody back, start the GC.
1379 * Could do it in this thread, or signal a condition var
1380 * to do it in another thread. Either way, we need to
1381 * broadcast on gc_pending_cond afterward.
1383 #if defined(RTS_SUPPORTS_THREADS)
1384 IF_DEBUG(scheduler,sched_belch("doing GC"));
1386 GarbageCollect(GetRoots,rtsFalse);
1387 ready_to_gc = rtsFalse;
1389 /* add a ContinueThread event to continue execution of current thread */
1390 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1392 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1394 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1402 IF_GRAN_DEBUG(unused,
1403 print_eventq(EventHd));
1405 event = get_next_event();
1408 /* ToDo: wait for next message to arrive rather than busy wait */
1411 } /* end of while(1) */
1413 IF_PAR_DEBUG(verbose,
1414 debugBelch("== Leaving schedule() after having received Finish\n"));
1417 /* ---------------------------------------------------------------------------
1418 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
* Used by Control.Concurrent for error checking.
1420 * ------------------------------------------------------------------------- */
1423 rtsSupportsBoundThreads(void)
1432 /* ---------------------------------------------------------------------------
1433 * isThreadBound(tso): check whether tso is bound to an OS thread.
1434 * ------------------------------------------------------------------------- */
1437 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1440 return (tso->main != NULL);
1445 /* ---------------------------------------------------------------------------
1446 * Singleton fork(). Do not copy any running threads.
1447 * ------------------------------------------------------------------------- */
1449 #ifndef mingw32_TARGET_OS
1450 #define FORKPROCESS_PRIMOP_SUPPORTED
1453 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1455 deleteThreadImmediately(StgTSO *tso);
1458 forkProcess(HsStablePtr *entry
1459 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1464 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1470 IF_DEBUG(scheduler,sched_belch("forking!"));
1471 rts_lock(); // This not only acquires sched_mutex, it also
1472 // makes sure that no other threads are running
1476 if (pid) { /* parent */
1478 /* just return the pid */
1482 } else { /* child */
1485 // delete all threads
1486 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1488 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1491 // don't allow threads to catch the ThreadKilled exception
1492 deleteThreadImmediately(t);
1495 // wipe the main thread list
1496 while((m = main_threads) != NULL) {
1497 main_threads = m->link;
1498 # ifdef THREADED_RTS
1499 closeCondition(&m->bound_thread_cond);
1504 rc = rts_evalStableIO(entry, NULL); // run the action
1505 rts_checkSchedStatus("forkProcess",rc);
1509 hs_exit(); // clean up and exit
1512 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1513 barf("forkProcess#: primop not supported, sorry!\n");
1518 /* ---------------------------------------------------------------------------
1519 * deleteAllThreads(): kill all the live threads.
1521 * This is used when we catch a user interrupt (^C), before performing
1522 * any necessary cleanups and running finalizers.
1524 * Locks: sched_mutex held.
1525 * ------------------------------------------------------------------------- */
1528 deleteAllThreads ( void )
1531 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1532 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1533 next = t->global_link;
1537 // The run queue now contains a bunch of ThreadKilled threads. We
1538 // must not throw these away: the main thread(s) will be in there
1539 // somewhere, and the main scheduler loop has to deal with it.
1540 // Also, the run queue is the only thing keeping these threads from
1541 // being GC'd, and we don't want the "main thread has been GC'd" panic.
1543 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1544 ASSERT(sleeping_queue == END_TSO_QUEUE);
1547 /* startThread and insertThread are now in GranSim.c -- HWL */
1550 /* ---------------------------------------------------------------------------
1551 * Suspending & resuming Haskell threads.
1553 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1554 * its capability before calling the C function. This allows another
1555 * task to pick up the capability and carry on running Haskell
1556 * threads. It also means that if the C call blocks, it won't lock
1559 * The Haskell thread making the C call is put to sleep for the
* duration of the call, on the suspended_ccalling_threads queue. We
1561 * give out a token to the task, which it can use to resume the thread
1562 * on return from the C function.
1563 * ------------------------------------------------------------------------- */
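
/* A sketch of the protocol from the task's point of view (the real
 * call sites live in compiler-generated foreign-call stubs;
 * blocking_c_fn is a hypothetical C function):
 *
 *   tok = suspendThread(reg);    // release the capability, get a token
 *   r   = blocking_c_fn(arg);    // other tasks may run Haskell meanwhile
 *   reg = resumeThread(tok);     // reacquire a capability, resume the TSO
 */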
1566 suspendThread( StgRegTable *reg )
1570 int saved_errno = errno;
1572 /* assume that *reg is a pointer to the StgRegTable part
1575 cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1577 ACQUIRE_LOCK(&sched_mutex);
1580 sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1582 // XXX this might not be necessary --SDM
1583 cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1585 threadPaused(cap->r.rCurrentTSO);
1586 cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1587 suspended_ccalling_threads = cap->r.rCurrentTSO;
1589 if(cap->r.rCurrentTSO->blocked_exceptions == NULL) {
1590 cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1591 cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1593 cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1596 /* Use the thread ID as the token; it should be unique */
1597 tok = cap->r.rCurrentTSO->id;
1599 /* Hand back capability */
1600 releaseCapability(cap);
1602 #if defined(RTS_SUPPORTS_THREADS)
1603 /* Preparing to leave the RTS, so ensure there's a native thread/task
1604 waiting to take over.
1606 IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1609 RELEASE_LOCK(&sched_mutex);
1611 errno = saved_errno;
1616 resumeThread( StgInt tok )
1618 StgTSO *tso, **prev;
1620 int saved_errno = errno;
1622 #if defined(RTS_SUPPORTS_THREADS)
1623 /* Wait for permission to re-enter the RTS with the result. */
1624 ACQUIRE_LOCK(&sched_mutex);
1625 waitForReturnCapability(&sched_mutex, &cap);
1627 IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1629 grabCapability(&cap);
/* Remove the thread from the suspended list */
1633 prev = &suspended_ccalling_threads;
1634 for (tso = suspended_ccalling_threads;
1635 tso != END_TSO_QUEUE;
1636 prev = &tso->link, tso = tso->link) {
1637 if (tso->id == (StgThreadID)tok) {
1642 if (tso == END_TSO_QUEUE) {
1643 barf("resumeThread: thread not found");
1645 tso->link = END_TSO_QUEUE;
1647 if(tso->why_blocked == BlockedOnCCall) {
1648 awakenBlockedQueueNoLock(tso->blocked_exceptions);
1649 tso->blocked_exceptions = NULL;
1652 /* Reset blocking status */
1653 tso->why_blocked = NotBlocked;
1655 cap->r.rCurrentTSO = tso;
1656 RELEASE_LOCK(&sched_mutex);
1657 errno = saved_errno;
1662 /* ---------------------------------------------------------------------------
1664 * ------------------------------------------------------------------------ */
1665 static void unblockThread(StgTSO *tso);
1667 /* ---------------------------------------------------------------------------
1668 * Comparing Thread ids.
1670 * This is used from STG land in the implementation of the
1671 * instances of Eq/Ord for ThreadIds.
1672 * ------------------------------------------------------------------------ */
1675 cmp_thread(StgPtr tso1, StgPtr tso2)
1677 StgThreadID id1 = ((StgTSO *)tso1)->id;
1678 StgThreadID id2 = ((StgTSO *)tso2)->id;
1680 if (id1 < id2) return (-1);
1681 if (id1 > id2) return 1;
1685 /* ---------------------------------------------------------------------------
1686 * Fetching the ThreadID from an StgTSO.
1688 * This is used in the implementation of Show for ThreadIds.
1689 * ------------------------------------------------------------------------ */
1691 rts_getThreadId(StgPtr tso)
1693 return ((StgTSO *)tso)->id;
1698 labelThread(StgPtr tso, char *label)
1703 /* Caveat: Once set, you can only set the thread name to "" */
1704 len = strlen(label)+1;
1705 buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1706 strncpy(buf,label,len);
1707 /* Update will free the old memory for us */
1708 updateThreadLabel(((StgTSO *)tso)->id,buf);
1712 /* ---------------------------------------------------------------------------
1713 Create a new thread.
1715 The new thread starts with the given stack size. Before the
1716 scheduler can run, however, this thread needs to have a closure
1717 (and possibly some arguments) pushed on its stack. See
1718 pushClosure() in Schedule.h.
1720 createGenThread() and createIOThread() (in SchedAPI.h) are
1721 convenient packaged versions of this function.
1723 currently pri (priority) is only used in a GRAN setup -- HWL
1724 ------------------------------------------------------------------------ */
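
/* A minimal usage sketch (assuming the SchedAPI.h wrappers mentioned
 * above; hello_closure stands in for a closure supplied by the caller):
 *
 *   StgTSO *tso = createIOThread(RtsFlags.GcFlags.initialStkSize,
 *                                hello_closure);
 *   scheduleThread(tso);   // append it to the run queue
 */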
1726 /* currently pri (priority) is only used in a GRAN setup -- HWL */
1728 createThread(nat size, StgInt pri)
1731 createThread(nat size)
1738 /* First check whether we should create a thread at all */
1740 /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1741 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1743 debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
1744 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1745 return END_TSO_QUEUE;
1751 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1754 // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1756 /* catch ridiculously small stack sizes */
1757 if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1758 size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1761 stack_size = size - TSO_STRUCT_SIZEW;
1763 tso = (StgTSO *)allocate(size);
1764 TICK_ALLOC_TSO(stack_size, 0);
1766 SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1768 SET_GRAN_HDR(tso, ThisPE);
1771 // Always start with the compiled code evaluator
1772 tso->what_next = ThreadRunGHC;
1774 tso->id = next_thread_id++;
1775 tso->why_blocked = NotBlocked;
1776 tso->blocked_exceptions = NULL;
1778 tso->saved_errno = 0;
1781 tso->stack_size = stack_size;
1782 tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
1784 tso->sp = (P_)&(tso->stack) + stack_size;
1786 tso->trec = NO_TREC;
1789 tso->prof.CCCS = CCS_MAIN;
1792 /* put a stop frame on the stack */
1793 tso->sp -= sizeofW(StgStopFrame);
1794 SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1795 tso->link = END_TSO_QUEUE;
1799 /* uses more flexible routine in GranSim */
1800 insertThread(tso, CurrentProc);
1802 /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1808 if (RtsFlags.GranFlags.GranSimStats.Full)
1809 DumpGranEvent(GR_START,tso);
1811 if (RtsFlags.ParFlags.ParStats.Full)
1812 DumpGranEvent(GR_STARTQ,tso);
/* HACK to avoid SCHEDULE
1817 /* Link the new thread on the global thread list.
1819 tso->global_link = all_threads;
1823 tso->dist.priority = MandatoryPriority; //by default that is...
1827 tso->gran.pri = pri;
1829 tso->gran.magic = TSO_MAGIC; // debugging only
1831 tso->gran.sparkname = 0;
1832 tso->gran.startedat = CURRENT_TIME;
1833 tso->gran.exported = 0;
1834 tso->gran.basicblocks = 0;
1835 tso->gran.allocs = 0;
1836 tso->gran.exectime = 0;
1837 tso->gran.fetchtime = 0;
1838 tso->gran.fetchcount = 0;
1839 tso->gran.blocktime = 0;
1840 tso->gran.blockcount = 0;
1841 tso->gran.blockedat = 0;
1842 tso->gran.globalsparks = 0;
1843 tso->gran.localsparks = 0;
1844 if (RtsFlags.GranFlags.Light)
1845 tso->gran.clock = Now; /* local clock */
1847 tso->gran.clock = 0;
1849 IF_DEBUG(gran,printTSO(tso));
1852 tso->par.magic = TSO_MAGIC; // debugging only
1854 tso->par.sparkname = 0;
1855 tso->par.startedat = CURRENT_TIME;
1856 tso->par.exported = 0;
1857 tso->par.basicblocks = 0;
1858 tso->par.allocs = 0;
1859 tso->par.exectime = 0;
1860 tso->par.fetchtime = 0;
1861 tso->par.fetchcount = 0;
1862 tso->par.blocktime = 0;
1863 tso->par.blockcount = 0;
1864 tso->par.blockedat = 0;
1865 tso->par.globalsparks = 0;
1866 tso->par.localsparks = 0;
1870 globalGranStats.tot_threads_created++;
1871 globalGranStats.threads_created_on_PE[CurrentProc]++;
1872 globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1873 globalGranStats.tot_sq_probes++;
1875 // collect parallel global statistics (currently done together with GC stats)
1876 if (RtsFlags.ParFlags.ParStats.Global &&
1877 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1878 //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime());
1879 globalParStats.tot_threads_created++;
sched_belch("==__ schedule: Created TSO %d (%p) [PE %d];",
tso->id, tso, CurrentProc));
1888 IF_PAR_DEBUG(verbose,
sched_belch("==__ schedule: Created TSO %ld (%p); %d threads active",
1890 (long)tso->id, tso, advisory_thread_count));
1892 IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
1893 (long)tso->id, (long)tso->stack_size));
All parallel thread creation calls should go through the following routine.
1903 createSparkThread(rtsSpark spark)
1905 ASSERT(spark != (rtsSpark)NULL);
1906 if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads)
1908 barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1909 RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1910 return END_TSO_QUEUE;
1914 tso = createThread(RtsFlags.GcFlags.initialStkSize);
1915 if (tso==END_TSO_QUEUE)
1916 barf("createSparkThread: Cannot create TSO");
1918 tso->priority = AdvisoryPriority;
1920 pushClosure(tso,spark);
1921 PUSH_ON_RUN_QUEUE(tso);
1922 advisory_thread_count++;
1929 Turn a spark into a thread.
1930 ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1934 activateSpark (rtsSpark spark)
1938 tso = createSparkThread(spark);
1939 if (RtsFlags.ParFlags.ParStats.Full) {
1940 //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1941 IF_PAR_DEBUG(verbose,
1942 debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
1943 (StgClosure *)spark, info_type((StgClosure *)spark)));
1945 // ToDo: fwd info on local/global spark to thread -- HWL
1946 // tso->gran.exported = spark->exported;
1947 // tso->gran.locked = !spark->global;
1948 // tso->gran.sparkname = spark->name;
1954 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1955 Capability *initialCapability
1959 /* ---------------------------------------------------------------------------
* scheduleThread puts a thread at the end of the runnable queue.
1963 * This will usually be done immediately after a thread is created.
1964 * The caller of scheduleThread must create the thread using e.g.
1965 * createThread and push an appropriate closure
1966 * on this thread's stack before the scheduler is invoked.
1967 * ------------------------------------------------------------------------ */
1969 static void scheduleThread_ (StgTSO* tso);
1972 scheduleThread_(StgTSO *tso)
1974 // The thread goes at the *end* of the run-queue, to avoid possible
1975 // starvation of any threads already on the queue.
1976 APPEND_TO_RUN_QUEUE(tso);
1981 scheduleThread(StgTSO* tso)
1983 ACQUIRE_LOCK(&sched_mutex);
1984 scheduleThread_(tso);
1985 RELEASE_LOCK(&sched_mutex);
1988 #if defined(RTS_SUPPORTS_THREADS)
1989 static Condition bound_cond_cache;
1990 static int bound_cond_cache_full = 0;
1995 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1996 Capability *initialCapability)
1998 // Precondition: sched_mutex must be held
2001 m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2006 m->link = main_threads;
2008 if (main_threads != NULL) {
2009 main_threads->prev = m;
2013 #if defined(RTS_SUPPORTS_THREADS)
2014 // Allocating a new condition for each thread is expensive, so we
2015 // cache one. This is a pretty feeble hack, but it helps speed up
2016 // consecutive call-ins quite a bit.
2017 if (bound_cond_cache_full) {
2018 m->bound_thread_cond = bound_cond_cache;
2019 bound_cond_cache_full = 0;
2021 initCondition(&m->bound_thread_cond);
2025 /* Put the thread on the main-threads list prior to scheduling the TSO.
2026 Failure to do so introduces a race condition in the MT case (as
2027 identified by Wolfgang Thaller), whereby the new task/OS thread
created by scheduleThread_() could complete before the thread that
spawned it managed to put itself on the main-threads list. The
upshot of it all was that the worker thread wouldn't get to signal
the completion of its work item for the main thread to see
(==> it got stuck waiting). -- sof 6/02.
2034 IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2036 APPEND_TO_RUN_QUEUE(tso);
2037 // NB. Don't call threadRunnable() here, because the thread is
2038 // bound and only runnable by *this* OS thread, so waking up other
2039 // workers will just slow things down.
2041 return waitThread_(m, initialCapability);
2044 /* ---------------------------------------------------------------------------
2047 * Initialise the scheduler. This resets all the queues - if the
2048 * queues contained any threads, they'll be garbage collected at the
2051 * ------------------------------------------------------------------------ */
for (i=0; i<MAX_PROC; i++) {  /* the queue arrays are sized [MAX_PROC] */
2060 run_queue_hds[i] = END_TSO_QUEUE;
2061 run_queue_tls[i] = END_TSO_QUEUE;
2062 blocked_queue_hds[i] = END_TSO_QUEUE;
2063 blocked_queue_tls[i] = END_TSO_QUEUE;
2064 ccalling_threadss[i] = END_TSO_QUEUE;
2065 sleeping_queue = END_TSO_QUEUE;
2068 run_queue_hd = END_TSO_QUEUE;
2069 run_queue_tl = END_TSO_QUEUE;
2070 blocked_queue_hd = END_TSO_QUEUE;
2071 blocked_queue_tl = END_TSO_QUEUE;
2072 sleeping_queue = END_TSO_QUEUE;
2075 suspended_ccalling_threads = END_TSO_QUEUE;
2077 main_threads = NULL;
2078 all_threads = END_TSO_QUEUE;
2083 RtsFlags.ConcFlags.ctxtSwitchTicks =
2084 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2086 #if defined(RTS_SUPPORTS_THREADS)
2087 /* Initialise the mutex and condition variables used by
2089 initMutex(&sched_mutex);
2090 initMutex(&term_mutex);
2093 ACQUIRE_LOCK(&sched_mutex);
2095 /* A capability holds the state a native thread needs in
2096 * order to execute STG code. At least one capability is
2097 * floating around (only SMP builds have more than one).
2101 #if defined(RTS_SUPPORTS_THREADS)
/* start our Haskell execution tasks */
2103 startTaskManager(0,taskStart);
2106 #if /* defined(SMP) ||*/ defined(PAR)
2110 RELEASE_LOCK(&sched_mutex);
2114 exitScheduler( void )
2116 #if defined(RTS_SUPPORTS_THREADS)
2119 shutting_down_scheduler = rtsTrue;
2122 /* ----------------------------------------------------------------------------
2123 Managing the per-task allocation areas.
2125 Each capability comes with an allocation area. These are
2126 fixed-length block lists into which allocation can be done.
2128 ToDo: no support for two-space collection at the moment???
2129 ------------------------------------------------------------------------- */
2133 waitThread_(StgMainThread* m, Capability *initialCapability)
2135 SchedulerStatus stat;
2137 // Precondition: sched_mutex must be held.
2138 IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2141 /* GranSim specific init */
2142 CurrentTSO = m->tso; // the TSO to run
2143 procStatus[MainProc] = Busy; // status of main PE
2144 CurrentProc = MainProc; // PE to run it on
2145 schedule(m,initialCapability);
2147 schedule(m,initialCapability);
2148 ASSERT(m->stat != NoStatus);
2153 #if defined(RTS_SUPPORTS_THREADS)
2154 // Free the condition variable, returning it to the cache if possible.
2155 if (!bound_cond_cache_full) {
2156 bound_cond_cache = m->bound_thread_cond;
2157 bound_cond_cache_full = 1;
2159 closeCondition(&m->bound_thread_cond);
2163 IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2166 // Postcondition: sched_mutex still held
2170 /* ---------------------------------------------------------------------------
2171 Where are the roots that we know about?
2173 - all the threads on the runnable queue
2174 - all the threads on the blocked queue
2175 - all the threads on the sleeping queue
2176 - all the threads currently executing a _ccall_GC
2177 - all the "main threads"
2179 ------------------------------------------------------------------------ */
2181 /* This has to be protected either by the scheduler monitor, or by the
2182 garbage collection monitor (probably the latter).
2187 GetRoots( evac_fn evac )
2192 for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2193 if (run_queue_hds[i] != END_TSO_QUEUE && run_queue_hds[i] != NULL)
2194 evac((StgClosure **)&run_queue_hds[i]);
2195 if (run_queue_tls[i] != END_TSO_QUEUE && run_queue_tls[i] != NULL)
2196 evac((StgClosure **)&run_queue_tls[i]);
2198 if (blocked_queue_hds[i] != END_TSO_QUEUE && blocked_queue_hds[i] != NULL)
2199 evac((StgClosure **)&blocked_queue_hds[i]);
2200 if (blocked_queue_tls[i] != END_TSO_QUEUE && blocked_queue_tls[i] != NULL)
2201 evac((StgClosure **)&blocked_queue_tls[i]);
2202 if (ccalling_threadss[i] != END_TSO_QUEUE && ccalling_threadss[i] != NULL)
2203 evac((StgClosure **)&ccalling_threadss[i]);
2210 if (run_queue_hd != END_TSO_QUEUE) {
2211 ASSERT(run_queue_tl != END_TSO_QUEUE);
2212 evac((StgClosure **)&run_queue_hd);
2213 evac((StgClosure **)&run_queue_tl);
2216 if (blocked_queue_hd != END_TSO_QUEUE) {
2217 ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2218 evac((StgClosure **)&blocked_queue_hd);
2219 evac((StgClosure **)&blocked_queue_tl);
2222 if (sleeping_queue != END_TSO_QUEUE) {
2223 evac((StgClosure **)&sleeping_queue);
2227 if (suspended_ccalling_threads != END_TSO_QUEUE) {
2228 evac((StgClosure **)&suspended_ccalling_threads);
2231 #if defined(PAR) || defined(GRAN)
2232 markSparkQueue(evac);
2235 #if defined(RTS_USER_SIGNALS)
2236 // mark the signal handlers (signals should be already blocked)
2237 markSignalHandlers(evac);
2241 /* -----------------------------------------------------------------------------
2244 This is the interface to the garbage collector from Haskell land.
2245 We provide this so that external C code can allocate and garbage
2246 collect when called from Haskell via _ccall_GC.
2248 It might be useful to provide an interface whereby the programmer
2249 can specify more roots (ToDo).
2251 This needs to be protected by the GC condition variable above. KH.
2252 -------------------------------------------------------------------------- */
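/* An illustrative use of the extra-roots hook (the names my_root and
 * my_get_roots are hypothetical): foreign code that has stashed a heap
 * pointer can keep it alive, and up to date, across a collection.
 */
#if 0
static StgClosure *my_root = NULL;   /* stashed by earlier foreign code */

static void
my_get_roots (evac_fn evac)
{
    if (my_root != NULL) {
        evac(&my_root);   /* evacuates the closure and updates my_root */
    }
}

/* ...then, at the GC point, instead of plain performGC(): */
/* performGCWithRoots(my_get_roots); */
#endif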
2254 static void (*extra_roots)(evac_fn);
2259 /* We grab sched_mutex ourselves here, so the caller must not already hold it */
2260 ACQUIRE_LOCK(&sched_mutex);
2261 GarbageCollect(GetRoots,rtsFalse);
2262 RELEASE_LOCK(&sched_mutex);
2266 performMajorGC(void)
2268 ACQUIRE_LOCK(&sched_mutex);
2269 GarbageCollect(GetRoots,rtsTrue);
2270 RELEASE_LOCK(&sched_mutex);
2274 AllRoots(evac_fn evac)
2276 GetRoots(evac); // the scheduler's roots
2277 extra_roots(evac); // the user's roots
2281 performGCWithRoots(void (*get_roots)(evac_fn))
2283 ACQUIRE_LOCK(&sched_mutex);
2284 extra_roots = get_roots;
2285 GarbageCollect(AllRoots,rtsFalse);
2286 RELEASE_LOCK(&sched_mutex);
2289 /* -----------------------------------------------------------------------------
2292 If the thread has reached its maximum stack size, then raise the
2293 StackOverflow exception in the offending thread. Otherwise
2294 relocate the TSO into a larger chunk of memory and adjust its stack size appropriately.
2296 -------------------------------------------------------------------------- */
2299 threadStackOverflow(StgTSO *tso)
2301 nat new_stack_size, new_tso_size, stack_words;
2305 IF_DEBUG(sanity,checkTSO(tso));
2306 if (tso->stack_size >= tso->max_stack_size) {
2309 debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2310 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2311 /* If we're debugging, just print out the top of the stack */
2312 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2315 /* Send this thread the StackOverflow exception */
2316 raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2320 /* Try to double the current stack size. If that takes us over the
2321 * maximum stack size for this thread, then use the maximum instead.
2322 * Finally round up so the TSO ends up as a whole number of blocks.
2324 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2325 new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2326 TSO_STRUCT_SIZE)/sizeof(W_);
2327 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2328 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
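/* Worked example (illustrative figures only; assumes 4-byte words and
 * 4k blocks so BLOCK_SIZE_W == 1024, a TSO header well under a block,
 * and ignores the round_to_mblocks adjustment): a thread with
 * stack_size == 3000 words and max_stack_size == 80000 doubles to
 * new_stack_size == 6000 words.  6000 words plus the header is a
 * little over 24000 bytes, which BLOCK_ROUND_UP pads to 6 blocks ==
 * 6144 words, so the thread ends up with 6144 - TSO_STRUCT_SIZEW
 * words of usable stack.
 */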
2330 IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2332 dest = (StgTSO *)allocate(new_tso_size);
2333 TICK_ALLOC_TSO(new_stack_size,0);
2335 /* copy the TSO block and the old stack into the new area */
2336 memcpy(dest,tso,TSO_STRUCT_SIZE);
2337 stack_words = tso->stack + tso->stack_size - tso->sp;
2338 new_sp = (P_)dest + new_tso_size - stack_words;
2339 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2341 /* relocate the stack pointers... */
2343 dest->stack_size = new_stack_size;
2345 /* Mark the old TSO as relocated. We have to check for relocated
2346 * TSOs in the garbage collector and any primops that deal with TSOs.
2348 * It's important to set the sp value to just beyond the end
2349 * of the stack, so we don't attempt to scavenge any part of the old stack.
2352 tso->what_next = ThreadRelocated;
2354 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2355 tso->why_blocked = NotBlocked;
2356 dest->mut_link = NULL;
2358 IF_PAR_DEBUG(verbose,
2359 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2360 tso->id, tso, tso->stack_size);
2361 /* If we're debugging, just print out the top of the stack */
2362 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2365 IF_DEBUG(sanity,checkTSO(tso));
2367 IF_DEBUG(scheduler,printTSO(dest));
2373 /* ---------------------------------------------------------------------------
2374 Wake up a queue that was blocked on some resource.
2375 ------------------------------------------------------------------------ */
2379 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2384 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2386 /* write RESUME events to log file and
2387 update blocked and fetch time (depending on type of the orig closure) */
2388 if (RtsFlags.ParFlags.ParStats.Full) {
2389 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
2390 GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2391 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2392 if (EMPTY_RUN_QUEUE())
2393 emitSchedule = rtsTrue;
2395 switch (get_itbl(node)->type) {
2397 ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2402 ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2409 barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2416 static StgBlockingQueueElement *
2417 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2420 PEs node_loc, tso_loc;
2422 node_loc = where_is(node); // should be lifted out of loop
2423 tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2424 tso_loc = where_is((StgClosure *)tso);
2425 if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2426 /* unless this is a fake fetch, 'TSO is on CurrentProc' coincides with IS_LOCAL_TO */
2427 ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2428 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2429 // insertThread(tso, node_loc);
2430 new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2432 tso, node, (rtsSpark*)NULL);
2433 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2436 } else { // TSO is remote (actually should be FMBQ)
2437 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2438 RtsFlags.GranFlags.Costs.gunblocktime +
2439 RtsFlags.GranFlags.Costs.latency;
2440 new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2442 tso, node, (rtsSpark*)NULL);
2443 tso->link = END_TSO_QUEUE; // overwrite link just to be sure
2446 /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2448 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2449 (node_loc==tso_loc ? "Local" : "Global"),
2450 tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2451 tso->block_info.closure = NULL;
2452 IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n",
2456 static StgBlockingQueueElement *
2457 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2459 StgBlockingQueueElement *next;
2461 switch (get_itbl(bqe)->type) {
2463 ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2464 /* if it's a TSO just push it onto the run_queue */
2466 ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2467 APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
2469 unblockCount(bqe, node);
2470 /* reset blocking status after dumping event */
2471 ((StgTSO *)bqe)->why_blocked = NotBlocked;
2475 /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2477 bqe->link = (StgBlockingQueueElement *)PendingFetches;
2478 PendingFetches = (StgBlockedFetch *)bqe;
2482 /* can ignore this case in a non-debugging setup;
2483 see comments on RBHSave closures above */
2485 /* check that the closure is an RBHSave closure */
2486 ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2487 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2488 get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2492 barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2493 get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
2497 IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
2501 #else /* !GRAN && !PAR */
2503 unblockOneLocked(StgTSO *tso)
2507 ASSERT(get_itbl(tso)->type == TSO);
2508 ASSERT(tso->why_blocked != NotBlocked);
2509 tso->why_blocked = NotBlocked;
2511 tso->link = END_TSO_QUEUE;
2512 APPEND_TO_RUN_QUEUE(tso);
2514 IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
2519 #if defined(GRAN) || defined(PAR)
2520 INLINE_ME StgBlockingQueueElement *
2521 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2523 ACQUIRE_LOCK(&sched_mutex);
2524 bqe = unblockOneLocked(bqe, node);
2525 RELEASE_LOCK(&sched_mutex);
2530 unblockOne(StgTSO *tso)
2532 ACQUIRE_LOCK(&sched_mutex);
2533 tso = unblockOneLocked(tso);
2534 RELEASE_LOCK(&sched_mutex);
2541 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2543 StgBlockingQueueElement *bqe;
2548 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
2549 node, CurrentProc, CurrentTime[CurrentProc],
2550 CurrentTSO->id, CurrentTSO));
2552 node_loc = where_is(node);
2554 ASSERT(q == END_BQ_QUEUE ||
2555 get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
2556 get_itbl(q)->type == CONSTR); // closure (type constructor)
2557 ASSERT(is_unique(node));
2559 /* FAKE FETCH: magically copy the node to the tso's proc;
2560 no Fetch necessary because in reality the node should not have been
2561 moved to the other PE in the first place
2563 if (CurrentProc!=node_loc) {
2565 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
2566 node, node_loc, CurrentProc, CurrentTSO->id,
2567 // CurrentTSO, where_is(CurrentTSO),
2568 node->header.gran.procs));
2569 node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2571 debugBelch("## new bitmask of node %p is %#x\n",
2572 node, node->header.gran.procs));
2573 if (RtsFlags.GranFlags.GranSimStats.Global) {
2574 globalGranStats.tot_fake_fetches++;
2579 // ToDo: check: ASSERT(CurrentProc==node_loc);
2580 while (get_itbl(bqe)->type==TSO) { // walk the queue while its elements are TSOs
2583 bqe points to the current element in the queue
2584 next points to the next element in the queue
2586 //tso = (StgTSO *)bqe; // wastes an assignment to get the type right
2587 //tso_loc = where_is(tso);
2589 bqe = unblockOneLocked(bqe, node);
2592 /* if this is the BQ of an RBH, we have to put back the info ripped out of
2593 the closure to make room for the anchor of the BQ */
2594 if (bqe!=END_BQ_QUEUE) {
2595 ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2597 ASSERT((info_ptr==&stg_RBH_Save_0_info) ||
2598 (info_ptr==&stg_RBH_Save_1_info) ||
2599 (info_ptr==&stg_RBH_Save_2_info));
2601 /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2602 ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2603 ((StgRBH *)node)->mut_link = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2606 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
2607 node, info_type(node)));
2610 /* statistics gathering */
2611 if (RtsFlags.GranFlags.GranSimStats.Global) {
2612 // globalGranStats.tot_bq_processing_time += bq_processing_time;
2613 globalGranStats.tot_bq_len += len; // total length of all bqs awakened
2614 // globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
2615 globalGranStats.tot_awbq++; // total no. of bqs awakened
2618 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
2619 node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2623 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2625 StgBlockingQueueElement *bqe;
2627 ACQUIRE_LOCK(&sched_mutex);
2629 IF_PAR_DEBUG(verbose,
2630 debugBelch("##-_ AwBQ for node %p on [%x]: \n",
2634 if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2635 IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so let's just return. RFP (BUG?)\n"));
2640 ASSERT(q == END_BQ_QUEUE ||
2641 get_itbl(q)->type == TSO ||
2642 get_itbl(q)->type == BLOCKED_FETCH ||
2643 get_itbl(q)->type == CONSTR);
2646 while (get_itbl(bqe)->type==TSO ||
2647 get_itbl(bqe)->type==BLOCKED_FETCH) {
2648 bqe = unblockOneLocked(bqe, node);
2650 RELEASE_LOCK(&sched_mutex);
2653 #else /* !GRAN && !PAR */
2656 awakenBlockedQueueNoLock(StgTSO *tso)
2658 while (tso != END_TSO_QUEUE) {
2659 tso = unblockOneLocked(tso);
2664 awakenBlockedQueue(StgTSO *tso)
2666 ACQUIRE_LOCK(&sched_mutex);
2667 while (tso != END_TSO_QUEUE) {
2668 tso = unblockOneLocked(tso);
2670 RELEASE_LOCK(&sched_mutex);
2674 /* ---------------------------------------------------------------------------
2676 - usually called inside a signal handler so it mustn't do anything fancy.
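- in practice it does little more than set a flag or two (interrupted,
context_switch) for the scheduler loop to notice: no locking, no allocation.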
2677 ------------------------------------------------------------------------ */
2680 interruptStgRts(void)
2686 /* -----------------------------------------------------------------------------
2689 This is for use when we raise an exception in another thread, which may be blocked.
2691 This has nothing to do with the UnblockThread event in GranSim. -- HWL
2692 -------------------------------------------------------------------------- */
2694 #if defined(GRAN) || defined(PAR)
2696 NB: only the type of the blocking queue is different in GranSim and GUM;
2697 the operations on the queue-elements are the same.
2698 Long live polymorphism!
2700 Locks: sched_mutex is held upon entry and exit.
2704 unblockThread(StgTSO *tso)
2706 StgBlockingQueueElement *t, **last;
2708 switch (tso->why_blocked) {
2711 return; /* not blocked */
2714 // Be careful: nothing to do here! We tell the scheduler that the thread
2715 // is runnable and we leave it to the stack-walking code to abort the
2716 // transaction while unwinding the stack. We should perhaps have a debugging
2717 // test to make sure that this really happens and that the 'zombie' transaction
2718 // does not get committed.
2722 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2724 StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2725 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2727 last = (StgBlockingQueueElement **)&mvar->head;
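/* Classic pointer-to-pointer deletion: 'last' always names the link
field that currently points at 't', so unlinking 't' is just
'*last = t->link', with no special case for the head of the queue;
only the MVar's tail pointer (via last_tso) needs patching separately. */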
2728 for (t = (StgBlockingQueueElement *)mvar->head;
2730 last = &t->link, last_tso = t, t = t->link) {
2731 if (t == (StgBlockingQueueElement *)tso) {
2732 *last = (StgBlockingQueueElement *)tso->link;
2733 if (mvar->tail == tso) {
2734 mvar->tail = (StgTSO *)last_tso;
2739 barf("unblockThread (MVAR): TSO not found");
2742 case BlockedOnBlackHole:
2743 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2745 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2747 last = &bq->blocking_queue;
2748 for (t = bq->blocking_queue;
2750 last = &t->link, t = t->link) {
2751 if (t == (StgBlockingQueueElement *)tso) {
2752 *last = (StgBlockingQueueElement *)tso->link;
2756 barf("unblockThread (BLACKHOLE): TSO not found");
2759 case BlockedOnException:
2761 StgTSO *target = tso->block_info.tso;
2763 ASSERT(get_itbl(target)->type == TSO);
2765 if (target->what_next == ThreadRelocated) {
2766 target = target->link;
2767 ASSERT(get_itbl(target)->type == TSO);
2770 ASSERT(target->blocked_exceptions != NULL);
2772 last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2773 for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
2775 last = &t->link, t = t->link) {
2776 ASSERT(get_itbl(t)->type == TSO);
2777 if (t == (StgBlockingQueueElement *)tso) {
2778 *last = (StgBlockingQueueElement *)tso->link;
2782 barf("unblockThread (Exception): TSO not found");
2786 case BlockedOnWrite:
2787 #if defined(mingw32_TARGET_OS)
2788 case BlockedOnDoProc:
2791 /* take TSO off blocked_queue */
2792 StgBlockingQueueElement *prev = NULL;
2793 for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
2794 prev = t, t = t->link) {
2795 if (t == (StgBlockingQueueElement *)tso) {
2797 blocked_queue_hd = (StgTSO *)t->link;
2798 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2799 blocked_queue_tl = END_TSO_QUEUE;
2802 prev->link = t->link;
2803 if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2804 blocked_queue_tl = (StgTSO *)prev;
2810 barf("unblockThread (I/O): TSO not found");
2813 case BlockedOnDelay:
2815 /* take TSO off sleeping_queue */
2816 StgBlockingQueueElement *prev = NULL;
2817 for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
2818 prev = t, t = t->link) {
2819 if (t == (StgBlockingQueueElement *)tso) {
2821 sleeping_queue = (StgTSO *)t->link;
2823 prev->link = t->link;
2828 barf("unblockThread (delay): TSO not found");
2832 barf("unblockThread");
2836 tso->link = END_TSO_QUEUE;
2837 tso->why_blocked = NotBlocked;
2838 tso->block_info.closure = NULL;
2839 PUSH_ON_RUN_QUEUE(tso);
2843 unblockThread(StgTSO *tso)
2847 /* To avoid locking unnecessarily. */
2848 if (tso->why_blocked == NotBlocked) {
2852 switch (tso->why_blocked) {
2855 // Be careful: nothing to do here! We tell the scheduler that the thread
2856 // is runnable and we leave it to the stack-walking code to abort the
2857 // transaction while unwinding the stack. We should perhaps have a debugging
2858 // test to make sure that this really happens and that the 'zombie' transaction
2859 // does not get committed.
2863 ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2865 StgTSO *last_tso = END_TSO_QUEUE;
2866 StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2869 for (t = mvar->head; t != END_TSO_QUEUE;
2870 last = &t->link, last_tso = t, t = t->link) {
2873 if (mvar->tail == tso) {
2874 mvar->tail = last_tso;
2879 barf("unblockThread (MVAR): TSO not found");
2882 case BlockedOnBlackHole:
2883 ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2885 StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2887 last = &bq->blocking_queue;
2888 for (t = bq->blocking_queue; t != END_TSO_QUEUE;
2889 last = &t->link, t = t->link) {
2895 barf("unblockThread (BLACKHOLE): TSO not found");
2898 case BlockedOnException:
2900 StgTSO *target = tso->block_info.tso;
2902 ASSERT(get_itbl(target)->type == TSO);
2904 while (target->what_next == ThreadRelocated) {
2905 target = target->link;
2906 ASSERT(get_itbl(target)->type == TSO);
2909 ASSERT(target->blocked_exceptions != NULL);
2911 last = &target->blocked_exceptions;
2912 for (t = target->blocked_exceptions; t != END_TSO_QUEUE;
2913 last = &t->link, t = t->link) {
2914 ASSERT(get_itbl(t)->type == TSO);
2920 barf("unblockThread (Exception): TSO not found");
2924 case BlockedOnWrite:
2925 #if defined(mingw32_TARGET_OS)
2926 case BlockedOnDoProc:
2929 StgTSO *prev = NULL;
2930 for (t = blocked_queue_hd; t != END_TSO_QUEUE;
2931 prev = t, t = t->link) {
2934 blocked_queue_hd = t->link;
2935 if (blocked_queue_tl == t) {
2936 blocked_queue_tl = END_TSO_QUEUE;
2939 prev->link = t->link;
2940 if (blocked_queue_tl == t) {
2941 blocked_queue_tl = prev;
2947 barf("unblockThread (I/O): TSO not found");
2950 case BlockedOnDelay:
2952 StgTSO *prev = NULL;
2953 for (t = sleeping_queue; t != END_TSO_QUEUE;
2954 prev = t, t = t->link) {
2957 sleeping_queue = t->link;
2959 prev->link = t->link;
2964 barf("unblockThread (delay): TSO not found");
2968 barf("unblockThread");
2972 tso->link = END_TSO_QUEUE;
2973 tso->why_blocked = NotBlocked;
2974 tso->block_info.closure = NULL;
2975 APPEND_TO_RUN_QUEUE(tso);
2979 /* -----------------------------------------------------------------------------
2982 * The following function implements the magic for raising an
2983 * asynchronous exception in an existing thread.
2985 * We first remove the thread from any queue on which it might be
2986 * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
2988 * We strip the stack down to the innermost CATCH_FRAME, building
2989 * thunks in the heap for all the active computations, so they can
2990 * be restarted if necessary. When we reach a CATCH_FRAME, we build
2991 * an application of the handler to the exception, and push it on
2992 * the top of the stack.
2994 * How exactly do we save all the active computations? We create an
2995 * AP_STACK for every UpdateFrame on the stack. Entering one of these
2996 * AP_STACKs pushes everything from the corresponding update frame
2997 * upwards onto the stack. (Actually, it pushes everything up to the
2998 * next update frame plus a pointer to the next AP_STACK object.
2999 * Entering the next AP_STACK object pushes more onto the stack until we
3000 * reach the last AP_STACK object - at which point the stack should look
3001 * exactly as it did when we killed the TSO and we can continue
3002 * execution by entering the closure on top of the stack.)
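*
* An illustrative picture (one update frame, one catch frame):
*
*   before:  Sp -> [ chunk A | UPDATE_FRAME u | chunk B | CATCH_FRAME h | ... ]
*   after:   Sp -> [ raise(exn) | CATCH_FRAME h | ... ]
*
* where u's updatee has been overwritten with an indirection to
* AP_STACK(chunk A), so the suspended computation can be restarted
* later if it is ever demanded.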
3004 * We can also kill a thread entirely - this happens if either (a) the
3005 * exception passed to raiseAsync is NULL, or (b) there's no
3006 * CATCH_FRAME on the stack. In either case, we strip the entire
3007 * stack and replace the thread with a zombie.
3009 * Locks: sched_mutex is held upon entry and exit.
3011 * -------------------------------------------------------------------------- */
3014 deleteThread(StgTSO *tso)
3016 raiseAsync(tso,NULL);
3019 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3021 deleteThreadImmediately(StgTSO *tso)
3022 { // for forkProcess only:
3023 // delete thread without giving it a chance to catch the KillThread exception
3025 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3029 if (tso->why_blocked != BlockedOnCCall &&
3030 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3034 tso->what_next = ThreadKilled;
3039 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3041 /* When raising async exceptions from contexts where sched_mutex isn't held,
3042 use raiseAsyncWithLock(). */
3043 ACQUIRE_LOCK(&sched_mutex);
3044 raiseAsync(tso,exception);
3045 RELEASE_LOCK(&sched_mutex);
3049 raiseAsync(StgTSO *tso, StgClosure *exception)
3051 StgRetInfoTable *info;
3054 // Thread already dead?
3055 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3060 sched_belch("raising exception in thread %ld.", (long)tso->id));
3062 // Remove it from any blocking queues
3067 // The stack freezing code assumes there's a closure pointer on
3068 // the top of the stack, so we have to arrange that this is the case...
3070 if (sp[0] == (W_)&stg_enter_info) {
3074 sp[0] = (W_)&stg_dummy_ret_closure;
3080 // 1. Let the top of the stack be the "current closure"
3082 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3085 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3086 // current closure applied to the chunk of stack up to (but not
3087 // including) the update frame. This closure becomes the "current
3088 // closure". Go back to step 2.
3090 // 4. If it's a CATCH_FRAME, then leave the exception handler on
3091 // top of the stack applied to the exception.
3093 // 5. If it's a STOP_FRAME, then kill the thread.
3095 // NB: if we pass an ATOMICALLY_FRAME then abort the associated transaction.
3102 info = get_ret_itbl((StgClosure *)frame);
3104 while (info->i.type != UPDATE_FRAME
3105 && (info->i.type != CATCH_FRAME || exception == NULL)
3106 && info->i.type != STOP_FRAME) {
3107 if (info->i.type == ATOMICALLY_FRAME) {
3108 // IF we find an ATOMICALLY_FRAME then we abort the
3109 // current transaction and propagate the exception. In
3110 // this case (unlike ordinary exceptions) we do not care
3111 // whether the transaction is valid or not because its
3112 // possible validity cannot have caused the exception
3113 // and will not be visible after the abort.
3115 debugBelch("Found atomically block delivering async exception\n"));
3116 stmAbortTransaction(tso -> trec);
3117 tso -> trec = stmGetEnclosingTRec(tso -> trec);
3119 frame += stack_frame_sizeW((StgClosure *)frame);
3120 info = get_ret_itbl((StgClosure *)frame);
3123 switch (info->i.type) {
3126 // If we find a CATCH_FRAME, and we've got an exception to raise,
3127 // then build the THUNK raise(exception), and leave it on
3128 // top of the CATCH_FRAME ready to enter.
3132 StgCatchFrame *cf = (StgCatchFrame *)frame;
3136 // we've got an exception to raise, so let's pass it to the
3137 // handler in this frame.
3139 raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3140 TICK_ALLOC_SE_THK(1,0);
3141 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3142 raise->payload[0] = exception;
3144 // throw away the stack from Sp up to the CATCH_FRAME.
3148 /* Ensure that async exceptions are blocked now, so we don't get
3149 * a surprise exception before we get around to executing the
3152 if (tso->blocked_exceptions == NULL) {
3153 tso->blocked_exceptions = END_TSO_QUEUE;
3156 /* Put the newly-built THUNK on top of the stack, ready to execute
3157 * when the thread restarts.
3160 sp[-1] = (W_)&stg_enter_info;
3162 tso->what_next = ThreadRunGHC;
3163 IF_DEBUG(sanity, checkTSO(tso));
3172 // First build an AP_STACK consisting of the stack chunk above the
3173 // current update frame, with the top word on the stack as the fun field.
3176 words = frame - sp - 1;
3177 ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3180 ap->fun = (StgClosure *)sp[0];
3182 for(i=0; i < (nat)words; ++i) {
3183 ap->payload[i] = (StgClosure *)*sp++;
3186 SET_HDR(ap,&stg_AP_STACK_info,
3187 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
3188 TICK_ALLOC_UP_THK(words+1,0);
3191 debugBelch("sched: Updating ");
3192 printPtr((P_)((StgUpdateFrame *)frame)->updatee);
3193 debugBelch(" with ");
3194 printObj((StgClosure *)ap);
3197 // Replace the updatee with an indirection - happily
3198 // this will also wake up any threads currently
3199 // waiting on the result.
3201 // Warning: if we're in a loop, more than one update frame on
3202 // the stack may point to the same object. Be careful not to
3203 // overwrite an IND_OLDGEN in this case, because we'll screw
3204 // up the mutable lists. To be on the safe side, don't
3205 // overwrite any kind of indirection at all. See also
3206 // threadSqueezeStack in GC.c, where we have to make a similar check.
3209 if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3210 // revert the black hole
3211 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3214 sp += sizeofW(StgUpdateFrame) - 1;
3215 sp[0] = (W_)ap; // push onto stack
3220 // We've stripped the entire stack, the thread is now dead.
3221 sp += sizeofW(StgStopFrame);
3222 tso->what_next = ThreadKilled;
3233 /* -----------------------------------------------------------------------------
3234 raiseExceptionHelper
3236 This function is called by the raise# primitive, just so that we can
3237 move some of the tricky bits of raising an exception from C-- into
3238 C. Who knows, it might be a useful reusable thing here too.
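(The C-- caller is then expected to dispatch on the returned frame
type: enter the handler for a CATCH_FRAME, or abort the enclosing
transaction for an ATOMICALLY_FRAME; cf. Exception.cmm.)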
3239 -------------------------------------------------------------------------- */
3242 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3244 StgClosure *raise_closure = NULL;
3246 StgRetInfoTable *info;
3248 // This closure represents the expression 'raise# E' where E
3249 // is the exception to raise. It is used to overwrite all the
3250 // thunks which are currently under evaluation.
3254 // LDV profiling: stg_raise_info has THUNK as its closure
3255 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3256 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
3257 // 1 does not cause any problem unless profiling is performed.
3258 // However, when LDV profiling is on, we need to linearly scan the
3259 // small object pool, where raise_closure is stored, so we should
3260 // use MIN_UPD_SIZE.
3262 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3263 // sizeofW(StgClosure)+1);
3267 // Walk up the stack, looking for the catch frame. On the way,
3268 // we update any closures pointed to from update frames with the
3269 // raise closure that we just built.
3273 info = get_ret_itbl((StgClosure *)p);
3274 next = p + stack_frame_sizeW((StgClosure *)p);
3275 switch (info->i.type) {
3278 // Only create raise_closure if we need to.
3279 if (raise_closure == NULL) {
3281 (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3282 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3283 raise_closure->payload[0] = exception;
3285 UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3289 case ATOMICALLY_FRAME:
3290 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3292 return ATOMICALLY_FRAME;
3298 case CATCH_STM_FRAME:
3299 IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3301 return CATCH_STM_FRAME;
3307 case CATCH_RETRY_FRAME:
3316 /* -----------------------------------------------------------------------------
3317 findRetryFrameHelper
3319 This function is called by the retry# primitive. It traverses the stack
3320 leaving tso->sp referring to the frame which should handle the retry.
3322 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3323 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
3325 We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3326 despite the similar implementation.
3328 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3329 not be created within memory transactions.
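An illustrative stack, for a retry# that occurs inside an orElse#
(sketch only):

    Sp -> ...
          CATCH_RETRY_FRAME   <- tso->sp is left here: try the alternative
          ...
          ATOMICALLY_FRAME    <- found instead when no orElse# intervenes

Any CATCH_STM_FRAMEs passed on the way up are skipped, as noted above.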
3330 -------------------------------------------------------------------------- */
3333 findRetryFrameHelper (StgTSO *tso)
3336 StgRetInfoTable *info;
3340 info = get_ret_itbl((StgClosure *)p);
3341 next = p + stack_frame_sizeW((StgClosure *)p);
3342 switch (info->i.type) {
3344 case ATOMICALLY_FRAME:
3345 IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retry\n", p));
3347 return ATOMICALLY_FRAME;
3349 case CATCH_RETRY_FRAME:
3350 IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retry\n", p));
3352 return CATCH_RETRY_FRAME;
3354 case CATCH_STM_FRAME:
3356 ASSERT(info->i.type != CATCH_FRAME);
3357 ASSERT(info->i.type != STOP_FRAME);
3364 /* -----------------------------------------------------------------------------
3365 resurrectThreads is called after garbage collection on the list of
3366 threads found to be garbage. Each of these threads will be woken
3367 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3368 on an MVar, or NonTermination if the thread was blocked on a Black
3371 Locks: sched_mutex isn't held upon entry or exit.
3372 -------------------------------------------------------------------------- */
3375 resurrectThreads( StgTSO *threads )
3379 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3380 next = tso->global_link;
3381 tso->global_link = all_threads;
3383 IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3385 switch (tso->why_blocked) {
3387 case BlockedOnException:
3388 /* Called by GC - sched_mutex lock is currently held. */
3389 raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3391 case BlockedOnBlackHole:
3392 raiseAsync(tso,(StgClosure *)NonTermination_closure);
3395 raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
3398 /* This might happen if the thread was blocked on a black hole
3399 * belonging to a thread that we've just woken up (raiseAsync
3400 * can wake up threads, remember...).
3404 barf("resurrectThreads: thread blocked in a strange way");
3409 /* ----------------------------------------------------------------------------
3410 * Debugging: why is a thread blocked
3411 * [Also provides useful information when debugging threaded programs
3412 * at the Haskell source code level, so it is enabled outside of DEBUG. --sof 7/02]
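* (These printers are also convenient to invoke directly from a debugger,
* e.g. "call printAllThreads()" at a gdb prompt.)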
3413 ------------------------------------------------------------------------- */
3417 printThreadBlockage(StgTSO *tso)
3419 switch (tso->why_blocked) {
3421 debugBelch("is blocked on read from fd %d", tso->block_info.fd);
3423 case BlockedOnWrite:
3424 debugBelch("is blocked on write to fd %d", tso->block_info.fd);
3426 #if defined(mingw32_TARGET_OS)
3427 case BlockedOnDoProc:
3428 debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3431 case BlockedOnDelay:
3432 debugBelch("is blocked until %d", tso->block_info.target);
3435 debugBelch("is blocked on an MVar");
3437 case BlockedOnException:
3438 debugBelch("is blocked on delivering an exception to thread %d",
3439 tso->block_info.tso->id);
3441 case BlockedOnBlackHole:
3442 debugBelch("is blocked on a black hole");
3445 debugBelch("is not blocked");
3449 debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
3450 tso->block_info.closure, info_type(tso->block_info.closure));
3452 case BlockedOnGA_NoSend:
3453 debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
3454 tso->block_info.closure, info_type(tso->block_info.closure));
3457 case BlockedOnCCall:
3458 debugBelch("is blocked on an external call");
3460 case BlockedOnCCall_NoUnblockExc:
3461 debugBelch("is blocked on an external call (exceptions were already blocked)");
3464 debugBelch("is blocked on an STM operation");
3467 barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
3468 tso->why_blocked, tso->id, tso);
3474 printThreadStatus(StgTSO *tso)
3476 switch (tso->what_next) {
3478 debugBelch("has been killed");
3480 case ThreadComplete:
3481 debugBelch("has completed");
3484 printThreadBlockage(tso);
3489 printAllThreads(void)
3494 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3495 ullong_format_string(TIME_ON_PROC(CurrentProc),
3496 time_string, rtsFalse/*no commas!*/);
3498 debugBelch("all threads at [%s]:\n", time_string);
3500 char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3501 ullong_format_string(CURRENT_TIME,
3502 time_string, rtsFalse/*no commas!*/);
3504 debugBelch("all threads at [%s]:\n", time_string);
3506 debugBelch("all threads:\n");
3509 for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3510 debugBelch("\tthread %d @ %p ", t->id, (void *)t);
3513 void *label = lookupThreadLabel(t->id);
3514 if (label) debugBelch("[\"%s\"] ",(char *)label);
3517 printThreadStatus(t);
3525 Print a whole blocking queue attached to node (debugging only).
3529 print_bq (StgClosure *node)
3531 StgBlockingQueueElement *bqe;
3535 debugBelch("## BQ of closure %p (%s): ",
3536 node, info_type(node));
3538 /* should cover all closures that may have a blocking queue */
3539 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3540 get_itbl(node)->type == FETCH_ME_BQ ||
3541 get_itbl(node)->type == RBH ||
3542 get_itbl(node)->type == MVAR);
3544 ASSERT(node!=(StgClosure*)NULL); // sanity check
3546 print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3550 Print a whole blocking queue starting with the element bqe.
3553 print_bqe (StgBlockingQueueElement *bqe)
3558 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3560 for (end = (bqe==END_BQ_QUEUE);
3561 !end; // iterate until bqe points to a CONSTR
3562 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE),
3563 bqe = end ? END_BQ_QUEUE : bqe->link) {
3564 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3565 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3566 /* types of closures that may appear in a blocking queue */
3567 ASSERT(get_itbl(bqe)->type == TSO ||
3568 get_itbl(bqe)->type == BLOCKED_FETCH ||
3569 get_itbl(bqe)->type == CONSTR);
3570 /* only BQs of an RBH end with an RBH_Save closure */
3571 //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3573 switch (get_itbl(bqe)->type) {
3575 debugBelch(" TSO %u (%p),",
3576 ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3579 debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
3580 ((StgBlockedFetch *)bqe)->node,
3581 ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3582 ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3583 ((StgBlockedFetch *)bqe)->ga.weight);
3586 debugBelch(" %s (IP %p),",
3587 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3588 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3589 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3590 "RBH_Save_?"), get_itbl(bqe));
3593 barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3594 info_type((StgClosure *)bqe)); // , node, info_type(node));
3600 # elif defined(GRAN)
3602 print_bq (StgClosure *node)
3604 StgBlockingQueueElement *bqe;
3605 PEs node_loc, tso_loc;
3608 /* should cover all closures that may have a blocking queue */
3609 ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3610 get_itbl(node)->type == FETCH_ME_BQ ||
3611 get_itbl(node)->type == RBH);
3613 ASSERT(node!=(StgClosure*)NULL); // sanity check
3614 node_loc = where_is(node);
3616 debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
3617 node, info_type(node), node_loc);
3620 NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3622 for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3623 !end; // iterate until bqe points to a CONSTR
3624 end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3625 ASSERT(bqe != END_BQ_QUEUE); // sanity check
3626 ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check
3627 /* types of closures that may appear in a blocking queue */
3628 ASSERT(get_itbl(bqe)->type == TSO ||
3629 get_itbl(bqe)->type == CONSTR);
3630 /* only BQs of an RBH end with an RBH_Save closure */
3631 ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3633 tso_loc = where_is((StgClosure *)bqe);
3634 switch (get_itbl(bqe)->type) {
3636 debugBelch(" TSO %d (%p) on [PE %d],",
3637 ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3640 debugBelch(" %s (IP %p),",
3641 (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3642 get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3643 get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3644 "RBH_Save_?"), get_itbl(bqe));
3647 barf("Unexpected closure type %s in blocking queue of %p (%s)",
3648 info_type((StgClosure *)bqe), node, info_type(node));
3656 Nice and easy: only TSOs on the blocking queue
3659 print_bq (StgClosure *node)
3663 ASSERT(node!=(StgClosure*)NULL); // sanity check
3664 for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3665 tso != END_TSO_QUEUE;
3667 ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
3668 ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
3669 debugBelch(" TSO %d (%p),", tso->id, tso);
3682 for (i=0, tso=run_queue_hd;
3683 tso != END_TSO_QUEUE;
3692 sched_belch(char *s, ...)
3696 #ifdef RTS_SUPPORTS_THREADS
3697 debugBelch("sched (task %p): ", osThreadId());
3701 debugBelch("sched: ");