5cef10c2b9836c1999ee14df8c4da4ba38b9fe95
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.133 2002/03/12 11:51:06 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #include <stdarg.h>
118
119 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
120 //@subsection Variables and Data structures
121
122 /* Main thread queue.
123  * Locks required: sched_mutex.
124  */
125 StgMainThread *main_threads;
126
127 /* Thread queues.
128  * Locks required: sched_mutex.
129  */
130 #if defined(GRAN)
131
132 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
133 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
134
135 /* 
136    In GranSim we have a runnable and a blocked queue for each processor.
137    In order to minimise code changes new arrays run_queue_hds/tls
138    are created. run_queue_hd is then a short cut (macro) for
139    run_queue_hds[CurrentProc] (see GranSim.h).
140    -- HWL
141 */
142 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
143 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
144 StgTSO *ccalling_threadss[MAX_PROC];
145 /* We use the same global list of threads (all_threads) in GranSim as in
146    the std RTS (i.e. we are cheating). However, we don't use this list in
147    the GranSim specific code at the moment (so we are only potentially
148    cheating).  */
149
150 #else /* !GRAN */
151
152 StgTSO *run_queue_hd, *run_queue_tl;
153 StgTSO *blocked_queue_hd, *blocked_queue_tl;
154 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
155
156 #endif
157
158 /* Linked list of all threads.
159  * Used for detecting garbage collected threads.
160  */
161 StgTSO *all_threads;
162
163 /* When a thread performs a safe C call (_ccall_GC, using old
164  * terminology), it gets put on the suspended_ccalling_threads
165  * list. Used by the garbage collector.
166  */
167 static StgTSO *suspended_ccalling_threads;
168
169 static StgTSO *threadStackOverflow(StgTSO *tso);
170
171 /* KH: The following two flags are shared memory locations.  There is no need
172        to lock them, since they are only unset at the end of a scheduler
173        operation.
174 */
175
176 /* flag set by signal handler to precipitate a context switch */
177 //@cindex context_switch
178 nat context_switch;
179
180 /* if this flag is set as well, give up execution */
181 //@cindex interrupted
182 rtsBool interrupted;
183
184 /* Next thread ID to allocate.
185  * Locks required: sched_mutex
186  */
187 //@cindex next_thread_id
188 StgThreadID next_thread_id = 1;
189
190 /*
191  * Pointers to the state of the current thread.
192  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
193  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
194  */
195  
196 /* The smallest stack size that makes any sense is:
197  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
198  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
199  *  + 1                       (the realworld token for an IO thread)
200  *  + 1                       (the closure to enter)
201  *
202  * A thread with this stack will bomb immediately with a stack
203  * overflow, which will increase its stack size.  
204  */
205
206 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
207
208
209 #if defined(GRAN)
210 StgTSO *CurrentTSO;
211 #endif
212
213 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
214  *  exists - earlier gccs apparently didn't.
215  *  -= chak
216  */
217 StgTSO dummy_tso;
218
219 rtsBool ready_to_gc;
220
221 void            addToBlockedQueue ( StgTSO *tso );
222
223 static void     schedule          ( void );
224        void     interruptStgRts   ( void );
225 #if defined(GRAN)
226 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
227 #else
228 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
229 #endif
230
231 static void     detectBlackHoles  ( void );
232
233 #ifdef DEBUG
234 static void sched_belch(char *s, ...);
235 #endif
236
237 #if defined(RTS_SUPPORTS_THREADS)
238 /* ToDo: carefully document the invariants that go together
239  *       with these synchronisation objects.
240  */
241 Mutex     sched_mutex       = INIT_MUTEX_VAR;
242 Mutex     term_mutex        = INIT_MUTEX_VAR;
243
244 # if defined(SMP)
245 static Condition gc_pending_cond = INIT_COND_VAR;
246 nat await_death;
247 # endif
248
249 #endif /* RTS_SUPPORTS_THREADS */
250
251 #if defined(PAR)
252 StgTSO *LastTSO;
253 rtsTime TimeOfLastYield;
254 rtsBool emitSchedule = rtsTrue;
255 #endif
256
257 #if DEBUG
258 char *whatNext_strs[] = {
259   "ThreadEnterGHC",
260   "ThreadRunGHC",
261   "ThreadEnterInterp",
262   "ThreadKilled",
263   "ThreadComplete"
264 };
265
266 char *threadReturnCode_strs[] = {
267   "HeapOverflow",                       /* might also be StackOverflow */
268   "StackOverflow",
269   "ThreadYielding",
270   "ThreadBlocked",
271   "ThreadFinished"
272 };
273 #endif
274
275 #if defined(PAR)
276 StgTSO * createSparkThread(rtsSpark spark);
277 StgTSO * activateSpark (rtsSpark spark);  
278 #endif
279
280 /*
281  * The thread state for the main thread.
282 // ToDo: check whether not needed any more
283 StgTSO   *MainTSO;
284  */
285
286 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
287 static void taskStart(void);
288 static void
289 taskStart(void)
290 {
291   schedule();
292 }
293 #endif
294
295
296
297
298 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
299 //@subsection Main scheduling loop
300
301 /* ---------------------------------------------------------------------------
302    Main scheduling loop.
303
304    We use round-robin scheduling, each thread returning to the
305    scheduler loop when one of these conditions is detected:
306
307       * out of heap space
308       * timer expires (thread yields)
309       * thread blocks
310       * thread ends
311       * stack overflow
312
313    Locking notes:  we acquire the scheduler lock once at the beginning
314    of the scheduler loop, and release it when
315     
316       * running a thread, or
317       * waiting for work, or
318       * waiting for a GC to complete.
319
320    GRAN version:
321      In a GranSim setup this loop iterates over the global event queue.
322      This revolves around the global event queue, which determines what 
323      to do next. Therefore, it's more complicated than either the 
324      concurrent or the parallel (GUM) setup.
325
326    GUM version:
327      GUM iterates over incoming messages.
328      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
329      and sends out a fish whenever it has nothing to do; in-between
330      doing the actual reductions (shared code below) it processes the
331      incoming messages and deals with delayed operations 
332      (see PendingFetches).
333      This is not the ugliest code you could imagine, but it's bloody close.
334
335    ------------------------------------------------------------------------ */
336 //@cindex schedule
337 static void
338 schedule( void )
339 {
340   StgTSO *t;
341   Capability *cap;
342   StgThreadReturnCode ret;
343 #if defined(GRAN)
344   rtsEvent *event;
345 #elif defined(PAR)
346   StgSparkPool *pool;
347   rtsSpark spark;
348   StgTSO *tso;
349   GlobalTaskId pe;
350   rtsBool receivedFinish = rtsFalse;
351 # if defined(DEBUG)
352   nat tp_size, sp_size; // stats only
353 # endif
354 #endif
355   rtsBool was_interrupted = rtsFalse;
356   
357   ACQUIRE_LOCK(&sched_mutex);
358  
359 #if defined(RTS_SUPPORTS_THREADS)
360   /* Check to see whether there are any worker threads
361      waiting to deposit external call results. If so,
362      yield our capability */
363   yieldToReturningWorker(&sched_mutex, cap);
364
365   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
366 #endif
367
368 #if defined(GRAN)
369   /* set up first event to get things going */
370   /* ToDo: assign costs for system setup and init MainTSO ! */
371   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
372             ContinueThread, 
373             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
374
375   IF_DEBUG(gran,
376            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
377            G_TSO(CurrentTSO, 5));
378
379   if (RtsFlags.GranFlags.Light) {
380     /* Save current time; GranSim Light only */
381     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
382   }      
383
384   event = get_next_event();
385
386   while (event!=(rtsEvent*)NULL) {
387     /* Choose the processor with the next event */
388     CurrentProc = event->proc;
389     CurrentTSO = event->tso;
390
391 #elif defined(PAR)
392
393   while (!receivedFinish) {    /* set by processMessages */
394                                /* when receiving PP_FINISH message         */ 
395 #else
396
397   while (1) {
398
399 #endif
400
401     IF_DEBUG(scheduler, printAllThreads());
402
403     /* If we're interrupted (the user pressed ^C, or some other
404      * termination condition occurred), kill all the currently running
405      * threads.
406      */
407     if (interrupted) {
408       IF_DEBUG(scheduler, sched_belch("interrupted"));
409       deleteAllThreads();
410       interrupted = rtsFalse;
411       was_interrupted = rtsTrue;
412     }
413
414     /* Go through the list of main threads and wake up any
415      * clients whose computations have finished.  ToDo: this
416      * should be done more efficiently without a linear scan
417      * of the main threads list, somehow...
418      */
419 #if defined(RTS_SUPPORTS_THREADS)
420     { 
421       StgMainThread *m, **prev;
422       prev = &main_threads;
423       for (m = main_threads; m != NULL; m = m->link) {
424         switch (m->tso->what_next) {
425         case ThreadComplete:
426           if (m->ret) {
427             *(m->ret) = (StgClosure *)m->tso->sp[0];
428           }
429           *prev = m->link;
430           m->stat = Success;
431           broadcastCondition(&m->wakeup);
432           break;
433         case ThreadKilled:
434           if (m->ret) *(m->ret) = NULL;
435           *prev = m->link;
436           if (was_interrupted) {
437             m->stat = Interrupted;
438           } else {
439             m->stat = Killed;
440           }
441           broadcastCondition(&m->wakeup);
442           break;
443         default:
444           break;
445         }
446       }
447     }
448
449 #else /* not threaded */
450
451 # if defined(PAR)
452     /* in GUM do this only on the Main PE */
453     if (IAmMainThread)
454 # endif
455     /* If our main thread has finished or been killed, return.
456      */
457     {
458       StgMainThread *m = main_threads;
459       if (m->tso->what_next == ThreadComplete
460           || m->tso->what_next == ThreadKilled) {
461         main_threads = main_threads->link;
462         if (m->tso->what_next == ThreadComplete) {
463           /* we finished successfully, fill in the return value */
464           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
465           m->stat = Success;
466           return;
467         } else {
468           if (m->ret) { *(m->ret) = NULL; };
469           if (was_interrupted) {
470             m->stat = Interrupted;
471           } else {
472             m->stat = Killed;
473           }
474           return;
475         }
476       }
477     }
478 #endif
479
480     /* Top up the run queue from our spark pool.  We try to make the
481      * number of threads in the run queue equal to the number of
482      * free capabilities.
483      *
484      * Disable spark support in SMP for now, non-essential & requires
485      * a little bit of work to make it compile cleanly. -- sof 1/02.
486      */
487 #if 0 /* defined(SMP) */
488     {
489       nat n = getFreeCapabilities();
490       StgTSO *tso = run_queue_hd;
491
492       /* Count the run queue */
493       while (n > 0 && tso != END_TSO_QUEUE) {
494         tso = tso->link;
495         n--;
496       }
497
498       for (; n > 0; n--) {
499         StgClosure *spark;
500         spark = findSpark(rtsFalse);
501         if (spark == NULL) {
502           break; /* no more sparks in the pool */
503         } else {
504           /* I'd prefer this to be done in activateSpark -- HWL */
505           /* tricky - it needs to hold the scheduler lock and
506            * not try to re-acquire it -- SDM */
507           createSparkThread(spark);       
508           IF_DEBUG(scheduler,
509                    sched_belch("==^^ turning spark of closure %p into a thread",
510                                (StgClosure *)spark));
511         }
512       }
513       /* We need to wake up the other tasks if we just created some
514        * work for them.
515        */
516       if (getFreeCapabilities() - n > 1) {
517           signalCondition( &thread_ready_cond );
518       }
519     }
520 #endif // SMP
521
522     /* check for signals each time around the scheduler */
523 #ifndef mingw32_TARGET_OS
524     if (signals_pending()) {
525       startSignalHandlers();
526     }
527 #endif
528
529     /* Check whether any waiting threads need to be woken up.  If the
530      * run queue is empty, and there are no other tasks running, we
531      * can wait indefinitely for something to happen.
532      * ToDo: what if another client comes along & requests another
533      * main thread?
534      */
535     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
536       awaitEvent( EMPTY_RUN_QUEUE()
537 #if defined(SMP)
538         && allFreeCapabilities()
539 #endif
540         );
541     }
542     /* we can be interrupted while waiting for I/O... */
543     if (interrupted) continue;
544
545     /* 
546      * Detect deadlock: when we have no threads to run, there are no
547      * threads waiting on I/O or sleeping, and all the other tasks are
548      * waiting for work, we must have a deadlock of some description.
549      *
550      * We first try to find threads blocked on themselves (ie. black
551      * holes), and generate NonTermination exceptions where necessary.
552      *
553      * If no threads are black holed, we have a deadlock situation, so
554      * inform all the main threads.
555      */
556 #ifndef PAR
557     if (   EMPTY_RUN_QUEUE()
558         && EMPTY_QUEUE(blocked_queue_hd)
559         && EMPTY_QUEUE(sleeping_queue)
560 #if defined(RTS_SUPPORTS_THREADS)
561         && EMPTY_QUEUE(suspended_ccalling_threads)
562 #endif
563 #ifdef SMP
564         && allFreeCapabilities()
565 #endif
566         )
567     {
568         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
569 #if defined(THREADED_RTS)
570         /* and SMP mode ..? */
571         releaseCapability(cap);
572 #endif
573         GarbageCollect(GetRoots,rtsTrue);
574         if (   EMPTY_QUEUE(blocked_queue_hd)
575             && EMPTY_RUN_QUEUE()
576             && EMPTY_QUEUE(sleeping_queue) ) {
577
578             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
579             detectBlackHoles();
580
581             /* No black holes, so probably a real deadlock.  Send the
582              * current main thread the Deadlock exception (or in the SMP
583              * build, send *all* main threads the deadlock exception,
584              * since none of them can make progress).
585              */
586             if ( EMPTY_RUN_QUEUE() ) {
587                 StgMainThread *m;
588 #if defined(RTS_SUPPORTS_THREADS)
589                 for (m = main_threads; m != NULL; m = m->link) {
590                     switch (m->tso->why_blocked) {
591                     case BlockedOnBlackHole:
592                         raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
593                         break;
594                     case BlockedOnException:
595                     case BlockedOnMVar:
596                         raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
597                         break;
598                     default:
599                         barf("deadlock: main thread blocked in a strange way");
600                     }
601                 }
602 #else
603                 m = main_threads;
604                 switch (m->tso->why_blocked) {
605                 case BlockedOnBlackHole:
606                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
607                     break;
608                 case BlockedOnException:
609                 case BlockedOnMVar:
610                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
611                     break;
612                 default:
613                     barf("deadlock: main thread blocked in a strange way");
614                 }
615 #endif
616             }
617 #if defined(RTS_SUPPORTS_THREADS)
618             /* ToDo: revisit conditions (and mechanism) for shutting
619                down a multi-threaded world  */
620             if ( EMPTY_RUN_QUEUE() ) {
621               IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
622               shutdownHaskellAndExit(0);
623             }
624 #endif
625             ASSERT( !EMPTY_RUN_QUEUE() );
626         }
627     }
628 #elif defined(PAR)
629     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
630 #endif
631
632 #if defined(SMP)
633     /* If there's a GC pending, don't do anything until it has
634      * completed.
635      */
636     if (ready_to_gc) {
637       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
638       waitCondition( &gc_pending_cond, &sched_mutex );
639     }
640 #endif    
641
642 #if defined(RTS_SUPPORTS_THREADS)
643     /* block until we've got a thread on the run queue and a free
644      * capability.
645      *
646      */
647     if ( EMPTY_RUN_QUEUE() ) {
648       /* Give up our capability */
649       releaseCapability(cap);
650       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
651       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
652       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
653 #if 0
654       while ( EMPTY_RUN_QUEUE() ) {
655         waitForWorkCapability(&sched_mutex, &cap);
656         IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
657       }
658 #endif
659     }
660 #endif
661
662 #if defined(GRAN)
663     if (RtsFlags.GranFlags.Light)
664       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
665
666     /* adjust time based on time-stamp */
667     if (event->time > CurrentTime[CurrentProc] &&
668         event->evttype != ContinueThread)
669       CurrentTime[CurrentProc] = event->time;
670     
671     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
672     if (!RtsFlags.GranFlags.Light)
673       handleIdlePEs();
674
675     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
676
677     /* main event dispatcher in GranSim */
678     switch (event->evttype) {
679       /* Should just be continuing execution */
680     case ContinueThread:
681       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
682       /* ToDo: check assertion
683       ASSERT(run_queue_hd != (StgTSO*)NULL &&
684              run_queue_hd != END_TSO_QUEUE);
685       */
686       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
687       if (!RtsFlags.GranFlags.DoAsyncFetch &&
688           procStatus[CurrentProc]==Fetching) {
689         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
690               CurrentTSO->id, CurrentTSO, CurrentProc);
691         goto next_thread;
692       } 
693       /* Ignore ContinueThreads for completed threads */
694       if (CurrentTSO->what_next == ThreadComplete) {
695         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
696               CurrentTSO->id, CurrentTSO, CurrentProc);
697         goto next_thread;
698       } 
699       /* Ignore ContinueThreads for threads that are being migrated */
700       if (PROCS(CurrentTSO)==Nowhere) { 
701         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
702               CurrentTSO->id, CurrentTSO, CurrentProc);
703         goto next_thread;
704       }
705       /* The thread should be at the beginning of the run queue */
706       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
707         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
708               CurrentTSO->id, CurrentTSO, CurrentProc);
709         break; // run the thread anyway
710       }
711       /*
712       new_event(proc, proc, CurrentTime[proc],
713                 FindWork,
714                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
715       goto next_thread; 
716       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
717       break; // now actually run the thread; DaH Qu'vam yImuHbej 
718
719     case FetchNode:
720       do_the_fetchnode(event);
721       goto next_thread;             /* handle next event in event queue  */
722       
723     case GlobalBlock:
724       do_the_globalblock(event);
725       goto next_thread;             /* handle next event in event queue  */
726       
727     case FetchReply:
728       do_the_fetchreply(event);
729       goto next_thread;             /* handle next event in event queue  */
730       
731     case UnblockThread:   /* Move from the blocked queue to the tail of */
732       do_the_unblock(event);
733       goto next_thread;             /* handle next event in event queue  */
734       
735     case ResumeThread:  /* Move from the blocked queue to the tail of */
736       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
737       event->tso->gran.blocktime += 
738         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
739       do_the_startthread(event);
740       goto next_thread;             /* handle next event in event queue  */
741       
742     case StartThread:
743       do_the_startthread(event);
744       goto next_thread;             /* handle next event in event queue  */
745       
746     case MoveThread:
747       do_the_movethread(event);
748       goto next_thread;             /* handle next event in event queue  */
749       
750     case MoveSpark:
751       do_the_movespark(event);
752       goto next_thread;             /* handle next event in event queue  */
753       
754     case FindWork:
755       do_the_findwork(event);
756       goto next_thread;             /* handle next event in event queue  */
757       
758     default:
759       barf("Illegal event type %u\n", event->evttype);
760     }  /* switch */
761     
762     /* This point was scheduler_loop in the old RTS */
763
764     IF_DEBUG(gran, belch("GRAN: after main switch"));
765
766     TimeOfLastEvent = CurrentTime[CurrentProc];
767     TimeOfNextEvent = get_time_of_next_event();
768     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
769     // CurrentTSO = ThreadQueueHd;
770
771     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
772                          TimeOfNextEvent));
773
774     if (RtsFlags.GranFlags.Light) 
775       GranSimLight_leave_system(event, &ActiveTSO); 
776
777     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
778
779     IF_DEBUG(gran, 
780              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
781
782     /* in a GranSim setup the TSO stays on the run queue */
783     t = CurrentTSO;
784     /* Take a thread from the run queue. */
785     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
786
787     IF_DEBUG(gran, 
788              fprintf(stderr, "GRAN: About to run current thread, which is\n");
789              G_TSO(t,5));
790
791     context_switch = 0; // turned on via GranYield, checking events and time slice
792
793     IF_DEBUG(gran, 
794              DumpGranEvent(GR_SCHEDULE, t));
795
796     procStatus[CurrentProc] = Busy;
797
798 #elif defined(PAR)
799     if (PendingFetches != END_BF_QUEUE) {
800         processFetches();
801     }
802
803     /* ToDo: phps merge with spark activation above */
804     /* check whether we have local work and send requests if we have none */
805     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
806       /* :-[  no local threads => look out for local sparks */
807       /* the spark pool for the current PE */
808       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
809       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
810           pool->hd < pool->tl) {
811         /* 
812          * ToDo: add GC code check that we really have enough heap afterwards!!
813          * Old comment:
814          * If we're here (no runnable threads) and we have pending
815          * sparks, we must have a space problem.  Get enough space
816          * to turn one of those pending sparks into a
817          * thread... 
818          */
819
820         spark = findSpark(rtsFalse);                /* get a spark */
821         if (spark != (rtsSpark) NULL) {
822           tso = activateSpark(spark);       /* turn the spark into a thread */
823           IF_PAR_DEBUG(schedule,
824                        belch("==== schedule: Created TSO %d (%p); %d threads active",
825                              tso->id, tso, advisory_thread_count));
826
827           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
828             belch("==^^ failed to activate spark");
829             goto next_thread;
830           }               /* otherwise fall through & pick-up new tso */
831         } else {
832           IF_PAR_DEBUG(verbose,
833                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
834                              spark_queue_len(pool)));
835           goto next_thread;
836         }
837       }
838
839       /* If we still have no work we need to send a FISH to get a spark
840          from another PE 
841       */
842       if (EMPTY_RUN_QUEUE()) {
843       /* =8-[  no local sparks => look for work on other PEs */
844         /*
845          * We really have absolutely no work.  Send out a fish
846          * (there may be some out there already), and wait for
847          * something to arrive.  We clearly can't run any threads
848          * until a SCHEDULE or RESUME arrives, and so that's what
849          * we're hoping to see.  (Of course, we still have to
850          * respond to other types of messages.)
851          */
852         TIME now = msTime() /*CURRENT_TIME*/;
853         IF_PAR_DEBUG(verbose, 
854                      belch("--  now=%ld", now));
855         IF_PAR_DEBUG(verbose,
856                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
857                          (last_fish_arrived_at!=0 &&
858                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
859                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
860                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
861                              last_fish_arrived_at,
862                              RtsFlags.ParFlags.fishDelay, now);
863                      });
864         
865         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
866             (last_fish_arrived_at==0 ||
867              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
868           /* outstandingFishes is set in sendFish, processFish;
869              avoid flooding system with fishes via delay */
870           pe = choosePE();
871           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
872                    NEW_FISH_HUNGER);
873
874           // Global statistics: count no. of fishes
875           if (RtsFlags.ParFlags.ParStats.Global &&
876               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
877             globalParStats.tot_fish_mess++;
878           }
879         }
880       
881         receivedFinish = processMessages();
882         goto next_thread;
883       }
884     } else if (PacketsWaiting()) {  /* Look for incoming messages */
885       receivedFinish = processMessages();
886     }
887
888     /* Now we are sure that we have some work available */
889     ASSERT(run_queue_hd != END_TSO_QUEUE);
890
891     /* Take a thread from the run queue, if we have work */
892     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
893     IF_DEBUG(sanity,checkTSO(t));
894
895     /* ToDo: write something to the log-file
896     if (RTSflags.ParFlags.granSimStats && !sameThread)
897         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
898
899     CurrentTSO = t;
900     */
901     /* the spark pool for the current PE */
902     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
903
904     IF_DEBUG(scheduler, 
905              belch("--=^ %d threads, %d sparks on [%#x]", 
906                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
907
908 # if 1
909     if (0 && RtsFlags.ParFlags.ParStats.Full && 
910         t && LastTSO && t->id != LastTSO->id && 
911         LastTSO->why_blocked == NotBlocked && 
912         LastTSO->what_next != ThreadComplete) {
913       // if previously scheduled TSO not blocked we have to record the context switch
914       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
915                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
916     }
917
918     if (RtsFlags.ParFlags.ParStats.Full && 
919         (emitSchedule /* forced emit */ ||
920         (t && LastTSO && t->id != LastTSO->id))) {
921       /* 
922          we are running a different TSO, so write a schedule event to log file
923          NB: If we use fair scheduling we also have to write  a deschedule 
924              event for LastTSO; with unfair scheduling we know that the
925              previous tso has blocked whenever we switch to another tso, so
926              we don't need it in GUM for now
927       */
928       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
929                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
930       emitSchedule = rtsFalse;
931     }
932      
933 # endif
934 #else /* !GRAN && !PAR */
935   
936     /* grab a thread from the run queue */
937     ASSERT(run_queue_hd != END_TSO_QUEUE);
938     t = POP_RUN_QUEUE();
939     // Sanity check the thread we're about to run.  This can be
940     // expensive if there is lots of thread switching going on...
941     IF_DEBUG(sanity,checkTSO(t));
942 #endif
943     
944     grabCapability(&cap);
945     cap->r.rCurrentTSO = t;
946     
947     /* context switches are now initiated by the timer signal, unless
948      * the user specified "context switch as often as possible", with
949      * +RTS -C0
950      */
951     if (
952 #ifdef PROFILING
953         RtsFlags.ProfFlags.profileInterval == 0 ||
954 #endif
955         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
956          && (run_queue_hd != END_TSO_QUEUE
957              || blocked_queue_hd != END_TSO_QUEUE
958              || sleeping_queue != END_TSO_QUEUE)))
959         context_switch = 1;
960     else
961         context_switch = 0;
962
963     RELEASE_LOCK(&sched_mutex);
964
965     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
966                               t->id, t, whatNext_strs[t->what_next]));
967
968 #ifdef PROFILING
969     startHeapProfTimer();
970 #endif
971
972     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
973     /* Run the current thread 
974      */
975     switch (cap->r.rCurrentTSO->what_next) {
976     case ThreadKilled:
977     case ThreadComplete:
978         /* Thread already finished, return to scheduler. */
979         ret = ThreadFinished;
980         break;
981     case ThreadEnterGHC:
982         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
983         break;
984     case ThreadRunGHC:
985         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
986         break;
987     case ThreadEnterInterp:
988         ret = interpretBCO(cap);
989         break;
990     default:
991       barf("schedule: invalid what_next field");
992     }
993     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
994     
995     /* Costs for the scheduler are assigned to CCS_SYSTEM */
996 #ifdef PROFILING
997     stopHeapProfTimer();
998     CCCS = CCS_SYSTEM;
999 #endif
1000     
1001     ACQUIRE_LOCK(&sched_mutex);
1002
1003 #ifdef SMP
1004     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1005 #elif !defined(GRAN) && !defined(PAR)
1006     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1007 #endif
1008     t = cap->r.rCurrentTSO;
1009     
1010 #if defined(PAR)
1011     /* HACK 675: if the last thread didn't yield, make sure to print a 
1012        SCHEDULE event to the log file when StgRunning the next thread, even
1013        if it is the same one as before */
1014     LastTSO = t; 
1015     TimeOfLastYield = CURRENT_TIME;
1016 #endif
1017
1018     switch (ret) {
1019     case HeapOverflow:
1020 #if defined(GRAN)
1021       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1022       globalGranStats.tot_heapover++;
1023 #elif defined(PAR)
1024       globalParStats.tot_heapover++;
1025 #endif
1026
1027       // did the task ask for a large block?
1028       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1029           // if so, get one and push it on the front of the nursery.
1030           bdescr *bd;
1031           nat blocks;
1032           
1033           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1034
1035           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1036                                    t->id, t,
1037                                    whatNext_strs[t->what_next], blocks));
1038
1039           // don't do this if it would push us over the
1040           // alloc_blocks_lim limit; we'll GC first.
1041           if (alloc_blocks + blocks < alloc_blocks_lim) {
1042
1043               alloc_blocks += blocks;
1044               bd = allocGroup( blocks );
1045
1046               // link the new group into the list
1047               bd->link = cap->r.rCurrentNursery;
1048               bd->u.back = cap->r.rCurrentNursery->u.back;
1049               if (cap->r.rCurrentNursery->u.back != NULL) {
1050                   cap->r.rCurrentNursery->u.back->link = bd;
1051               } else {
1052                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1053                          g0s0->blocks == cap->r.rNursery);
1054                   cap->r.rNursery = g0s0->blocks = bd;
1055               }           
1056               cap->r.rCurrentNursery->u.back = bd;
1057
1058               // initialise it as a nursery block
1059               bd->step = g0s0;
1060               bd->gen_no = 0;
1061               bd->flags = 0;
1062               bd->free = bd->start;
1063
1064               // don't forget to update the block count in g0s0.
1065               g0s0->n_blocks += blocks;
1066               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1067
1068               // now update the nursery to point to the new block
1069               cap->r.rCurrentNursery = bd;
1070
1071               // we might be unlucky and have another thread get on the
1072               // run queue before us and steal the large block, but in that
1073               // case the thread will just end up requesting another large
1074               // block.
1075               PUSH_ON_RUN_QUEUE(t);
1076               break;
1077           }
1078       }
1079
1080       /* make all the running tasks block on a condition variable,
1081        * maybe set context_switch and wait till they all pile in,
1082        * then have them wait on a GC condition variable.
1083        */
1084       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1085                                t->id, t, whatNext_strs[t->what_next]));
1086       threadPaused(t);
1087 #if defined(GRAN)
1088       ASSERT(!is_on_queue(t,CurrentProc));
1089 #elif defined(PAR)
1090       /* Currently we emit a DESCHEDULE event before GC in GUM.
1091          ToDo: either add separate event to distinguish SYSTEM time from rest
1092                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1093       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1094         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1095                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1096         emitSchedule = rtsTrue;
1097       }
1098 #endif
1099       
1100       ready_to_gc = rtsTrue;
1101       context_switch = 1;               /* stop other threads ASAP */
1102       PUSH_ON_RUN_QUEUE(t);
1103       /* actual GC is done at the end of the while loop */
1104       break;
1105       
1106     case StackOverflow:
1107 #if defined(GRAN)
1108       IF_DEBUG(gran, 
1109                DumpGranEvent(GR_DESCHEDULE, t));
1110       globalGranStats.tot_stackover++;
1111 #elif defined(PAR)
1112       // IF_DEBUG(par, 
1113       // DumpGranEvent(GR_DESCHEDULE, t);
1114       globalParStats.tot_stackover++;
1115 #endif
1116       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1117                                t->id, t, whatNext_strs[t->what_next]));
1118       /* just adjust the stack for this thread, then pop it back
1119        * on the run queue.
1120        */
1121       threadPaused(t);
1122       { 
1123         StgMainThread *m;
1124         /* enlarge the stack */
1125         StgTSO *new_t = threadStackOverflow(t);
1126         
1127         /* This TSO has moved, so update any pointers to it from the
1128          * main thread stack.  It better not be on any other queues...
1129          * (it shouldn't be).
1130          */
1131         for (m = main_threads; m != NULL; m = m->link) {
1132           if (m->tso == t) {
1133             m->tso = new_t;
1134           }
1135         }
1136         threadPaused(new_t);
1137         PUSH_ON_RUN_QUEUE(new_t);
1138       }
1139       break;
1140
1141     case ThreadYielding:
1142 #if defined(GRAN)
1143       IF_DEBUG(gran, 
1144                DumpGranEvent(GR_DESCHEDULE, t));
1145       globalGranStats.tot_yields++;
1146 #elif defined(PAR)
1147       // IF_DEBUG(par, 
1148       // DumpGranEvent(GR_DESCHEDULE, t);
1149       globalParStats.tot_yields++;
1150 #endif
1151       /* put the thread back on the run queue.  Then, if we're ready to
1152        * GC, check whether this is the last task to stop.  If so, wake
1153        * up the GC thread.  getThread will block during a GC until the
1154        * GC is finished.
1155        */
1156       IF_DEBUG(scheduler,
1157                if (t->what_next == ThreadEnterInterp) {
1158                    /* ToDo: or maybe a timer expired when we were in Hugs?
1159                     * or maybe someone hit ctrl-C
1160                     */
1161                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1162                          t->id, t, whatNext_strs[t->what_next]);
1163                } else {
1164                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1165                          t->id, t, whatNext_strs[t->what_next]);
1166                }
1167                );
1168
1169       threadPaused(t);
1170
1171       IF_DEBUG(sanity,
1172                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1173                checkTSO(t));
1174       ASSERT(t->link == END_TSO_QUEUE);
1175 #if defined(GRAN)
1176       ASSERT(!is_on_queue(t,CurrentProc));
1177
1178       IF_DEBUG(sanity,
1179                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1180                checkThreadQsSanity(rtsTrue));
1181 #endif
1182 #if defined(PAR)
1183       if (RtsFlags.ParFlags.doFairScheduling) { 
1184         /* this does round-robin scheduling; good for concurrency */
1185         APPEND_TO_RUN_QUEUE(t);
1186       } else {
1187         /* this does unfair scheduling; good for parallelism */
1188         PUSH_ON_RUN_QUEUE(t);
1189       }
1190 #else
1191       /* this does round-robin scheduling; good for concurrency */
1192       APPEND_TO_RUN_QUEUE(t);
1193 #endif
1194 #if defined(GRAN)
1195       /* add a ContinueThread event to actually process the thread */
1196       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1197                 ContinueThread,
1198                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1199       IF_GRAN_DEBUG(bq, 
1200                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1201                G_EVENTQ(0);
1202                G_CURR_THREADQ(0));
1203 #endif /* GRAN */
1204       break;
1205       
1206     case ThreadBlocked:
1207 #if defined(GRAN)
1208       IF_DEBUG(scheduler,
1209                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1210                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1211                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1212
1213       // ??? needed; should emit block before
1214       IF_DEBUG(gran, 
1215                DumpGranEvent(GR_DESCHEDULE, t)); 
1216       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1217       /*
1218         ngoq Dogh!
1219       ASSERT(procStatus[CurrentProc]==Busy || 
1220               ((procStatus[CurrentProc]==Fetching) && 
1221               (t->block_info.closure!=(StgClosure*)NULL)));
1222       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1223           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1224             procStatus[CurrentProc]==Fetching)) 
1225         procStatus[CurrentProc] = Idle;
1226       */
1227 #elif defined(PAR)
1228       IF_DEBUG(scheduler,
1229                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1230                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1231       IF_PAR_DEBUG(bq,
1232
1233                    if (t->block_info.closure!=(StgClosure*)NULL) 
1234                      print_bq(t->block_info.closure));
1235
1236       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1237       blockThread(t);
1238
1239       /* whatever we schedule next, we must log that schedule */
1240       emitSchedule = rtsTrue;
1241
1242 #else /* !GRAN */
1243       /* don't need to do anything.  Either the thread is blocked on
1244        * I/O, in which case we'll have called addToBlockedQueue
1245        * previously, or it's blocked on an MVar or Blackhole, in which
1246        * case it'll be on the relevant queue already.
1247        */
1248       IF_DEBUG(scheduler,
1249                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1250                printThreadBlockage(t);
1251                fprintf(stderr, "\n"));
1252
1253       /* Only for dumping event to log file 
1254          ToDo: do I need this in GranSim, too?
1255       blockThread(t);
1256       */
1257 #endif
1258       threadPaused(t);
1259       break;
1260       
1261     case ThreadFinished:
1262       /* Need to check whether this was a main thread, and if so, signal
1263        * the task that started it with the return value.  If we have no
1264        * more main threads, we probably need to stop all the tasks until
1265        * we get a new one.
1266        */
1267       /* We also end up here if the thread kills itself with an
1268        * uncaught exception, see Exception.hc.
1269        */
1270       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1271 #if defined(GRAN)
1272       endThread(t, CurrentProc); // clean-up the thread
1273 #elif defined(PAR)
1274       /* For now all are advisory -- HWL */
1275       //if(t->priority==AdvisoryPriority) ??
1276       advisory_thread_count--;
1277       
1278 # ifdef DIST
1279       if(t->dist.priority==RevalPriority)
1280         FinishReval(t);
1281 # endif
1282       
1283       if (RtsFlags.ParFlags.ParStats.Full &&
1284           !RtsFlags.ParFlags.ParStats.Suppressed) 
1285         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1286 #endif
1287       break;
1288       
1289     default:
1290       barf("schedule: invalid thread return code %d", (int)ret);
1291     }
1292     
1293 #if defined(RTS_SUPPORTS_THREADS)
1294     /* I don't understand what this re-grab is doing -- sof */
1295     grabCapability(&cap);
1296 #endif
1297
1298 #ifdef PROFILING
1299     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1300         GarbageCollect(GetRoots, rtsTrue);
1301         heapCensus();
1302         performHeapProfile = rtsFalse;
1303         ready_to_gc = rtsFalse; // we already GC'd
1304     }
1305 #endif
1306
1307     if (ready_to_gc 
1308 #ifdef SMP
1309         && allFreeCapabilities() 
1310 #endif
1311         ) {
1312       /* everybody back, start the GC.
1313        * Could do it in this thread, or signal a condition var
1314        * to do it in another thread.  Either way, we need to
1315        * broadcast on gc_pending_cond afterward.
1316        */
1317 #if defined(RTS_SUPPORTS_THREADS)
1318       IF_DEBUG(scheduler,sched_belch("doing GC"));
1319 #endif
1320       GarbageCollect(GetRoots,rtsFalse);
1321       ready_to_gc = rtsFalse;
1322 #ifdef SMP
1323       broadcastCondition(&gc_pending_cond);
1324 #endif
1325 #if defined(GRAN)
1326       /* add a ContinueThread event to continue execution of current thread */
1327       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1328                 ContinueThread,
1329                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1330       IF_GRAN_DEBUG(bq, 
1331                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1332                G_EVENTQ(0);
1333                G_CURR_THREADQ(0));
1334 #endif /* GRAN */
1335     }
1336
1337 #if defined(GRAN)
1338   next_thread:
1339     IF_GRAN_DEBUG(unused,
1340                   print_eventq(EventHd));
1341
1342     event = get_next_event();
1343 #elif defined(PAR)
1344   next_thread:
1345     /* ToDo: wait for next message to arrive rather than busy wait */
1346 #endif /* GRAN */
1347
1348   } /* end of while(1) */
1349
1350   IF_PAR_DEBUG(verbose,
1351                belch("== Leaving schedule() after having received Finish"));
1352 }
1353
1354 /* ---------------------------------------------------------------------------
1355  * deleteAllThreads():  kill all the live threads.
1356  *
1357  * This is used when we catch a user interrupt (^C), before performing
1358  * any necessary cleanups and running finalizers.
1359  * ------------------------------------------------------------------------- */
1360    
1361 void deleteAllThreads ( void )
1362 {
1363   StgTSO* t, *next;
1364   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1365   for (t = run_queue_hd; t != END_TSO_QUEUE; t = next) {
1366       next = t->link;
1367       deleteThread(t);
1368   }
1369   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = next) {
1370       next = t->link;
1371       deleteThread(t);
1372   }
1373   for (t = sleeping_queue; t != END_TSO_QUEUE; t = next) {
1374       next = t->link;
1375       deleteThread(t);
1376   }
1377   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1378   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1379   sleeping_queue = END_TSO_QUEUE;
1380 }
1381
1382 /* startThread and  insertThread are now in GranSim.c -- HWL */
1383
1384
1385 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1386 //@subsection Suspend and Resume
1387
1388 /* ---------------------------------------------------------------------------
1389  * Suspending & resuming Haskell threads.
1390  * 
1391  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1392  * its capability before calling the C function.  This allows another
1393  * task to pick up the capability and carry on running Haskell
1394  * threads.  It also means that if the C call blocks, it won't lock
1395  * the whole system.
1396  *
1397  * The Haskell thread making the C call is put to sleep for the
1398  * duration of the call, on the susepended_ccalling_threads queue.  We
1399  * give out a token to the task, which it can use to resume the thread
1400  * on return from the C function.
1401  * ------------------------------------------------------------------------- */
1402    
1403 StgInt
1404 suspendThread( StgRegTable *reg, 
1405                rtsBool concCall
1406 #if !defined(RTS_SUPPORTS_THREADS)
1407                STG_UNUSED
1408 #endif
1409                )
1410 {
1411   nat tok;
1412   Capability *cap;
1413
1414   /* assume that *reg is a pointer to the StgRegTable part
1415    * of a Capability.
1416    */
1417   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1418
1419   ACQUIRE_LOCK(&sched_mutex);
1420
1421   IF_DEBUG(scheduler,
1422            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1423
1424   threadPaused(cap->r.rCurrentTSO);
1425   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1426   suspended_ccalling_threads = cap->r.rCurrentTSO;
1427
1428 #if defined(RTS_SUPPORTS_THREADS)
1429   cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
1430 #endif
1431
1432   /* Use the thread ID as the token; it should be unique */
1433   tok = cap->r.rCurrentTSO->id;
1434
1435   /* Hand back capability */
1436   releaseCapability(cap);
1437   
1438 #if defined(RTS_SUPPORTS_THREADS)
1439   /* Preparing to leave the RTS, so ensure there's a native thread/task
1440      waiting to take over.
1441      
1442      ToDo: optimise this and only create a new task if there's a need
1443      for one (i.e., if there's only one Concurrent Haskell thread alive,
1444      there's no need to create a new task).
1445   */
1446   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1447   if (concCall) {
1448     startTask(taskStart);
1449   }
1450 #endif
1451
1452   /* Other threads _might_ be available for execution; signal this */
1453   THREAD_RUNNABLE();
1454   RELEASE_LOCK(&sched_mutex);
1455   return tok; 
1456 }
1457
1458 StgRegTable *
1459 resumeThread( StgInt tok,
1460               rtsBool concCall
1461 #if !defined(RTS_SUPPORTS_THREADS)
1462                STG_UNUSED
1463 #endif
1464               )
1465 {
1466   StgTSO *tso, **prev;
1467   Capability *cap;
1468
1469 #if defined(RTS_SUPPORTS_THREADS)
1470   /* Wait for permission to re-enter the RTS with the result. */
1471   if ( concCall ) {
1472     grabReturnCapability(&sched_mutex, &cap);
1473   } else {
1474     grabCapability(&cap);
1475   }
1476 #else
1477   grabCapability(&cap);
1478 #endif
1479
1480   /* Remove the thread off of the suspended list */
1481   prev = &suspended_ccalling_threads;
1482   for (tso = suspended_ccalling_threads; 
1483        tso != END_TSO_QUEUE; 
1484        prev = &tso->link, tso = tso->link) {
1485     if (tso->id == (StgThreadID)tok) {
1486       *prev = tso->link;
1487       break;
1488     }
1489   }
1490   if (tso == END_TSO_QUEUE) {
1491     barf("resumeThread: thread not found");
1492   }
1493   tso->link = END_TSO_QUEUE;
1494   /* Reset blocking status */
1495   tso->why_blocked  = NotBlocked;
1496
1497   RELEASE_LOCK(&sched_mutex);
1498
1499   cap->r.rCurrentTSO = tso;
1500   return &cap->r;
1501 }
1502
1503
1504 /* ---------------------------------------------------------------------------
1505  * Static functions
1506  * ------------------------------------------------------------------------ */
1507 static void unblockThread(StgTSO *tso);
1508
1509 /* ---------------------------------------------------------------------------
1510  * Comparing Thread ids.
1511  *
1512  * This is used from STG land in the implementation of the
1513  * instances of Eq/Ord for ThreadIds.
1514  * ------------------------------------------------------------------------ */
1515
1516 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1517
1518   StgThreadID id1 = tso1->id; 
1519   StgThreadID id2 = tso2->id;
1520  
1521   if (id1 < id2) return (-1);
1522   if (id1 > id2) return 1;
1523   return 0;
1524 }
1525
1526 /* ---------------------------------------------------------------------------
1527  * Fetching the ThreadID from an StgTSO.
1528  *
1529  * This is used in the implementation of Show for ThreadIds.
1530  * ------------------------------------------------------------------------ */
1531 int rts_getThreadId(const StgTSO *tso) 
1532 {
1533   return tso->id;
1534 }
1535
1536 /* ---------------------------------------------------------------------------
1537    Create a new thread.
1538
1539    The new thread starts with the given stack size.  Before the
1540    scheduler can run, however, this thread needs to have a closure
1541    (and possibly some arguments) pushed on its stack.  See
1542    pushClosure() in Schedule.h.
1543
1544    createGenThread() and createIOThread() (in SchedAPI.h) are
1545    convenient packaged versions of this function.
1546
1547    currently pri (priority) is only used in a GRAN setup -- HWL
1548    ------------------------------------------------------------------------ */
1549 //@cindex createThread
1550 #if defined(GRAN)
1551 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1552 StgTSO *
1553 createThread(nat stack_size, StgInt pri)
1554 {
1555   return createThread_(stack_size, rtsFalse, pri);
1556 }
1557
1558 static StgTSO *
1559 createThread_(nat size, rtsBool have_lock, StgInt pri)
1560 {
1561 #else
1562 StgTSO *
1563 createThread(nat stack_size)
1564 {
1565   return createThread_(stack_size, rtsFalse);
1566 }
1567
1568 static StgTSO *
1569 createThread_(nat size, rtsBool have_lock)
1570 {
1571 #endif
1572
1573     StgTSO *tso;
1574     nat stack_size;
1575
1576     /* First check whether we should create a thread at all */
1577 #if defined(PAR)
1578   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1579   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1580     threadsIgnored++;
1581     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1582           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1583     return END_TSO_QUEUE;
1584   }
1585   threadsCreated++;
1586 #endif
1587
1588 #if defined(GRAN)
1589   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1590 #endif
1591
1592   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1593
1594   /* catch ridiculously small stack sizes */
1595   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1596     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1597   }
1598
1599   stack_size = size - TSO_STRUCT_SIZEW;
1600
1601   tso = (StgTSO *)allocate(size);
1602   TICK_ALLOC_TSO(stack_size, 0);
1603
1604   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1605 #if defined(GRAN)
1606   SET_GRAN_HDR(tso, ThisPE);
1607 #endif
1608   tso->what_next     = ThreadEnterGHC;
1609
1610   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1611    * protect the increment operation on next_thread_id.
1612    * In future, we could use an atomic increment instead.
1613    */
1614   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1615   tso->id = next_thread_id++; 
1616   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1617
1618   tso->why_blocked  = NotBlocked;
1619   tso->blocked_exceptions = NULL;
1620
1621   tso->stack_size   = stack_size;
1622   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1623                               - TSO_STRUCT_SIZEW;
1624   tso->sp           = (P_)&(tso->stack) + stack_size;
1625
1626 #ifdef PROFILING
1627   tso->prof.CCCS = CCS_MAIN;
1628 #endif
1629
1630   /* put a stop frame on the stack */
1631   tso->sp -= sizeofW(StgStopFrame);
1632   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1633   tso->su = (StgUpdateFrame*)tso->sp;
1634
1635   // ToDo: check this
1636 #if defined(GRAN)
1637   tso->link = END_TSO_QUEUE;
1638   /* uses more flexible routine in GranSim */
1639   insertThread(tso, CurrentProc);
1640 #else
1641   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1642    * from its creation
1643    */
1644 #endif
1645
1646 #if defined(GRAN) 
1647   if (RtsFlags.GranFlags.GranSimStats.Full) 
1648     DumpGranEvent(GR_START,tso);
1649 #elif defined(PAR)
1650   if (RtsFlags.ParFlags.ParStats.Full) 
1651     DumpGranEvent(GR_STARTQ,tso);
1652   /* HACk to avoid SCHEDULE 
1653      LastTSO = tso; */
1654 #endif
1655
1656   /* Link the new thread on the global thread list.
1657    */
1658   tso->global_link = all_threads;
1659   all_threads = tso;
1660
1661 #if defined(DIST)
1662   tso->dist.priority = MandatoryPriority; //by default that is...
1663 #endif
1664
1665 #if defined(GRAN)
1666   tso->gran.pri = pri;
1667 # if defined(DEBUG)
1668   tso->gran.magic = TSO_MAGIC; // debugging only
1669 # endif
1670   tso->gran.sparkname   = 0;
1671   tso->gran.startedat   = CURRENT_TIME; 
1672   tso->gran.exported    = 0;
1673   tso->gran.basicblocks = 0;
1674   tso->gran.allocs      = 0;
1675   tso->gran.exectime    = 0;
1676   tso->gran.fetchtime   = 0;
1677   tso->gran.fetchcount  = 0;
1678   tso->gran.blocktime   = 0;
1679   tso->gran.blockcount  = 0;
1680   tso->gran.blockedat   = 0;
1681   tso->gran.globalsparks = 0;
1682   tso->gran.localsparks  = 0;
1683   if (RtsFlags.GranFlags.Light)
1684     tso->gran.clock  = Now; /* local clock */
1685   else
1686     tso->gran.clock  = 0;
1687
1688   IF_DEBUG(gran,printTSO(tso));
1689 #elif defined(PAR)
1690 # if defined(DEBUG)
1691   tso->par.magic = TSO_MAGIC; // debugging only
1692 # endif
1693   tso->par.sparkname   = 0;
1694   tso->par.startedat   = CURRENT_TIME; 
1695   tso->par.exported    = 0;
1696   tso->par.basicblocks = 0;
1697   tso->par.allocs      = 0;
1698   tso->par.exectime    = 0;
1699   tso->par.fetchtime   = 0;
1700   tso->par.fetchcount  = 0;
1701   tso->par.blocktime   = 0;
1702   tso->par.blockcount  = 0;
1703   tso->par.blockedat   = 0;
1704   tso->par.globalsparks = 0;
1705   tso->par.localsparks  = 0;
1706 #endif
1707
1708 #if defined(GRAN)
1709   globalGranStats.tot_threads_created++;
1710   globalGranStats.threads_created_on_PE[CurrentProc]++;
1711   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1712   globalGranStats.tot_sq_probes++;
1713 #elif defined(PAR)
1714   // collect parallel global statistics (currently done together with GC stats)
1715   if (RtsFlags.ParFlags.ParStats.Global &&
1716       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1717     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1718     globalParStats.tot_threads_created++;
1719   }
1720 #endif 
1721
1722 #if defined(GRAN)
1723   IF_GRAN_DEBUG(pri,
1724                 belch("==__ schedule: Created TSO %d (%p);",
1725                       CurrentProc, tso, tso->id));
1726 #elif defined(PAR)
1727     IF_PAR_DEBUG(verbose,
1728                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1729                        tso->id, tso, advisory_thread_count));
1730 #else
1731   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1732                                  tso->id, tso->stack_size));
1733 #endif    
1734   return tso;
1735 }
1736
1737 #if defined(PAR)
1738 /* RFP:
1739    all parallel thread creation calls should fall through the following routine.
1740 */
1741 StgTSO *
1742 createSparkThread(rtsSpark spark) 
1743 { StgTSO *tso;
1744   ASSERT(spark != (rtsSpark)NULL);
1745   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1746   { threadsIgnored++;
1747     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1748           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1749     return END_TSO_QUEUE;
1750   }
1751   else
1752   { threadsCreated++;
1753     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1754     if (tso==END_TSO_QUEUE)     
1755       barf("createSparkThread: Cannot create TSO");
1756 #if defined(DIST)
1757     tso->priority = AdvisoryPriority;
1758 #endif
1759     pushClosure(tso,spark);
1760     PUSH_ON_RUN_QUEUE(tso);
1761     advisory_thread_count++;    
1762   }
1763   return tso;
1764 }
1765 #endif
1766
1767 /*
1768   Turn a spark into a thread.
1769   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1770 */
1771 #if defined(PAR)
1772 //@cindex activateSpark
1773 StgTSO *
1774 activateSpark (rtsSpark spark) 
1775 {
1776   StgTSO *tso;
1777
1778   tso = createSparkThread(spark);
1779   if (RtsFlags.ParFlags.ParStats.Full) {   
1780     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1781     IF_PAR_DEBUG(verbose,
1782                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1783                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1784   }
1785   // ToDo: fwd info on local/global spark to thread -- HWL
1786   // tso->gran.exported =  spark->exported;
1787   // tso->gran.locked =   !spark->global;
1788   // tso->gran.sparkname = spark->name;
1789
1790   return tso;
1791 }
1792 #endif
1793
1794 /* ---------------------------------------------------------------------------
1795  * scheduleThread()
1796  *
1797  * scheduleThread puts a thread on the head of the runnable queue.
1798  * This will usually be done immediately after a thread is created.
1799  * The caller of scheduleThread must create the thread using e.g.
1800  * createThread and push an appropriate closure
1801  * on this thread's stack before the scheduler is invoked.
1802  * ------------------------------------------------------------------------ */
1803
1804 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
1805
1806 void
1807 scheduleThread_(StgTSO *tso
1808                , rtsBool createTask
1809 #if !defined(THREADED_RTS)
1810                  STG_UNUSED
1811 #endif
1812               )
1813 {
1814   ACQUIRE_LOCK(&sched_mutex);
1815
1816   /* Put the new thread on the head of the runnable queue.  The caller
1817    * better push an appropriate closure on this thread's stack
1818    * beforehand.  In the SMP case, the thread may start running as
1819    * soon as we release the scheduler lock below.
1820    */
1821   PUSH_ON_RUN_QUEUE(tso);
1822 #if defined(THREADED_RTS)
1823   /* If main() is scheduling a thread, don't bother creating a 
1824    * new task.
1825    */
1826   if ( createTask ) {
1827     startTask(taskStart);
1828   }
1829 #endif
1830   THREAD_RUNNABLE();
1831
1832 #if 0
1833   IF_DEBUG(scheduler,printTSO(tso));
1834 #endif
1835   RELEASE_LOCK(&sched_mutex);
1836 }
1837
1838 void scheduleThread(StgTSO* tso)
1839 {
1840   return scheduleThread_(tso, rtsFalse);
1841 }
1842
1843 void scheduleExtThread(StgTSO* tso)
1844 {
1845   return scheduleThread_(tso, rtsTrue);
1846 }
1847
1848 /* ---------------------------------------------------------------------------
1849  * initScheduler()
1850  *
1851  * Initialise the scheduler.  This resets all the queues - if the
1852  * queues contained any threads, they'll be garbage collected at the
1853  * next pass.
1854  *
1855  * ------------------------------------------------------------------------ */
1856
1857 #ifdef SMP
1858 static void
1859 term_handler(int sig STG_UNUSED)
1860 {
1861   stat_workerStop();
1862   ACQUIRE_LOCK(&term_mutex);
1863   await_death--;
1864   RELEASE_LOCK(&term_mutex);
1865   shutdownThread();
1866 }
1867 #endif
1868
1869 void 
1870 initScheduler(void)
1871 {
1872 #if defined(GRAN)
1873   nat i;
1874
1875   for (i=0; i<=MAX_PROC; i++) {
1876     run_queue_hds[i]      = END_TSO_QUEUE;
1877     run_queue_tls[i]      = END_TSO_QUEUE;
1878     blocked_queue_hds[i]  = END_TSO_QUEUE;
1879     blocked_queue_tls[i]  = END_TSO_QUEUE;
1880     ccalling_threadss[i]  = END_TSO_QUEUE;
1881     sleeping_queue        = END_TSO_QUEUE;
1882   }
1883 #else
1884   run_queue_hd      = END_TSO_QUEUE;
1885   run_queue_tl      = END_TSO_QUEUE;
1886   blocked_queue_hd  = END_TSO_QUEUE;
1887   blocked_queue_tl  = END_TSO_QUEUE;
1888   sleeping_queue    = END_TSO_QUEUE;
1889 #endif 
1890
1891   suspended_ccalling_threads  = END_TSO_QUEUE;
1892
1893   main_threads = NULL;
1894   all_threads  = END_TSO_QUEUE;
1895
1896   context_switch = 0;
1897   interrupted    = 0;
1898
1899   RtsFlags.ConcFlags.ctxtSwitchTicks =
1900       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1901       
1902 #if defined(RTS_SUPPORTS_THREADS)
1903   /* Initialise the mutex and condition variables used by
1904    * the scheduler. */
1905   initMutex(&sched_mutex);
1906   initMutex(&term_mutex);
1907
1908   initCondition(&thread_ready_cond);
1909 #endif
1910   
1911 #if defined(SMP)
1912   initCondition(&gc_pending_cond);
1913 #endif
1914
1915 #if defined(RTS_SUPPORTS_THREADS)
1916   ACQUIRE_LOCK(&sched_mutex);
1917 #endif
1918
1919   /* Install the SIGHUP handler */
1920 #if defined(SMP)
1921   {
1922     struct sigaction action,oact;
1923
1924     action.sa_handler = term_handler;
1925     sigemptyset(&action.sa_mask);
1926     action.sa_flags = 0;
1927     if (sigaction(SIGTERM, &action, &oact) != 0) {
1928       barf("can't install TERM handler");
1929     }
1930   }
1931 #endif
1932
1933   /* A capability holds the state a native thread needs in
1934    * order to execute STG code. At least one capability is
1935    * floating around (only SMP builds have more than one).
1936    */
1937   initCapabilities();
1938   
1939 #if defined(RTS_SUPPORTS_THREADS)
1940     /* start our haskell execution tasks */
1941 # if defined(SMP)
1942     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
1943 # else
1944     startTaskManager(0,taskStart);
1945 # endif
1946 #endif
1947
1948 #if /* defined(SMP) ||*/ defined(PAR)
1949   initSparkPools();
1950 #endif
1951
1952 #if defined(RTS_SUPPORTS_THREADS)
1953   RELEASE_LOCK(&sched_mutex);
1954 #endif
1955
1956 }
1957
1958 void
1959 exitScheduler( void )
1960 {
1961 #if defined(RTS_SUPPORTS_THREADS)
1962   stopTaskManager();
1963 #endif
1964 }
1965
1966 /* -----------------------------------------------------------------------------
1967    Managing the per-task allocation areas.
1968    
1969    Each capability comes with an allocation area.  These are
1970    fixed-length block lists into which allocation can be done.
1971
1972    ToDo: no support for two-space collection at the moment???
1973    -------------------------------------------------------------------------- */
1974
1975 /* -----------------------------------------------------------------------------
1976  * waitThread is the external interface for running a new computation
1977  * and waiting for the result.
1978  *
1979  * In the non-SMP case, we create a new main thread, push it on the 
1980  * main-thread stack, and invoke the scheduler to run it.  The
1981  * scheduler will return when the top main thread on the stack has
1982  * completed or died, and fill in the necessary fields of the
1983  * main_thread structure.
1984  *
1985  * In the SMP case, we create a main thread as before, but we then
1986  * create a new condition variable and sleep on it.  When our new
1987  * main thread has completed, we'll be woken up and the status/result
1988  * will be in the main_thread struct.
1989  * -------------------------------------------------------------------------- */
1990
1991 int 
1992 howManyThreadsAvail ( void )
1993 {
1994    int i = 0;
1995    StgTSO* q;
1996    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1997       i++;
1998    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1999       i++;
2000    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2001       i++;
2002    return i;
2003 }
2004
2005 void
2006 finishAllThreads ( void )
2007 {
2008    do {
2009       while (run_queue_hd != END_TSO_QUEUE) {
2010          waitThread ( run_queue_hd, NULL);
2011       }
2012       while (blocked_queue_hd != END_TSO_QUEUE) {
2013          waitThread ( blocked_queue_hd, NULL);
2014       }
2015       while (sleeping_queue != END_TSO_QUEUE) {
2016          waitThread ( blocked_queue_hd, NULL);
2017       }
2018    } while 
2019       (blocked_queue_hd != END_TSO_QUEUE || 
2020        run_queue_hd     != END_TSO_QUEUE ||
2021        sleeping_queue   != END_TSO_QUEUE);
2022 }
2023
2024 SchedulerStatus
2025 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2026
2027 #if defined(THREADED_RTS)
2028   return waitThread_(tso,ret, rtsFalse);
2029 #else
2030   return waitThread_(tso,ret);
2031 #endif
2032 }
2033
2034 SchedulerStatus
2035 waitThread_(StgTSO *tso,
2036             /*out*/StgClosure **ret
2037 #if defined(THREADED_RTS)
2038             , rtsBool blockWaiting
2039 #endif
2040            )
2041 {
2042   StgMainThread *m;
2043   SchedulerStatus stat;
2044
2045   ACQUIRE_LOCK(&sched_mutex);
2046   
2047   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2048
2049   m->tso = tso;
2050   m->ret = ret;
2051   m->stat = NoStatus;
2052 #if defined(RTS_SUPPORTS_THREADS)
2053   initCondition(&m->wakeup);
2054 #endif
2055
2056   m->link = main_threads;
2057   main_threads = m;
2058
2059   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2060
2061 #if defined(RTS_SUPPORTS_THREADS)
2062
2063 # if defined(THREADED_RTS)
2064   if (!blockWaiting) {
2065     /* In the threaded case, the OS thread that called main()
2066      * gets to enter the RTS directly without going via another
2067      * task/thread.
2068      */
2069     RELEASE_LOCK(&sched_mutex);
2070     schedule();
2071     ASSERT(m->stat != NoStatus);
2072   } else 
2073 # endif
2074   {
2075     IF_DEBUG(scheduler, sched_belch("sfoo"));
2076     do {
2077       waitCondition(&m->wakeup, &sched_mutex);
2078     } while (m->stat == NoStatus);
2079   }
2080 #elif defined(GRAN)
2081   /* GranSim specific init */
2082   CurrentTSO = m->tso;                // the TSO to run
2083   procStatus[MainProc] = Busy;        // status of main PE
2084   CurrentProc = MainProc;             // PE to run it on
2085
2086   schedule();
2087 #else
2088   RELEASE_LOCK(&sched_mutex);
2089   schedule();
2090   ASSERT(m->stat != NoStatus);
2091 #endif
2092
2093   stat = m->stat;
2094
2095 #if defined(RTS_SUPPORTS_THREADS)
2096   closeCondition(&m->wakeup);
2097 #endif
2098
2099   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2100                               m->tso->id));
2101   free(m);
2102
2103 #if defined(THREADED_RTS)
2104   if (blockWaiting) 
2105 #endif
2106     RELEASE_LOCK(&sched_mutex);
2107
2108   return stat;
2109 }
2110
2111 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2112 //@subsection Run queue code 
2113
2114 #if 0
2115 /* 
2116    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2117        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2118        implicit global variable that has to be correct when calling these
2119        fcts -- HWL 
2120 */
2121
2122 /* Put the new thread on the head of the runnable queue.
2123  * The caller of createThread better push an appropriate closure
2124  * on this thread's stack before the scheduler is invoked.
2125  */
2126 static /* inline */ void
2127 add_to_run_queue(tso)
2128 StgTSO* tso; 
2129 {
2130   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2131   tso->link = run_queue_hd;
2132   run_queue_hd = tso;
2133   if (run_queue_tl == END_TSO_QUEUE) {
2134     run_queue_tl = tso;
2135   }
2136 }
2137
2138 /* Put the new thread at the end of the runnable queue. */
2139 static /* inline */ void
2140 push_on_run_queue(tso)
2141 StgTSO* tso; 
2142 {
2143   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2144   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2145   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2146   if (run_queue_hd == END_TSO_QUEUE) {
2147     run_queue_hd = tso;
2148   } else {
2149     run_queue_tl->link = tso;
2150   }
2151   run_queue_tl = tso;
2152 }
2153
2154 /* 
2155    Should be inlined because it's used very often in schedule.  The tso
2156    argument is actually only needed in GranSim, where we want to have the
2157    possibility to schedule *any* TSO on the run queue, irrespective of the
2158    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2159    the run queue and dequeue the tso, adjusting the links in the queue. 
2160 */
2161 //@cindex take_off_run_queue
2162 static /* inline */ StgTSO*
2163 take_off_run_queue(StgTSO *tso) {
2164   StgTSO *t, *prev;
2165
2166   /* 
2167      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2168
2169      if tso is specified, unlink that tso from the run_queue (doesn't have
2170      to be at the beginning of the queue); GranSim only 
2171   */
2172   if (tso!=END_TSO_QUEUE) {
2173     /* find tso in queue */
2174     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2175          t!=END_TSO_QUEUE && t!=tso;
2176          prev=t, t=t->link) 
2177       /* nothing */ ;
2178     ASSERT(t==tso);
2179     /* now actually dequeue the tso */
2180     if (prev!=END_TSO_QUEUE) {
2181       ASSERT(run_queue_hd!=t);
2182       prev->link = t->link;
2183     } else {
2184       /* t is at beginning of thread queue */
2185       ASSERT(run_queue_hd==t);
2186       run_queue_hd = t->link;
2187     }
2188     /* t is at end of thread queue */
2189     if (t->link==END_TSO_QUEUE) {
2190       ASSERT(t==run_queue_tl);
2191       run_queue_tl = prev;
2192     } else {
2193       ASSERT(run_queue_tl!=t);
2194     }
2195     t->link = END_TSO_QUEUE;
2196   } else {
2197     /* take tso from the beginning of the queue; std concurrent code */
2198     t = run_queue_hd;
2199     if (t != END_TSO_QUEUE) {
2200       run_queue_hd = t->link;
2201       t->link = END_TSO_QUEUE;
2202       if (run_queue_hd == END_TSO_QUEUE) {
2203         run_queue_tl = END_TSO_QUEUE;
2204       }
2205     }
2206   }
2207   return t;
2208 }
2209
2210 #endif /* 0 */
2211
2212 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2213 //@subsection Garbage Collextion Routines
2214
2215 /* ---------------------------------------------------------------------------
2216    Where are the roots that we know about?
2217
2218         - all the threads on the runnable queue
2219         - all the threads on the blocked queue
2220         - all the threads on the sleeping queue
2221         - all the thread currently executing a _ccall_GC
2222         - all the "main threads"
2223      
2224    ------------------------------------------------------------------------ */
2225
2226 /* This has to be protected either by the scheduler monitor, or by the
2227         garbage collection monitor (probably the latter).
2228         KH @ 25/10/99
2229 */
2230
2231 void
2232 GetRoots(evac_fn evac)
2233 {
2234   StgMainThread *m;
2235
2236 #if defined(GRAN)
2237   {
2238     nat i;
2239     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2240       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2241           evac((StgClosure **)&run_queue_hds[i]);
2242       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2243           evac((StgClosure **)&run_queue_tls[i]);
2244       
2245       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2246           evac((StgClosure **)&blocked_queue_hds[i]);
2247       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2248           evac((StgClosure **)&blocked_queue_tls[i]);
2249       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2250           evac((StgClosure **)&ccalling_threads[i]);
2251     }
2252   }
2253
2254   markEventQueue();
2255
2256 #else /* !GRAN */
2257   if (run_queue_hd != END_TSO_QUEUE) {
2258       ASSERT(run_queue_tl != END_TSO_QUEUE);
2259       evac((StgClosure **)&run_queue_hd);
2260       evac((StgClosure **)&run_queue_tl);
2261   }
2262   
2263   if (blocked_queue_hd != END_TSO_QUEUE) {
2264       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2265       evac((StgClosure **)&blocked_queue_hd);
2266       evac((StgClosure **)&blocked_queue_tl);
2267   }
2268   
2269   if (sleeping_queue != END_TSO_QUEUE) {
2270       evac((StgClosure **)&sleeping_queue);
2271   }
2272 #endif 
2273
2274   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2275       evac((StgClosure **)&suspended_ccalling_threads);
2276   }
2277
2278 #if defined(PAR) || defined(GRAN)
2279   markSparkQueue(evac);
2280 #endif
2281 }
2282
2283 /* -----------------------------------------------------------------------------
2284    performGC
2285
2286    This is the interface to the garbage collector from Haskell land.
2287    We provide this so that external C code can allocate and garbage
2288    collect when called from Haskell via _ccall_GC.
2289
2290    It might be useful to provide an interface whereby the programmer
2291    can specify more roots (ToDo).
2292    
2293    This needs to be protected by the GC condition variable above.  KH.
2294    -------------------------------------------------------------------------- */
2295
2296 void (*extra_roots)(evac_fn);
2297
2298 void
2299 performGC(void)
2300 {
2301   /* Obligated to hold this lock upon entry */
2302   ACQUIRE_LOCK(&sched_mutex);
2303   GarbageCollect(GetRoots,rtsFalse);
2304   RELEASE_LOCK(&sched_mutex);
2305 }
2306
2307 void
2308 performMajorGC(void)
2309 {
2310   ACQUIRE_LOCK(&sched_mutex);
2311   GarbageCollect(GetRoots,rtsTrue);
2312   RELEASE_LOCK(&sched_mutex);
2313 }
2314
2315 static void
2316 AllRoots(evac_fn evac)
2317 {
2318     GetRoots(evac);             // the scheduler's roots
2319     extra_roots(evac);          // the user's roots
2320 }
2321
2322 void
2323 performGCWithRoots(void (*get_roots)(evac_fn))
2324 {
2325   ACQUIRE_LOCK(&sched_mutex);
2326   extra_roots = get_roots;
2327   GarbageCollect(AllRoots,rtsFalse);
2328   RELEASE_LOCK(&sched_mutex);
2329 }
2330
2331 /* -----------------------------------------------------------------------------
2332    Stack overflow
2333
2334    If the thread has reached its maximum stack size, then raise the
2335    StackOverflow exception in the offending thread.  Otherwise
2336    relocate the TSO into a larger chunk of memory and adjust its stack
2337    size appropriately.
2338    -------------------------------------------------------------------------- */
2339
2340 static StgTSO *
2341 threadStackOverflow(StgTSO *tso)
2342 {
2343   nat new_stack_size, new_tso_size, diff, stack_words;
2344   StgPtr new_sp;
2345   StgTSO *dest;
2346
2347   IF_DEBUG(sanity,checkTSO(tso));
2348   if (tso->stack_size >= tso->max_stack_size) {
2349
2350     IF_DEBUG(gc,
2351              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2352                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2353              /* If we're debugging, just print out the top of the stack */
2354              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2355                                               tso->sp+64)));
2356
2357     /* Send this thread the StackOverflow exception */
2358     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2359     return tso;
2360   }
2361
2362   /* Try to double the current stack size.  If that takes us over the
2363    * maximum stack size for this thread, then use the maximum instead.
2364    * Finally round up so the TSO ends up as a whole number of blocks.
2365    */
2366   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2367   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2368                                        TSO_STRUCT_SIZE)/sizeof(W_);
2369   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2370   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2371
2372   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2373
2374   dest = (StgTSO *)allocate(new_tso_size);
2375   TICK_ALLOC_TSO(new_stack_size,0);
2376
2377   /* copy the TSO block and the old stack into the new area */
2378   memcpy(dest,tso,TSO_STRUCT_SIZE);
2379   stack_words = tso->stack + tso->stack_size - tso->sp;
2380   new_sp = (P_)dest + new_tso_size - stack_words;
2381   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2382
2383   /* relocate the stack pointers... */
2384   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2385   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2386   dest->sp    = new_sp;
2387   dest->stack_size = new_stack_size;
2388         
2389   /* and relocate the update frame list */
2390   relocate_stack(dest, diff);
2391
2392   /* Mark the old TSO as relocated.  We have to check for relocated
2393    * TSOs in the garbage collector and any primops that deal with TSOs.
2394    *
2395    * It's important to set the sp and su values to just beyond the end
2396    * of the stack, so we don't attempt to scavenge any part of the
2397    * dead TSO's stack.
2398    */
2399   tso->what_next = ThreadRelocated;
2400   tso->link = dest;
2401   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2402   tso->su = (StgUpdateFrame *)tso->sp;
2403   tso->why_blocked = NotBlocked;
2404   dest->mut_link = NULL;
2405
2406   IF_PAR_DEBUG(verbose,
2407                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2408                      tso->id, tso, tso->stack_size);
2409                /* If we're debugging, just print out the top of the stack */
2410                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2411                                                 tso->sp+64)));
2412   
2413   IF_DEBUG(sanity,checkTSO(tso));
2414 #if 0
2415   IF_DEBUG(scheduler,printTSO(dest));
2416 #endif
2417
2418   return dest;
2419 }
2420
2421 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2422 //@subsection Blocking Queue Routines
2423
2424 /* ---------------------------------------------------------------------------
2425    Wake up a queue that was blocked on some resource.
2426    ------------------------------------------------------------------------ */
2427
2428 #if defined(GRAN)
2429 static inline void
2430 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2431 {
2432 }
2433 #elif defined(PAR)
2434 static inline void
2435 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2436 {
2437   /* write RESUME events to log file and
2438      update blocked and fetch time (depending on type of the orig closure) */
2439   if (RtsFlags.ParFlags.ParStats.Full) {
2440     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2441                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2442                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2443     if (EMPTY_RUN_QUEUE())
2444       emitSchedule = rtsTrue;
2445
2446     switch (get_itbl(node)->type) {
2447         case FETCH_ME_BQ:
2448           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2449           break;
2450         case RBH:
2451         case FETCH_ME:
2452         case BLACKHOLE_BQ:
2453           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2454           break;
2455 #ifdef DIST
2456         case MVAR:
2457           break;
2458 #endif    
2459         default:
2460           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2461         }
2462       }
2463 }
2464 #endif
2465
2466 #if defined(GRAN)
2467 static StgBlockingQueueElement *
2468 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2469 {
2470     StgTSO *tso;
2471     PEs node_loc, tso_loc;
2472
2473     node_loc = where_is(node); // should be lifted out of loop
2474     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2475     tso_loc = where_is((StgClosure *)tso);
2476     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2477       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2478       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2479       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2480       // insertThread(tso, node_loc);
2481       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2482                 ResumeThread,
2483                 tso, node, (rtsSpark*)NULL);
2484       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2485       // len_local++;
2486       // len++;
2487     } else { // TSO is remote (actually should be FMBQ)
2488       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2489                                   RtsFlags.GranFlags.Costs.gunblocktime +
2490                                   RtsFlags.GranFlags.Costs.latency;
2491       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2492                 UnblockThread,
2493                 tso, node, (rtsSpark*)NULL);
2494       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2495       // len++;
2496     }
2497     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2498     IF_GRAN_DEBUG(bq,
2499                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2500                           (node_loc==tso_loc ? "Local" : "Global"), 
2501                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2502     tso->block_info.closure = NULL;
2503     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2504                              tso->id, tso));
2505 }
2506 #elif defined(PAR)
2507 static StgBlockingQueueElement *
2508 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2509 {
2510     StgBlockingQueueElement *next;
2511
2512     switch (get_itbl(bqe)->type) {
2513     case TSO:
2514       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2515       /* if it's a TSO just push it onto the run_queue */
2516       next = bqe->link;
2517       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2518       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2519       THREAD_RUNNABLE();
2520       unblockCount(bqe, node);
2521       /* reset blocking status after dumping event */
2522       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2523       break;
2524
2525     case BLOCKED_FETCH:
2526       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2527       next = bqe->link;
2528       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2529       PendingFetches = (StgBlockedFetch *)bqe;
2530       break;
2531
2532 # if defined(DEBUG)
2533       /* can ignore this case in a non-debugging setup; 
2534          see comments on RBHSave closures above */
2535     case CONSTR:
2536       /* check that the closure is an RBHSave closure */
2537       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2538              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2539              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2540       break;
2541
2542     default:
2543       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2544            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2545            (StgClosure *)bqe);
2546 # endif
2547     }
2548   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2549   return next;
2550 }
2551
2552 #else /* !GRAN && !PAR */
2553 static StgTSO *
2554 unblockOneLocked(StgTSO *tso)
2555 {
2556   StgTSO *next;
2557
2558   ASSERT(get_itbl(tso)->type == TSO);
2559   ASSERT(tso->why_blocked != NotBlocked);
2560   tso->why_blocked = NotBlocked;
2561   next = tso->link;
2562   PUSH_ON_RUN_QUEUE(tso);
2563   THREAD_RUNNABLE();
2564   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2565   return next;
2566 }
2567 #endif
2568
2569 #if defined(GRAN) || defined(PAR)
2570 inline StgBlockingQueueElement *
2571 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2572 {
2573   ACQUIRE_LOCK(&sched_mutex);
2574   bqe = unblockOneLocked(bqe, node);
2575   RELEASE_LOCK(&sched_mutex);
2576   return bqe;
2577 }
2578 #else
2579 inline StgTSO *
2580 unblockOne(StgTSO *tso)
2581 {
2582   ACQUIRE_LOCK(&sched_mutex);
2583   tso = unblockOneLocked(tso);
2584   RELEASE_LOCK(&sched_mutex);
2585   return tso;
2586 }
2587 #endif
2588
2589 #if defined(GRAN)
2590 void 
2591 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2592 {
2593   StgBlockingQueueElement *bqe;
2594   PEs node_loc;
2595   nat len = 0; 
2596
2597   IF_GRAN_DEBUG(bq, 
2598                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2599                       node, CurrentProc, CurrentTime[CurrentProc], 
2600                       CurrentTSO->id, CurrentTSO));
2601
2602   node_loc = where_is(node);
2603
2604   ASSERT(q == END_BQ_QUEUE ||
2605          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2606          get_itbl(q)->type == CONSTR); // closure (type constructor)
2607   ASSERT(is_unique(node));
2608
2609   /* FAKE FETCH: magically copy the node to the tso's proc;
2610      no Fetch necessary because in reality the node should not have been 
2611      moved to the other PE in the first place
2612   */
2613   if (CurrentProc!=node_loc) {
2614     IF_GRAN_DEBUG(bq, 
2615                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2616                         node, node_loc, CurrentProc, CurrentTSO->id, 
2617                         // CurrentTSO, where_is(CurrentTSO),
2618                         node->header.gran.procs));
2619     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2620     IF_GRAN_DEBUG(bq, 
2621                   belch("## new bitmask of node %p is %#x",
2622                         node, node->header.gran.procs));
2623     if (RtsFlags.GranFlags.GranSimStats.Global) {
2624       globalGranStats.tot_fake_fetches++;
2625     }
2626   }
2627
2628   bqe = q;
2629   // ToDo: check: ASSERT(CurrentProc==node_loc);
2630   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2631     //next = bqe->link;
2632     /* 
2633        bqe points to the current element in the queue
2634        next points to the next element in the queue
2635     */
2636     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2637     //tso_loc = where_is(tso);
2638     len++;
2639     bqe = unblockOneLocked(bqe, node);
2640   }
2641
2642   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2643      the closure to make room for the anchor of the BQ */
2644   if (bqe!=END_BQ_QUEUE) {
2645     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2646     /*
2647     ASSERT((info_ptr==&RBH_Save_0_info) ||
2648            (info_ptr==&RBH_Save_1_info) ||
2649            (info_ptr==&RBH_Save_2_info));
2650     */
2651     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2652     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2653     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2654
2655     IF_GRAN_DEBUG(bq,
2656                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2657                         node, info_type(node)));
2658   }
2659
2660   /* statistics gathering */
2661   if (RtsFlags.GranFlags.GranSimStats.Global) {
2662     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2663     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2664     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2665     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2666   }
2667   IF_GRAN_DEBUG(bq,
2668                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2669                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2670 }
2671 #elif defined(PAR)
2672 void 
2673 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2674 {
2675   StgBlockingQueueElement *bqe;
2676
2677   ACQUIRE_LOCK(&sched_mutex);
2678
2679   IF_PAR_DEBUG(verbose, 
2680                belch("##-_ AwBQ for node %p on [%x]: ",
2681                      node, mytid));
2682 #ifdef DIST  
2683   //RFP
2684   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2685     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2686     return;
2687   }
2688 #endif
2689   
2690   ASSERT(q == END_BQ_QUEUE ||
2691          get_itbl(q)->type == TSO ||           
2692          get_itbl(q)->type == BLOCKED_FETCH || 
2693          get_itbl(q)->type == CONSTR); 
2694
2695   bqe = q;
2696   while (get_itbl(bqe)->type==TSO || 
2697          get_itbl(bqe)->type==BLOCKED_FETCH) {
2698     bqe = unblockOneLocked(bqe, node);
2699   }
2700   RELEASE_LOCK(&sched_mutex);
2701 }
2702
2703 #else   /* !GRAN && !PAR */
2704 void
2705 awakenBlockedQueue(StgTSO *tso)
2706 {
2707   ACQUIRE_LOCK(&sched_mutex);
2708   while (tso != END_TSO_QUEUE) {
2709     tso = unblockOneLocked(tso);
2710   }
2711   RELEASE_LOCK(&sched_mutex);
2712 }
2713 #endif
2714
2715 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2716 //@subsection Exception Handling Routines
2717
2718 /* ---------------------------------------------------------------------------
2719    Interrupt execution
2720    - usually called inside a signal handler so it mustn't do anything fancy.   
2721    ------------------------------------------------------------------------ */
2722
2723 void
2724 interruptStgRts(void)
2725 {
2726     interrupted    = 1;
2727     context_switch = 1;
2728 }
2729
2730 /* -----------------------------------------------------------------------------
2731    Unblock a thread
2732
2733    This is for use when we raise an exception in another thread, which
2734    may be blocked.
2735    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2736    -------------------------------------------------------------------------- */
2737
2738 #if defined(GRAN) || defined(PAR)
2739 /*
2740   NB: only the type of the blocking queue is different in GranSim and GUM
2741       the operations on the queue-elements are the same
2742       long live polymorphism!
2743 */
2744 static void
2745 unblockThread(StgTSO *tso)
2746 {
2747   StgBlockingQueueElement *t, **last;
2748
2749   ACQUIRE_LOCK(&sched_mutex);
2750   switch (tso->why_blocked) {
2751
2752   case NotBlocked:
2753     return;  /* not blocked */
2754
2755   case BlockedOnMVar:
2756     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2757     {
2758       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2759       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2760
2761       last = (StgBlockingQueueElement **)&mvar->head;
2762       for (t = (StgBlockingQueueElement *)mvar->head; 
2763            t != END_BQ_QUEUE; 
2764            last = &t->link, last_tso = t, t = t->link) {
2765         if (t == (StgBlockingQueueElement *)tso) {
2766           *last = (StgBlockingQueueElement *)tso->link;
2767           if (mvar->tail == tso) {
2768             mvar->tail = (StgTSO *)last_tso;
2769           }
2770           goto done;
2771         }
2772       }
2773       barf("unblockThread (MVAR): TSO not found");
2774     }
2775
2776   case BlockedOnBlackHole:
2777     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2778     {
2779       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2780
2781       last = &bq->blocking_queue;
2782       for (t = bq->blocking_queue; 
2783            t != END_BQ_QUEUE; 
2784            last = &t->link, t = t->link) {
2785         if (t == (StgBlockingQueueElement *)tso) {
2786           *last = (StgBlockingQueueElement *)tso->link;
2787           goto done;
2788         }
2789       }
2790       barf("unblockThread (BLACKHOLE): TSO not found");
2791     }
2792
2793   case BlockedOnException:
2794     {
2795       StgTSO *target  = tso->block_info.tso;
2796
2797       ASSERT(get_itbl(target)->type == TSO);
2798
2799       if (target->what_next == ThreadRelocated) {
2800           target = target->link;
2801           ASSERT(get_itbl(target)->type == TSO);
2802       }
2803
2804       ASSERT(target->blocked_exceptions != NULL);
2805
2806       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2807       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2808            t != END_BQ_QUEUE; 
2809            last = &t->link, t = t->link) {
2810         ASSERT(get_itbl(t)->type == TSO);
2811         if (t == (StgBlockingQueueElement *)tso) {
2812           *last = (StgBlockingQueueElement *)tso->link;
2813           goto done;
2814         }
2815       }
2816       barf("unblockThread (Exception): TSO not found");
2817     }
2818
2819   case BlockedOnRead:
2820   case BlockedOnWrite:
2821     {
2822       /* take TSO off blocked_queue */
2823       StgBlockingQueueElement *prev = NULL;
2824       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2825            prev = t, t = t->link) {
2826         if (t == (StgBlockingQueueElement *)tso) {
2827           if (prev == NULL) {
2828             blocked_queue_hd = (StgTSO *)t->link;
2829             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2830               blocked_queue_tl = END_TSO_QUEUE;
2831             }
2832           } else {
2833             prev->link = t->link;
2834             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2835               blocked_queue_tl = (StgTSO *)prev;
2836             }
2837           }
2838           goto done;
2839         }
2840       }
2841       barf("unblockThread (I/O): TSO not found");
2842     }
2843
2844   case BlockedOnDelay:
2845     {
2846       /* take TSO off sleeping_queue */
2847       StgBlockingQueueElement *prev = NULL;
2848       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2849            prev = t, t = t->link) {
2850         if (t == (StgBlockingQueueElement *)tso) {
2851           if (prev == NULL) {
2852             sleeping_queue = (StgTSO *)t->link;
2853           } else {
2854             prev->link = t->link;
2855           }
2856           goto done;
2857         }
2858       }
2859       barf("unblockThread (I/O): TSO not found");
2860     }
2861
2862   default:
2863     barf("unblockThread");
2864   }
2865
2866  done:
2867   tso->link = END_TSO_QUEUE;
2868   tso->why_blocked = NotBlocked;
2869   tso->block_info.closure = NULL;
2870   PUSH_ON_RUN_QUEUE(tso);
2871   RELEASE_LOCK(&sched_mutex);
2872 }
2873 #else
2874 static void
2875 unblockThread(StgTSO *tso)
2876 {
2877   StgTSO *t, **last;
2878
2879   ACQUIRE_LOCK(&sched_mutex);
2880   switch (tso->why_blocked) {
2881
2882   case NotBlocked:
2883     return;  /* not blocked */
2884
2885   case BlockedOnMVar:
2886     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2887     {
2888       StgTSO *last_tso = END_TSO_QUEUE;
2889       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2890
2891       last = &mvar->head;
2892       for (t = mvar->head; t != END_TSO_QUEUE; 
2893            last = &t->link, last_tso = t, t = t->link) {
2894         if (t == tso) {
2895           *last = tso->link;
2896           if (mvar->tail == tso) {
2897             mvar->tail = last_tso;
2898           }
2899           goto done;
2900         }
2901       }
2902       barf("unblockThread (MVAR): TSO not found");
2903     }
2904
2905   case BlockedOnBlackHole:
2906     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2907     {
2908       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2909
2910       last = &bq->blocking_queue;
2911       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2912            last = &t->link, t = t->link) {
2913         if (t == tso) {
2914           *last = tso->link;
2915           goto done;
2916         }
2917       }
2918       barf("unblockThread (BLACKHOLE): TSO not found");
2919     }
2920
2921   case BlockedOnException:
2922     {
2923       StgTSO *target  = tso->block_info.tso;
2924
2925       ASSERT(get_itbl(target)->type == TSO);
2926
2927       while (target->what_next == ThreadRelocated) {
2928           target = target->link;
2929           ASSERT(get_itbl(target)->type == TSO);
2930       }
2931       
2932       ASSERT(target->blocked_exceptions != NULL);
2933
2934       last = &target->blocked_exceptions;
2935       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2936            last = &t->link, t = t->link) {
2937         ASSERT(get_itbl(t)->type == TSO);
2938         if (t == tso) {
2939           *last = tso->link;
2940           goto done;
2941         }
2942       }
2943       barf("unblockThread (Exception): TSO not found");
2944     }
2945
2946   case BlockedOnRead:
2947   case BlockedOnWrite:
2948     {
2949       StgTSO *prev = NULL;
2950       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2951            prev = t, t = t->link) {
2952         if (t == tso) {
2953           if (prev == NULL) {
2954             blocked_queue_hd = t->link;
2955             if (blocked_queue_tl == t) {
2956               blocked_queue_tl = END_TSO_QUEUE;
2957             }
2958           } else {
2959             prev->link = t->link;
2960             if (blocked_queue_tl == t) {
2961               blocked_queue_tl = prev;
2962             }
2963           }
2964           goto done;
2965         }
2966       }
2967       barf("unblockThread (I/O): TSO not found");
2968     }
2969
2970   case BlockedOnDelay:
2971     {
2972       StgTSO *prev = NULL;
2973       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2974            prev = t, t = t->link) {
2975         if (t == tso) {
2976           if (prev == NULL) {
2977             sleeping_queue = t->link;
2978           } else {
2979             prev->link = t->link;
2980           }
2981           goto done;
2982         }
2983       }
2984       barf("unblockThread (I/O): TSO not found");
2985     }
2986
2987   default:
2988     barf("unblockThread");
2989   }
2990
2991  done:
2992   tso->link = END_TSO_QUEUE;
2993   tso->why_blocked = NotBlocked;
2994   tso->block_info.closure = NULL;
2995   PUSH_ON_RUN_QUEUE(tso);
2996   RELEASE_LOCK(&sched_mutex);
2997 }
2998 #endif
2999
3000 /* -----------------------------------------------------------------------------
3001  * raiseAsync()
3002  *
3003  * The following function implements the magic for raising an
3004  * asynchronous exception in an existing thread.
3005  *
3006  * We first remove the thread from any queue on which it might be
3007  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3008  *
3009  * We strip the stack down to the innermost CATCH_FRAME, building
3010  * thunks in the heap for all the active computations, so they can 
3011  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3012  * an application of the handler to the exception, and push it on
3013  * the top of the stack.
3014  * 
3015  * How exactly do we save all the active computations?  We create an
3016  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3017  * AP_UPDs pushes everything from the corresponding update frame
3018  * upwards onto the stack.  (Actually, it pushes everything up to the
3019  * next update frame plus a pointer to the next AP_UPD object.
3020  * Entering the next AP_UPD object pushes more onto the stack until we
3021  * reach the last AP_UPD object - at which point the stack should look
3022  * exactly as it did when we killed the TSO and we can continue
3023  * execution by entering the closure on top of the stack.
3024  *
3025  * We can also kill a thread entirely - this happens if either (a) the 
3026  * exception passed to raiseAsync is NULL, or (b) there's no
3027  * CATCH_FRAME on the stack.  In either case, we strip the entire
3028  * stack and replace the thread with a zombie.
3029  *
3030  * -------------------------------------------------------------------------- */
3031  
3032 void 
3033 deleteThread(StgTSO *tso)
3034 {
3035   raiseAsync(tso,NULL);
3036 }
3037
3038 void
3039 raiseAsync(StgTSO *tso, StgClosure *exception)
3040 {
3041   StgUpdateFrame* su = tso->su;
3042   StgPtr          sp = tso->sp;
3043   
3044   /* Thread already dead? */
3045   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3046     return;
3047   }
3048
3049   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3050
3051   /* Remove it from any blocking queues */
3052   unblockThread(tso);
3053
3054   /* The stack freezing code assumes there's a closure pointer on
3055    * the top of the stack.  This isn't always the case with compiled
3056    * code, so we have to push a dummy closure on the top which just
3057    * returns to the next return address on the stack.
3058    */
3059   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3060     *(--sp) = (W_)&stg_dummy_ret_closure;
3061   }
3062
3063   while (1) {
3064     nat words = ((P_)su - (P_)sp) - 1;
3065     nat i;
3066     StgAP_UPD * ap;
3067
3068     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3069      * then build the THUNK raise(exception), and leave it on
3070      * top of the CATCH_FRAME ready to enter.
3071      */
3072     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3073       StgCatchFrame *cf = (StgCatchFrame *)su;
3074       StgClosure *raise;
3075
3076       /* we've got an exception to raise, so let's pass it to the
3077        * handler in this frame.
3078        */
3079       raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3080       TICK_ALLOC_SE_THK(1,0);
3081       SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3082       raise->payload[0] = exception;
3083
3084       /* throw away the stack from Sp up to the CATCH_FRAME.
3085        */
3086       sp = (P_)su - 1;
3087
3088       /* Ensure that async excpetions are blocked now, so we don't get
3089        * a surprise exception before we get around to executing the
3090        * handler.
3091        */
3092       if (tso->blocked_exceptions == NULL) {
3093           tso->blocked_exceptions = END_TSO_QUEUE;
3094       }
3095
3096       /* Put the newly-built THUNK on top of the stack, ready to execute
3097        * when the thread restarts.
3098        */
3099       sp[0] = (W_)raise;
3100       tso->sp = sp;
3101       tso->su = su;
3102       tso->what_next = ThreadEnterGHC;
3103       IF_DEBUG(sanity, checkTSO(tso));
3104       return;
3105     }
3106
3107     /* First build an AP_UPD consisting of the stack chunk above the
3108      * current update frame, with the top word on the stack as the
3109      * fun field.
3110      */
3111     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3112     
3113     ASSERT(words >= 0);
3114     
3115     ap->n_args = words;
3116     ap->fun    = (StgClosure *)sp[0];
3117     sp++;
3118     for(i=0; i < (nat)words; ++i) {
3119       ap->payload[i] = (StgClosure *)*sp++;
3120     }
3121     
3122     switch (get_itbl(su)->type) {
3123       
3124     case UPDATE_FRAME:
3125       {
3126         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3127         TICK_ALLOC_UP_THK(words+1,0);
3128         
3129         IF_DEBUG(scheduler,
3130                  fprintf(stderr,  "scheduler: Updating ");
3131                  printPtr((P_)su->updatee); 
3132                  fprintf(stderr,  " with ");
3133                  printObj((StgClosure *)ap);
3134                  );
3135         
3136         /* Replace the updatee with an indirection - happily
3137          * this will also wake up any threads currently
3138          * waiting on the result.
3139          *
3140          * Warning: if we're in a loop, more than one update frame on
3141          * the stack may point to the same object.  Be careful not to
3142          * overwrite an IND_OLDGEN in this case, because we'll screw
3143          * up the mutable lists.  To be on the safe side, don't
3144          * overwrite any kind of indirection at all.  See also
3145          * threadSqueezeStack in GC.c, where we have to make a similar
3146          * check.
3147          */
3148         if (!closure_IND(su->updatee)) {
3149             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3150         }
3151         su = su->link;
3152         sp += sizeofW(StgUpdateFrame) -1;
3153         sp[0] = (W_)ap; /* push onto stack */
3154         break;
3155       }
3156
3157     case CATCH_FRAME:
3158       {
3159         StgCatchFrame *cf = (StgCatchFrame *)su;
3160         StgClosure* o;
3161         
3162         /* We want a PAP, not an AP_UPD.  Fortunately, the
3163          * layout's the same.
3164          */
3165         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3166         TICK_ALLOC_UPD_PAP(words+1,0);
3167         
3168         /* now build o = FUN(catch,ap,handler) */
3169         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3170         TICK_ALLOC_FUN(2,0);
3171         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3172         o->payload[0] = (StgClosure *)ap;
3173         o->payload[1] = cf->handler;
3174         
3175         IF_DEBUG(scheduler,
3176                  fprintf(stderr,  "scheduler: Built ");
3177                  printObj((StgClosure *)o);
3178                  );
3179         
3180         /* pop the old handler and put o on the stack */
3181         su = cf->link;
3182         sp += sizeofW(StgCatchFrame) - 1;
3183         sp[0] = (W_)o;
3184         break;
3185       }
3186       
3187     case SEQ_FRAME:
3188       {
3189         StgSeqFrame *sf = (StgSeqFrame *)su;
3190         StgClosure* o;
3191         
3192         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3193         TICK_ALLOC_UPD_PAP(words+1,0);
3194         
3195         /* now build o = FUN(seq,ap) */
3196         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3197         TICK_ALLOC_SE_THK(1,0);
3198         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3199         o->payload[0] = (StgClosure *)ap;
3200         
3201         IF_DEBUG(scheduler,
3202                  fprintf(stderr,  "scheduler: Built ");
3203                  printObj((StgClosure *)o);
3204                  );
3205         
3206         /* pop the old handler and put o on the stack */
3207         su = sf->link;
3208         sp += sizeofW(StgSeqFrame) - 1;
3209         sp[0] = (W_)o;
3210         break;
3211       }
3212       
3213     case STOP_FRAME:
3214       /* We've stripped the entire stack, the thread is now dead. */
3215       sp += sizeofW(StgStopFrame) - 1;
3216       sp[0] = (W_)exception;    /* save the exception */
3217       tso->what_next = ThreadKilled;
3218       tso->su = (StgUpdateFrame *)(sp+1);
3219       tso->sp = sp;
3220       return;
3221
3222     default:
3223       barf("raiseAsync");
3224     }
3225   }
3226   barf("raiseAsync");
3227 }
3228
3229 /* -----------------------------------------------------------------------------
3230    resurrectThreads is called after garbage collection on the list of
3231    threads found to be garbage.  Each of these threads will be woken
3232    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3233    on an MVar, or NonTermination if the thread was blocked on a Black
3234    Hole.
3235    -------------------------------------------------------------------------- */
3236
3237 void
3238 resurrectThreads( StgTSO *threads )
3239 {
3240   StgTSO *tso, *next;
3241
3242   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3243     next = tso->global_link;
3244     tso->global_link = all_threads;
3245     all_threads = tso;
3246     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3247
3248     switch (tso->why_blocked) {
3249     case BlockedOnMVar:
3250     case BlockedOnException:
3251       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3252       break;
3253     case BlockedOnBlackHole:
3254       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3255       break;
3256     case NotBlocked:
3257       /* This might happen if the thread was blocked on a black hole
3258        * belonging to a thread that we've just woken up (raiseAsync
3259        * can wake up threads, remember...).
3260        */
3261       continue;
3262     default:
3263       barf("resurrectThreads: thread blocked in a strange way");
3264     }
3265   }
3266 }
3267
3268 /* -----------------------------------------------------------------------------
3269  * Blackhole detection: if we reach a deadlock, test whether any
3270  * threads are blocked on themselves.  Any threads which are found to
3271  * be self-blocked get sent a NonTermination exception.
3272  *
3273  * This is only done in a deadlock situation in order to avoid
3274  * performance overhead in the normal case.
3275  * -------------------------------------------------------------------------- */
3276
3277 static void
3278 detectBlackHoles( void )
3279 {
3280     StgTSO *t = all_threads;
3281     StgUpdateFrame *frame;
3282     StgClosure *blocked_on;
3283
3284     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3285
3286         while (t->what_next == ThreadRelocated) {
3287             t = t->link;
3288             ASSERT(get_itbl(t)->type == TSO);
3289         }
3290       
3291         if (t->why_blocked != BlockedOnBlackHole) {
3292             continue;
3293         }
3294
3295         blocked_on = t->block_info.closure;
3296
3297         for (frame = t->su; ; frame = frame->link) {
3298             switch (get_itbl(frame)->type) {
3299
3300             case UPDATE_FRAME:
3301                 if (frame->updatee == blocked_on) {
3302                     /* We are blocking on one of our own computations, so
3303                      * send this thread the NonTermination exception.  
3304                      */
3305                     IF_DEBUG(scheduler, 
3306                              sched_belch("thread %d is blocked on itself", t->id));
3307                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3308                     goto done;
3309                 }
3310                 else {
3311                     continue;
3312                 }
3313
3314             case CATCH_FRAME:
3315             case SEQ_FRAME:
3316                 continue;
3317                 
3318             case STOP_FRAME:
3319                 break;
3320             }
3321             break;
3322         }
3323
3324     done: ;
3325     }   
3326 }
3327
3328 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3329 //@subsection Debugging Routines
3330
3331 /* -----------------------------------------------------------------------------
3332    Debugging: why is a thread blocked
3333    -------------------------------------------------------------------------- */
3334
3335 #ifdef DEBUG
3336
3337 void
3338 printThreadBlockage(StgTSO *tso)
3339 {
3340   switch (tso->why_blocked) {
3341   case BlockedOnRead:
3342     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3343     break;
3344   case BlockedOnWrite:
3345     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3346     break;
3347   case BlockedOnDelay:
3348     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3349     break;
3350   case BlockedOnMVar:
3351     fprintf(stderr,"is blocked on an MVar");
3352     break;
3353   case BlockedOnException:
3354     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3355             tso->block_info.tso->id);
3356     break;
3357   case BlockedOnBlackHole:
3358     fprintf(stderr,"is blocked on a black hole");
3359     break;
3360   case NotBlocked:
3361     fprintf(stderr,"is not blocked");
3362     break;
3363 #if defined(PAR)
3364   case BlockedOnGA:
3365     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3366             tso->block_info.closure, info_type(tso->block_info.closure));
3367     break;
3368   case BlockedOnGA_NoSend:
3369     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3370             tso->block_info.closure, info_type(tso->block_info.closure));
3371     break;
3372 #endif
3373 #if defined(RTS_SUPPORTS_THREADS)
3374   case BlockedOnCCall:
3375     fprintf(stderr,"is blocked on an external call");
3376     break;
3377 #endif
3378   default:
3379     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3380          tso->why_blocked, tso->id, tso);
3381   }
3382 }
3383
3384 void
3385 printThreadStatus(StgTSO *tso)
3386 {
3387   switch (tso->what_next) {
3388   case ThreadKilled:
3389     fprintf(stderr,"has been killed");
3390     break;
3391   case ThreadComplete:
3392     fprintf(stderr,"has completed");
3393     break;
3394   default:
3395     printThreadBlockage(tso);
3396   }
3397 }
3398
3399 void
3400 printAllThreads(void)
3401 {
3402   StgTSO *t;
3403
3404 # if defined(GRAN)
3405   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3406   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3407                        time_string, rtsFalse/*no commas!*/);
3408
3409   sched_belch("all threads at [%s]:", time_string);
3410 # elif defined(PAR)
3411   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3412   ullong_format_string(CURRENT_TIME,
3413                        time_string, rtsFalse/*no commas!*/);
3414
3415   sched_belch("all threads at [%s]:", time_string);
3416 # else
3417   sched_belch("all threads:");
3418 # endif
3419
3420   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3421     fprintf(stderr, "\tthread %d ", t->id);
3422     printThreadStatus(t);
3423     fprintf(stderr,"\n");
3424   }
3425 }
3426     
3427 /* 
3428    Print a whole blocking queue attached to node (debugging only).
3429 */
3430 //@cindex print_bq
3431 # if defined(PAR)
3432 void 
3433 print_bq (StgClosure *node)
3434 {
3435   StgBlockingQueueElement *bqe;
3436   StgTSO *tso;
3437   rtsBool end;
3438
3439   fprintf(stderr,"## BQ of closure %p (%s): ",
3440           node, info_type(node));
3441
3442   /* should cover all closures that may have a blocking queue */
3443   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3444          get_itbl(node)->type == FETCH_ME_BQ ||
3445          get_itbl(node)->type == RBH ||
3446          get_itbl(node)->type == MVAR);
3447     
3448   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3449
3450   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3451 }
3452
3453 /* 
3454    Print a whole blocking queue starting with the element bqe.
3455 */
3456 void 
3457 print_bqe (StgBlockingQueueElement *bqe)
3458 {
3459   rtsBool end;
3460
3461   /* 
3462      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3463   */
3464   for (end = (bqe==END_BQ_QUEUE);
3465        !end; // iterate until bqe points to a CONSTR
3466        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3467        bqe = end ? END_BQ_QUEUE : bqe->link) {
3468     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3469     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3470     /* types of closures that may appear in a blocking queue */
3471     ASSERT(get_itbl(bqe)->type == TSO ||           
3472            get_itbl(bqe)->type == BLOCKED_FETCH || 
3473            get_itbl(bqe)->type == CONSTR); 
3474     /* only BQs of an RBH end with an RBH_Save closure */
3475     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3476
3477     switch (get_itbl(bqe)->type) {
3478     case TSO:
3479       fprintf(stderr," TSO %u (%x),",
3480               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3481       break;
3482     case BLOCKED_FETCH:
3483       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3484               ((StgBlockedFetch *)bqe)->node, 
3485               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3486               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3487               ((StgBlockedFetch *)bqe)->ga.weight);
3488       break;
3489     case CONSTR:
3490       fprintf(stderr," %s (IP %p),",
3491               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3492                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3493                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3494                "RBH_Save_?"), get_itbl(bqe));
3495       break;
3496     default:
3497       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3498            info_type((StgClosure *)bqe)); // , node, info_type(node));
3499       break;
3500     }
3501   } /* for */
3502   fputc('\n', stderr);
3503 }
3504 # elif defined(GRAN)
3505 void 
3506 print_bq (StgClosure *node)
3507 {
3508   StgBlockingQueueElement *bqe;
3509   PEs node_loc, tso_loc;
3510   rtsBool end;
3511
3512   /* should cover all closures that may have a blocking queue */
3513   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3514          get_itbl(node)->type == FETCH_ME_BQ ||
3515          get_itbl(node)->type == RBH);
3516     
3517   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3518   node_loc = where_is(node);
3519
3520   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3521           node, info_type(node), node_loc);
3522
3523   /* 
3524      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3525   */
3526   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3527        !end; // iterate until bqe points to a CONSTR
3528        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3529     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3530     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3531     /* types of closures that may appear in a blocking queue */
3532     ASSERT(get_itbl(bqe)->type == TSO ||           
3533            get_itbl(bqe)->type == CONSTR); 
3534     /* only BQs of an RBH end with an RBH_Save closure */
3535     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3536
3537     tso_loc = where_is((StgClosure *)bqe);
3538     switch (get_itbl(bqe)->type) {
3539     case TSO:
3540       fprintf(stderr," TSO %d (%p) on [PE %d],",
3541               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3542       break;
3543     case CONSTR:
3544       fprintf(stderr," %s (IP %p),",
3545               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3546                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3547                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3548                "RBH_Save_?"), get_itbl(bqe));
3549       break;
3550     default:
3551       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3552            info_type((StgClosure *)bqe), node, info_type(node));
3553       break;
3554     }
3555   } /* for */
3556   fputc('\n', stderr);
3557 }
3558 #else
3559 /* 
3560    Nice and easy: only TSOs on the blocking queue
3561 */
3562 void 
3563 print_bq (StgClosure *node)
3564 {
3565   StgTSO *tso;
3566
3567   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3568   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3569        tso != END_TSO_QUEUE; 
3570        tso=tso->link) {
3571     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3572     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3573     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3574   }
3575   fputc('\n', stderr);
3576 }
3577 # endif
3578
3579 #if defined(PAR)
3580 static nat
3581 run_queue_len(void)
3582 {
3583   nat i;
3584   StgTSO *tso;
3585
3586   for (i=0, tso=run_queue_hd; 
3587        tso != END_TSO_QUEUE;
3588        i++, tso=tso->link)
3589     /* nothing */
3590
3591   return i;
3592 }
3593 #endif
3594
3595 static void
3596 sched_belch(char *s, ...)
3597 {
3598   va_list ap;
3599   va_start(ap,s);
3600 #ifdef SMP
3601   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3602 #elif defined(PAR)
3603   fprintf(stderr, "== ");
3604 #else
3605   fprintf(stderr, "scheduler: ");
3606 #endif
3607   vfprintf(stderr, s, ap);
3608   fprintf(stderr, "\n");
3609 }
3610
3611 #endif /* DEBUG */
3612
3613
3614 //@node Index,  , Debugging Routines, Main scheduling code
3615 //@subsection Index
3616
3617 //@index
3618 //* StgMainThread::  @cindex\s-+StgMainThread
3619 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3620 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3621 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3622 //* context_switch::  @cindex\s-+context_switch
3623 //* createThread::  @cindex\s-+createThread
3624 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3625 //* initScheduler::  @cindex\s-+initScheduler
3626 //* interrupted::  @cindex\s-+interrupted
3627 //* next_thread_id::  @cindex\s-+next_thread_id
3628 //* print_bq::  @cindex\s-+print_bq
3629 //* run_queue_hd::  @cindex\s-+run_queue_hd
3630 //* run_queue_tl::  @cindex\s-+run_queue_tl
3631 //* sched_mutex::  @cindex\s-+sched_mutex
3632 //* schedule::  @cindex\s-+schedule
3633 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3634 //* term_mutex::  @cindex\s-+term_mutex
3635 //@end index