[project @ 2002-02-12 15:38:08 by sof]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.121 2002/02/12 15:38:08 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #include <stdarg.h>
118
119 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
120 //@subsection Variables and Data structures
121
122 /* Main threads:
123  *
124  * These are the threads which clients have requested that we run.  
125  *
126  * In a 'threaded' build, we might have several concurrent clients all
127  * waiting for results, and each one will wait on a condition variable
128  * until the result is available.
129  *
130  * In non-SMP, clients are strictly nested: the first client calls
131  * into the RTS, which might call out again to C with a _ccall_GC, and
132  * eventually re-enter the RTS.
133  *
134  * Main threads information is kept in a linked list:
135  */
136 //@cindex StgMainThread
137 typedef struct StgMainThread_ {
138   StgTSO *         tso;
139   SchedulerStatus  stat;
140   StgClosure **    ret;
141 #if defined(RTS_SUPPORTS_THREADS)
142   Condition        wakeup;
143 #endif
144   struct StgMainThread_ *link;
145 } StgMainThread;
146
147 /* Main thread queue.
148  * Locks required: sched_mutex.
149  */
150 static StgMainThread *main_threads;
151
152 /* Thread queues.
153  * Locks required: sched_mutex.
154  */
155 #if defined(GRAN)
156
157 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
158 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
159
160 /* 
161    In GranSim we have a runable and a blocked queue for each processor.
162    In order to minimise code changes new arrays run_queue_hds/tls
163    are created. run_queue_hd is then a short cut (macro) for
164    run_queue_hds[CurrentProc] (see GranSim.h).
165    -- HWL
166 */
167 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
168 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
169 StgTSO *ccalling_threadss[MAX_PROC];
170 /* We use the same global list of threads (all_threads) in GranSim as in
171    the std RTS (i.e. we are cheating). However, we don't use this list in
172    the GranSim specific code at the moment (so we are only potentially
173    cheating).  */
174
175 #else /* !GRAN */
176
177 StgTSO *run_queue_hd, *run_queue_tl;
178 StgTSO *blocked_queue_hd, *blocked_queue_tl;
179 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
180
181 #endif
182
183 /* Linked list of all threads.
184  * Used for detecting garbage collected threads.
185  */
186 StgTSO *all_threads;
187
188 /* When a thread performs a safe C call (_ccall_GC, using old
189  * terminology), it gets put on the suspended_ccalling_threads
190  * list. Used by the garbage collector.
191  */
192 static StgTSO *suspended_ccalling_threads;
193
194 static StgTSO *threadStackOverflow(StgTSO *tso);
195
196 /* KH: The following two flags are shared memory locations.  There is no need
197        to lock them, since they are only unset at the end of a scheduler
198        operation.
199 */
200
201 /* flag set by signal handler to precipitate a context switch */
202 //@cindex context_switch
203 nat context_switch;
204
205 /* if this flag is set as well, give up execution */
206 //@cindex interrupted
207 rtsBool interrupted;
208
209 /* Next thread ID to allocate.
210  * Locks required: sched_mutex
211  */
212 //@cindex next_thread_id
213 StgThreadID next_thread_id = 1;
214
215 /*
216  * Pointers to the state of the current thread.
217  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
218  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
219  */
220  
221 /* The smallest stack size that makes any sense is:
222  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
223  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
224  *  + 1                       (the realworld token for an IO thread)
225  *  + 1                       (the closure to enter)
226  *
227  * A thread with this stack will bomb immediately with a stack
228  * overflow, which will increase its stack size.  
229  */
230
231 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
232
233
234 #if defined(GRAN)
235 StgTSO *CurrentTSO;
236 #endif
237
238 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
239  *  exists - earlier gccs apparently didn't.
240  *  -= chak
241  */
242 StgTSO dummy_tso;
243
244 rtsBool ready_to_gc;
245
246 void            addToBlockedQueue ( StgTSO *tso );
247
248 static void     schedule          ( void );
249        void     interruptStgRts   ( void );
250 #if defined(GRAN)
251 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
252 #else
253 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
254 #endif
255
256 static void     detectBlackHoles  ( void );
257
258 #ifdef DEBUG
259 static void sched_belch(char *s, ...);
260 #endif
261
262 #if defined(RTS_SUPPORTS_THREADS)
263 /* ToDo: carefully document the invariants that go together
264  *       with these synchronisation objects.
265  */
266 Mutex     sched_mutex       = INIT_MUTEX_VAR;
267 Mutex     term_mutex        = INIT_MUTEX_VAR;
268 #if defined(THREADED_RTS)
269 /*
270  * The rts_mutex is the 'big lock' that the active native
271  * thread within the RTS holds while executing code.
272  * It is given up when the thread makes a transition out of
273  * the RTS (e.g., to perform an external C call), hopefully
274  * for another thread to take over its chores and enter
275  * the RTS.
276  *
277  */
278 Mutex     rts_mutex         = INIT_MUTEX_VAR;
279 /*
280  * When a native thread has completed executing an external
281  * call, it needs to communicate the result back to the
282  * (Haskell) thread that made the call. Do this as follows:
283  *
284  *  - in resumeThread(), the thread increments the counter
285  *    threads_waiting, and then blocks on the 'big' RTS lock.
286  *  - upon entry to the scheduler, the thread that's currently
287  *    holding the RTS lock checks threads_waiting. If there
288  *    are native threads waiting, it gives up its RTS lock
289  *    and tries to re-grab the RTS lock [perhaps after having
290  *    waited for a bit..?]
291  *  - care must be taken to deal with the case where more than
292  *    one external thread are waiting on the lock. [ToDo: more]
293  *    
294  */
295
296 static nat threads_waiting = 0;
297 #endif
298
299
300 /* thread_ready_cond: when signalled, a thread has become runnable for a
301  * task to execute.
302  *
303  * In the non-SMP case, it also implies that the thread that is woken up has
304  * exclusive access to the RTS and all its DS (that are not under sched_mutex's
305  * control).
306  *
307  * thread_ready_cond is signalled whenever COND_NO_THREADS_READY doesn't hold.
308  *
309  */
310 Condition thread_ready_cond = INIT_COND_VAR;
311 #if 0
312 /* For documentation purposes only */
313 #define COND_NO_THREADS_READY() (noCapabilities() || EMPTY_RUN_QUEUE())
314 #endif
315
316 #if defined(SMP)
317 Condition gc_pending_cond   = INIT_COND_VAR;
318 nat await_death;
319 #endif
320
321 #endif
322
323 #if defined(PAR)
324 StgTSO *LastTSO;
325 rtsTime TimeOfLastYield;
326 rtsBool emitSchedule = rtsTrue;
327 #endif
328
329 #if DEBUG
330 char *whatNext_strs[] = {
331   "ThreadEnterGHC",
332   "ThreadRunGHC",
333   "ThreadEnterInterp",
334   "ThreadKilled",
335   "ThreadComplete"
336 };
337
338 char *threadReturnCode_strs[] = {
339   "HeapOverflow",                       /* might also be StackOverflow */
340   "StackOverflow",
341   "ThreadYielding",
342   "ThreadBlocked",
343   "ThreadFinished"
344 };
345 #endif
346
347 #if defined(PAR)
348 StgTSO * createSparkThread(rtsSpark spark);
349 StgTSO * activateSpark (rtsSpark spark);  
350 #endif
351
352 /*
353  * The thread state for the main thread.
354 // ToDo: check whether not needed any more
355 StgTSO   *MainTSO;
356  */
357
358 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
359 static void taskStart(void);
360 static void
361 taskStart(void)
362 {
363   /* threads start up using 'taskStart', so make them
364      them grab the RTS lock. */
365 #if defined(THREADED_RTS)
366   ACQUIRE_LOCK(&rts_mutex);
367   taskNotAvailable();
368 #endif
369   schedule();
370 }
371 #endif
372
373
374
375
376 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
377 //@subsection Main scheduling loop
378
379 /* ---------------------------------------------------------------------------
380    Main scheduling loop.
381
382    We use round-robin scheduling, each thread returning to the
383    scheduler loop when one of these conditions is detected:
384
385       * out of heap space
386       * timer expires (thread yields)
387       * thread blocks
388       * thread ends
389       * stack overflow
390
391    Locking notes:  we acquire the scheduler lock once at the beginning
392    of the scheduler loop, and release it when
393     
394       * running a thread, or
395       * waiting for work, or
396       * waiting for a GC to complete.
397
398    GRAN version:
399      In a GranSim setup this loop iterates over the global event queue.
400      This revolves around the global event queue, which determines what 
401      to do next. Therefore, it's more complicated than either the 
402      concurrent or the parallel (GUM) setup.
403
404    GUM version:
405      GUM iterates over incoming messages.
406      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
407      and sends out a fish whenever it has nothing to do; in-between
408      doing the actual reductions (shared code below) it processes the
409      incoming messages and deals with delayed operations 
410      (see PendingFetches).
411      This is not the ugliest code you could imagine, but it's bloody close.
412
413    ------------------------------------------------------------------------ */
414 //@cindex schedule
415 static void
416 schedule( void )
417 {
418   StgTSO *t;
419   Capability *cap;
420   StgThreadReturnCode ret;
421 #if defined(GRAN)
422   rtsEvent *event;
423 #elif defined(PAR)
424   StgSparkPool *pool;
425   rtsSpark spark;
426   StgTSO *tso;
427   GlobalTaskId pe;
428   rtsBool receivedFinish = rtsFalse;
429 # if defined(DEBUG)
430   nat tp_size, sp_size; // stats only
431 # endif
432 #endif
433   rtsBool was_interrupted = rtsFalse;
434   
435   ACQUIRE_LOCK(&sched_mutex);
436   
437 #if defined(THREADED_RTS)
438   /* ToDo: consider SMP support */
439   if (threads_waiting > 0) {
440     /* (At least) one native thread is waiting to
441      * deposit the result of an external call. So,
442      * give up our RTS executing privileges and let
443      * one of them continue.
444      * 
445      */
446     taskAvailable();
447     RELEASE_LOCK(&sched_mutex);
448     IF_DEBUG(scheduler, sched_belch("worker thread (%d): giving up RTS token (threads_waiting=%d)\n", osThreadId(), threads_waiting));
449     RELEASE_LOCK(&rts_mutex);
450     /* ToDo: come up with mechanism that guarantees that
451      * the main thread doesn't loop here.
452      */
453     yieldThread();
454     /* ToDo: longjmp() */
455     taskStart();
456   }
457 #endif
458
459 #if defined(GRAN)
460
461   /* set up first event to get things going */
462   /* ToDo: assign costs for system setup and init MainTSO ! */
463   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
464             ContinueThread, 
465             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
466
467   IF_DEBUG(gran,
468            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
469            G_TSO(CurrentTSO, 5));
470
471   if (RtsFlags.GranFlags.Light) {
472     /* Save current time; GranSim Light only */
473     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
474   }      
475
476   event = get_next_event();
477
478   while (event!=(rtsEvent*)NULL) {
479     /* Choose the processor with the next event */
480     CurrentProc = event->proc;
481     CurrentTSO = event->tso;
482
483 #elif defined(PAR)
484
485   while (!receivedFinish) {    /* set by processMessages */
486                                /* when receiving PP_FINISH message         */ 
487 #else
488
489   while (1) {
490
491 #endif
492
493     IF_DEBUG(scheduler, printAllThreads());
494
495     /* If we're interrupted (the user pressed ^C, or some other
496      * termination condition occurred), kill all the currently running
497      * threads.
498      */
499     if (interrupted) {
500       IF_DEBUG(scheduler, sched_belch("interrupted"));
501       deleteAllThreads();
502       interrupted = rtsFalse;
503       was_interrupted = rtsTrue;
504     }
505
506     /* Go through the list of main threads and wake up any
507      * clients whose computations have finished.  ToDo: this
508      * should be done more efficiently without a linear scan
509      * of the main threads list, somehow...
510      */
511 #if defined(RTS_SUPPORTS_THREADS)
512     { 
513       StgMainThread *m, **prev;
514       prev = &main_threads;
515       for (m = main_threads; m != NULL; m = m->link) {
516         switch (m->tso->what_next) {
517         case ThreadComplete:
518           if (m->ret) {
519             *(m->ret) = (StgClosure *)m->tso->sp[0];
520           }
521           *prev = m->link;
522           m->stat = Success;
523           broadcastCondition(&m->wakeup);
524           break;
525         case ThreadKilled:
526           if (m->ret) *(m->ret) = NULL;
527           *prev = m->link;
528           if (was_interrupted) {
529             m->stat = Interrupted;
530           } else {
531             m->stat = Killed;
532           }
533           broadcastCondition(&m->wakeup);
534           break;
535         default:
536           break;
537         }
538       }
539     }
540
541 #else /* not threaded */
542
543 # if defined(PAR)
544     /* in GUM do this only on the Main PE */
545     if (IAmMainThread)
546 # endif
547     /* If our main thread has finished or been killed, return.
548      */
549     {
550       StgMainThread *m = main_threads;
551       if (m->tso->what_next == ThreadComplete
552           || m->tso->what_next == ThreadKilled) {
553         main_threads = main_threads->link;
554         if (m->tso->what_next == ThreadComplete) {
555           /* we finished successfully, fill in the return value */
556           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
557           m->stat = Success;
558           return;
559         } else {
560           if (m->ret) { *(m->ret) = NULL; };
561           if (was_interrupted) {
562             m->stat = Interrupted;
563           } else {
564             m->stat = Killed;
565           }
566           return;
567         }
568       }
569     }
570 #endif
571
572     /* Top up the run queue from our spark pool.  We try to make the
573      * number of threads in the run queue equal to the number of
574      * free capabilities.
575      *
576      * Disable spark support in SMP for now, non-essential & requires
577      * a little bit of work to make it compile cleanly. -- sof 1/02.
578      */
579 #if 0 /* defined(SMP) */
580     {
581       nat n = getFreeCapabilities();
582       StgTSO *tso = run_queue_hd;
583
584       /* Count the run queue */
585       while (n > 0 && tso != END_TSO_QUEUE) {
586         tso = tso->link;
587         n--;
588       }
589
590       for (; n > 0; n--) {
591         StgClosure *spark;
592         spark = findSpark(rtsFalse);
593         if (spark == NULL) {
594           break; /* no more sparks in the pool */
595         } else {
596           /* I'd prefer this to be done in activateSpark -- HWL */
597           /* tricky - it needs to hold the scheduler lock and
598            * not try to re-acquire it -- SDM */
599           createSparkThread(spark);       
600           IF_DEBUG(scheduler,
601                    sched_belch("==^^ turning spark of closure %p into a thread",
602                                (StgClosure *)spark));
603         }
604       }
605       /* We need to wake up the other tasks if we just created some
606        * work for them.
607        */
608       if (getFreeCapabilities() - n > 1) {
609           signalCondition( &thread_ready_cond );
610       }
611     }
612 #endif // SMP
613
614     /* check for signals each time around the scheduler */
615 #ifndef mingw32_TARGET_OS
616     if (signals_pending()) {
617       startSignalHandlers();
618     }
619 #endif
620
621     /* Check whether any waiting threads need to be woken up.  If the
622      * run queue is empty, and there are no other tasks running, we
623      * can wait indefinitely for something to happen.
624      * ToDo: what if another client comes along & requests another
625      * main thread?
626      */
627     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
628       awaitEvent( EMPTY_RUN_QUEUE()
629 #if defined(SMP)
630         && allFreeCapabilities()
631 #endif
632         );
633     }
634     /* we can be interrupted while waiting for I/O... */
635     if (interrupted) continue;
636
637     /* 
638      * Detect deadlock: when we have no threads to run, there are no
639      * threads waiting on I/O or sleeping, and all the other tasks are
640      * waiting for work, we must have a deadlock of some description.
641      *
642      * We first try to find threads blocked on themselves (ie. black
643      * holes), and generate NonTermination exceptions where necessary.
644      *
645      * If no threads are black holed, we have a deadlock situation, so
646      * inform all the main threads.
647      */
648 #ifndef PAR
649     if (   EMPTY_QUEUE(blocked_queue_hd)
650         && EMPTY_RUN_QUEUE()
651         && EMPTY_QUEUE(sleeping_queue)
652 #if defined(SMP)
653         && allFreeCapabilities()
654 #elif defined(THREADED_RTS)
655         && EMPTY_QUEUE(suspended_ccalling_threads)
656 #endif
657         )
658     {
659         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
660         RELEASE_LOCK(&sched_mutex);
661         GarbageCollect(GetRoots,rtsTrue);
662         ACQUIRE_LOCK(&sched_mutex);
663         IF_DEBUG(scheduler, sched_belch("GC done."));
664         if (   EMPTY_QUEUE(blocked_queue_hd)
665             && EMPTY_RUN_QUEUE()
666             && EMPTY_QUEUE(sleeping_queue) ) {
667
668             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
669             detectBlackHoles();
670
671             /* No black holes, so probably a real deadlock.  Send the
672              * current main thread the Deadlock exception (or in the SMP
673              * build, send *all* main threads the deadlock exception,
674              * since none of them can make progress).
675              */
676             if ( EMPTY_RUN_QUEUE() ) {
677                 StgMainThread *m;
678 #if defined(RTS_SUPPORTS_THREADS)
679                 for (m = main_threads; m != NULL; m = m->link) {
680                     switch (m->tso->why_blocked) {
681                     case BlockedOnBlackHole:
682                         raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
683                         break;
684                     case BlockedOnException:
685                     case BlockedOnMVar:
686                         raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
687                         break;
688                     default:
689                         barf("deadlock: main thread blocked in a strange way");
690                     }
691                 }
692 #else
693                 m = main_threads;
694                 switch (m->tso->why_blocked) {
695                 case BlockedOnBlackHole:
696                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
697                     break;
698                 case BlockedOnException:
699                 case BlockedOnMVar:
700                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
701                     break;
702                 default:
703                     barf("deadlock: main thread blocked in a strange way");
704                 }
705 #endif
706             }
707 #if defined(RTS_SUPPORTS_THREADS)
708             if ( EMPTY_RUN_QUEUE() ) {
709               IF_DEBUG(scheduler, sched_belch("all done, it seems...shut down."));
710               shutdownHaskellAndExit(0);
711             
712             }
713 #endif
714             ASSERT( !EMPTY_RUN_QUEUE() );
715         }
716     }
717 #elif defined(PAR)
718     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
719 #endif
720
721 #if defined(SMP)
722     /* If there's a GC pending, don't do anything until it has
723      * completed.
724      */
725     if (ready_to_gc) {
726       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
727       waitCondition( &gc_pending_cond, &sched_mutex );
728     }
729 #endif    
730
731 #if defined(SMP)
732     /* block until we've got a thread on the run queue and a free
733      * capability.
734      */
735     while ( noCapabilities() || EMPTY_RUN_QUEUE() ) {
736       IF_DEBUG(scheduler, sched_belch("waiting for work"));
737       waitCondition( &thread_ready_cond, &sched_mutex );
738       IF_DEBUG(scheduler, sched_belch("work now available"));
739     }
740 #elif defined(THREADED_RTS)
741    if ( EMPTY_RUN_QUEUErun_queue_hd == END_TSO_QUEUE ) {
742      /* no work available, wait for external calls to complete. */
743      IF_DEBUG(scheduler, sched_belch("worker thread (%d): waiting for external thread to complete..", osThreadId()));
744      taskAvailable();
745      RELEASE_LOCK(&rts_mutex);
746
747      while ( EMPTY_RUN_QUEUE() ) {
748        waitCondition(&thread_ready_cond, &sched_mutex);
749      };
750      RELEASE_LOCK(&sched_mutex);
751
752      IF_DEBUG(scheduler, sched_belch("worker thread (%d): re-awakened from no-work slumber..\n", osThreadId()));
753      /* ToDo: longjmp() */
754      taskStart();
755    }
756 #endif
757
758 #if defined(GRAN)
759
760     if (RtsFlags.GranFlags.Light)
761       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
762
763     /* adjust time based on time-stamp */
764     if (event->time > CurrentTime[CurrentProc] &&
765         event->evttype != ContinueThread)
766       CurrentTime[CurrentProc] = event->time;
767     
768     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
769     if (!RtsFlags.GranFlags.Light)
770       handleIdlePEs();
771
772     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
773
774     /* main event dispatcher in GranSim */
775     switch (event->evttype) {
776       /* Should just be continuing execution */
777     case ContinueThread:
778       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
779       /* ToDo: check assertion
780       ASSERT(run_queue_hd != (StgTSO*)NULL &&
781              run_queue_hd != END_TSO_QUEUE);
782       */
783       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
784       if (!RtsFlags.GranFlags.DoAsyncFetch &&
785           procStatus[CurrentProc]==Fetching) {
786         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
787               CurrentTSO->id, CurrentTSO, CurrentProc);
788         goto next_thread;
789       } 
790       /* Ignore ContinueThreads for completed threads */
791       if (CurrentTSO->what_next == ThreadComplete) {
792         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
793               CurrentTSO->id, CurrentTSO, CurrentProc);
794         goto next_thread;
795       } 
796       /* Ignore ContinueThreads for threads that are being migrated */
797       if (PROCS(CurrentTSO)==Nowhere) { 
798         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
799               CurrentTSO->id, CurrentTSO, CurrentProc);
800         goto next_thread;
801       }
802       /* The thread should be at the beginning of the run queue */
803       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
804         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
805               CurrentTSO->id, CurrentTSO, CurrentProc);
806         break; // run the thread anyway
807       }
808       /*
809       new_event(proc, proc, CurrentTime[proc],
810                 FindWork,
811                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
812       goto next_thread; 
813       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
814       break; // now actually run the thread; DaH Qu'vam yImuHbej 
815
816     case FetchNode:
817       do_the_fetchnode(event);
818       goto next_thread;             /* handle next event in event queue  */
819       
820     case GlobalBlock:
821       do_the_globalblock(event);
822       goto next_thread;             /* handle next event in event queue  */
823       
824     case FetchReply:
825       do_the_fetchreply(event);
826       goto next_thread;             /* handle next event in event queue  */
827       
828     case UnblockThread:   /* Move from the blocked queue to the tail of */
829       do_the_unblock(event);
830       goto next_thread;             /* handle next event in event queue  */
831       
832     case ResumeThread:  /* Move from the blocked queue to the tail of */
833       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
834       event->tso->gran.blocktime += 
835         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
836       do_the_startthread(event);
837       goto next_thread;             /* handle next event in event queue  */
838       
839     case StartThread:
840       do_the_startthread(event);
841       goto next_thread;             /* handle next event in event queue  */
842       
843     case MoveThread:
844       do_the_movethread(event);
845       goto next_thread;             /* handle next event in event queue  */
846       
847     case MoveSpark:
848       do_the_movespark(event);
849       goto next_thread;             /* handle next event in event queue  */
850       
851     case FindWork:
852       do_the_findwork(event);
853       goto next_thread;             /* handle next event in event queue  */
854       
855     default:
856       barf("Illegal event type %u\n", event->evttype);
857     }  /* switch */
858     
859     /* This point was scheduler_loop in the old RTS */
860
861     IF_DEBUG(gran, belch("GRAN: after main switch"));
862
863     TimeOfLastEvent = CurrentTime[CurrentProc];
864     TimeOfNextEvent = get_time_of_next_event();
865     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
866     // CurrentTSO = ThreadQueueHd;
867
868     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
869                          TimeOfNextEvent));
870
871     if (RtsFlags.GranFlags.Light) 
872       GranSimLight_leave_system(event, &ActiveTSO); 
873
874     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
875
876     IF_DEBUG(gran, 
877              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
878
879     /* in a GranSim setup the TSO stays on the run queue */
880     t = CurrentTSO;
881     /* Take a thread from the run queue. */
882     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
883
884     IF_DEBUG(gran, 
885              fprintf(stderr, "GRAN: About to run current thread, which is\n");
886              G_TSO(t,5));
887
888     context_switch = 0; // turned on via GranYield, checking events and time slice
889
890     IF_DEBUG(gran, 
891              DumpGranEvent(GR_SCHEDULE, t));
892
893     procStatus[CurrentProc] = Busy;
894
895 #elif defined(PAR)
896     if (PendingFetches != END_BF_QUEUE) {
897         processFetches();
898     }
899
900     /* ToDo: phps merge with spark activation above */
901     /* check whether we have local work and send requests if we have none */
902     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
903       /* :-[  no local threads => look out for local sparks */
904       /* the spark pool for the current PE */
905       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
906       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
907           pool->hd < pool->tl) {
908         /* 
909          * ToDo: add GC code check that we really have enough heap afterwards!!
910          * Old comment:
911          * If we're here (no runnable threads) and we have pending
912          * sparks, we must have a space problem.  Get enough space
913          * to turn one of those pending sparks into a
914          * thread... 
915          */
916
917         spark = findSpark(rtsFalse);                /* get a spark */
918         if (spark != (rtsSpark) NULL) {
919           tso = activateSpark(spark);       /* turn the spark into a thread */
920           IF_PAR_DEBUG(schedule,
921                        belch("==== schedule: Created TSO %d (%p); %d threads active",
922                              tso->id, tso, advisory_thread_count));
923
924           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
925             belch("==^^ failed to activate spark");
926             goto next_thread;
927           }               /* otherwise fall through & pick-up new tso */
928         } else {
929           IF_PAR_DEBUG(verbose,
930                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
931                              spark_queue_len(pool)));
932           goto next_thread;
933         }
934       }
935
936       /* If we still have no work we need to send a FISH to get a spark
937          from another PE 
938       */
939       if (EMPTY_RUN_QUEUE()) {
940       /* =8-[  no local sparks => look for work on other PEs */
941         /*
942          * We really have absolutely no work.  Send out a fish
943          * (there may be some out there already), and wait for
944          * something to arrive.  We clearly can't run any threads
945          * until a SCHEDULE or RESUME arrives, and so that's what
946          * we're hoping to see.  (Of course, we still have to
947          * respond to other types of messages.)
948          */
949         TIME now = msTime() /*CURRENT_TIME*/;
950         IF_PAR_DEBUG(verbose, 
951                      belch("--  now=%ld", now));
952         IF_PAR_DEBUG(verbose,
953                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
954                          (last_fish_arrived_at!=0 &&
955                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
956                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
957                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
958                              last_fish_arrived_at,
959                              RtsFlags.ParFlags.fishDelay, now);
960                      });
961         
962         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
963             (last_fish_arrived_at==0 ||
964              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
965           /* outstandingFishes is set in sendFish, processFish;
966              avoid flooding system with fishes via delay */
967           pe = choosePE();
968           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
969                    NEW_FISH_HUNGER);
970
971           // Global statistics: count no. of fishes
972           if (RtsFlags.ParFlags.ParStats.Global &&
973               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
974             globalParStats.tot_fish_mess++;
975           }
976         }
977       
978         receivedFinish = processMessages();
979         goto next_thread;
980       }
981     } else if (PacketsWaiting()) {  /* Look for incoming messages */
982       receivedFinish = processMessages();
983     }
984
985     /* Now we are sure that we have some work available */
986     ASSERT(run_queue_hd != END_TSO_QUEUE);
987
988     /* Take a thread from the run queue, if we have work */
989     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
990     IF_DEBUG(sanity,checkTSO(t));
991
992     /* ToDo: write something to the log-file
993     if (RTSflags.ParFlags.granSimStats && !sameThread)
994         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
995
996     CurrentTSO = t;
997     */
998     /* the spark pool for the current PE */
999     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
1000
1001     IF_DEBUG(scheduler, 
1002              belch("--=^ %d threads, %d sparks on [%#x]", 
1003                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1004
1005 #if 1
1006     if (0 && RtsFlags.ParFlags.ParStats.Full && 
1007         t && LastTSO && t->id != LastTSO->id && 
1008         LastTSO->why_blocked == NotBlocked && 
1009         LastTSO->what_next != ThreadComplete) {
1010       // if previously scheduled TSO not blocked we have to record the context switch
1011       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
1012                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
1013     }
1014
1015     if (RtsFlags.ParFlags.ParStats.Full && 
1016         (emitSchedule /* forced emit */ ||
1017         (t && LastTSO && t->id != LastTSO->id))) {
1018       /* 
1019          we are running a different TSO, so write a schedule event to log file
1020          NB: If we use fair scheduling we also have to write  a deschedule 
1021              event for LastTSO; with unfair scheduling we know that the
1022              previous tso has blocked whenever we switch to another tso, so
1023              we don't need it in GUM for now
1024       */
1025       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1026                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1027       emitSchedule = rtsFalse;
1028     }
1029      
1030 #endif
1031 #else /* !GRAN && !PAR */
1032   
1033     /* grab a thread from the run queue
1034      */
1035     ASSERT(run_queue_hd != END_TSO_QUEUE);
1036     t = POP_RUN_QUEUE();
1037     // Sanity check the thread we're about to run.  This can be
1038     // expensive if there is lots of thread switching going on...
1039     IF_DEBUG(sanity,checkTSO(t));
1040 #endif
1041     
1042     grabCapability(&cap);
1043     cap->r.rCurrentTSO = t;
1044     
1045     /* context switches are now initiated by the timer signal, unless
1046      * the user specified "context switch as often as possible", with
1047      * +RTS -C0
1048      */
1049     if (
1050 #ifdef PROFILING
1051         RtsFlags.ProfFlags.profileInterval == 0 ||
1052 #endif
1053         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1054          && (run_queue_hd != END_TSO_QUEUE
1055              || blocked_queue_hd != END_TSO_QUEUE
1056              || sleeping_queue != END_TSO_QUEUE)))
1057         context_switch = 1;
1058     else
1059         context_switch = 0;
1060
1061     RELEASE_LOCK(&sched_mutex);
1062
1063     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
1064                               t->id, t, whatNext_strs[t->what_next]));
1065
1066 #ifdef PROFILING
1067     startHeapProfTimer();
1068 #endif
1069
1070     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1071     /* Run the current thread 
1072      */
1073     switch (cap->r.rCurrentTSO->what_next) {
1074     case ThreadKilled:
1075     case ThreadComplete:
1076         /* Thread already finished, return to scheduler. */
1077         ret = ThreadFinished;
1078         break;
1079     case ThreadEnterGHC:
1080         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1081         break;
1082     case ThreadRunGHC:
1083         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1084         break;
1085     case ThreadEnterInterp:
1086         ret = interpretBCO(cap);
1087         break;
1088     default:
1089       barf("schedule: invalid what_next field");
1090     }
1091     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1092     
1093     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1094 #ifdef PROFILING
1095     stopHeapProfTimer();
1096     CCCS = CCS_SYSTEM;
1097 #endif
1098     
1099     ACQUIRE_LOCK(&sched_mutex);
1100
1101 #ifdef SMP
1102     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1103 #elif !defined(GRAN) && !defined(PAR)
1104     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1105 #endif
1106     t = cap->r.rCurrentTSO;
1107     
1108 #if defined(PAR)
1109     /* HACK 675: if the last thread didn't yield, make sure to print a 
1110        SCHEDULE event to the log file when StgRunning the next thread, even
1111        if it is the same one as before */
1112     LastTSO = t; 
1113     TimeOfLastYield = CURRENT_TIME;
1114 #endif
1115
1116     switch (ret) {
1117     case HeapOverflow:
1118 #if defined(GRAN)
1119       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1120       globalGranStats.tot_heapover++;
1121 #elif defined(PAR)
1122       globalParStats.tot_heapover++;
1123 #endif
1124
1125       // did the task ask for a large block?
1126       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1127           // if so, get one and push it on the front of the nursery.
1128           bdescr *bd;
1129           nat blocks;
1130           
1131           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1132
1133           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1134                                    t->id, t,
1135                                    whatNext_strs[t->what_next], blocks));
1136
1137           // don't do this if it would push us over the
1138           // alloc_blocks_lim limit; we'll GC first.
1139           if (alloc_blocks + blocks < alloc_blocks_lim) {
1140
1141               alloc_blocks += blocks;
1142               bd = allocGroup( blocks );
1143
1144               // link the new group into the list
1145               bd->link = cap->r.rCurrentNursery;
1146               bd->u.back = cap->r.rCurrentNursery->u.back;
1147               if (cap->r.rCurrentNursery->u.back != NULL) {
1148                   cap->r.rCurrentNursery->u.back->link = bd;
1149               } else {
1150                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1151                          g0s0->blocks == cap->r.rNursery);
1152                   cap->r.rNursery = g0s0->blocks = bd;
1153               }           
1154               cap->r.rCurrentNursery->u.back = bd;
1155
1156               // initialise it as a nursery block
1157               bd->step = g0s0;
1158               bd->gen_no = 0;
1159               bd->flags = 0;
1160               bd->free = bd->start;
1161
1162               // don't forget to update the block count in g0s0.
1163               g0s0->n_blocks += blocks;
1164               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1165
1166               // now update the nursery to point to the new block
1167               cap->r.rCurrentNursery = bd;
1168
1169               // we might be unlucky and have another thread get on the
1170               // run queue before us and steal the large block, but in that
1171               // case the thread will just end up requesting another large
1172               // block.
1173               PUSH_ON_RUN_QUEUE(t);
1174               break;
1175           }
1176       }
1177
1178       /* make all the running tasks block on a condition variable,
1179        * maybe set context_switch and wait till they all pile in,
1180        * then have them wait on a GC condition variable.
1181        */
1182       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1183                                t->id, t, whatNext_strs[t->what_next]));
1184       threadPaused(t);
1185 #if defined(GRAN)
1186       ASSERT(!is_on_queue(t,CurrentProc));
1187 #elif defined(PAR)
1188       /* Currently we emit a DESCHEDULE event before GC in GUM.
1189          ToDo: either add separate event to distinguish SYSTEM time from rest
1190                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1191       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1192         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1193                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1194         emitSchedule = rtsTrue;
1195       }
1196 #endif
1197       
1198       ready_to_gc = rtsTrue;
1199       context_switch = 1;               /* stop other threads ASAP */
1200       PUSH_ON_RUN_QUEUE(t);
1201       /* actual GC is done at the end of the while loop */
1202       break;
1203       
1204     case StackOverflow:
1205 #if defined(GRAN)
1206       IF_DEBUG(gran, 
1207                DumpGranEvent(GR_DESCHEDULE, t));
1208       globalGranStats.tot_stackover++;
1209 #elif defined(PAR)
1210       // IF_DEBUG(par, 
1211       // DumpGranEvent(GR_DESCHEDULE, t);
1212       globalParStats.tot_stackover++;
1213 #endif
1214       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1215                                t->id, t, whatNext_strs[t->what_next]));
1216       /* just adjust the stack for this thread, then pop it back
1217        * on the run queue.
1218        */
1219       threadPaused(t);
1220       { 
1221         StgMainThread *m;
1222         /* enlarge the stack */
1223         StgTSO *new_t = threadStackOverflow(t);
1224         
1225         /* This TSO has moved, so update any pointers to it from the
1226          * main thread stack.  It better not be on any other queues...
1227          * (it shouldn't be).
1228          */
1229         for (m = main_threads; m != NULL; m = m->link) {
1230           if (m->tso == t) {
1231             m->tso = new_t;
1232           }
1233         }
1234         threadPaused(new_t);
1235         PUSH_ON_RUN_QUEUE(new_t);
1236       }
1237       break;
1238
1239     case ThreadYielding:
1240 #if defined(GRAN)
1241       IF_DEBUG(gran, 
1242                DumpGranEvent(GR_DESCHEDULE, t));
1243       globalGranStats.tot_yields++;
1244 #elif defined(PAR)
1245       // IF_DEBUG(par, 
1246       // DumpGranEvent(GR_DESCHEDULE, t);
1247       globalParStats.tot_yields++;
1248 #endif
1249       /* put the thread back on the run queue.  Then, if we're ready to
1250        * GC, check whether this is the last task to stop.  If so, wake
1251        * up the GC thread.  getThread will block during a GC until the
1252        * GC is finished.
1253        */
1254       IF_DEBUG(scheduler,
1255                if (t->what_next == ThreadEnterInterp) {
1256                    /* ToDo: or maybe a timer expired when we were in Hugs?
1257                     * or maybe someone hit ctrl-C
1258                     */
1259                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1260                          t->id, t, whatNext_strs[t->what_next]);
1261                } else {
1262                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1263                          t->id, t, whatNext_strs[t->what_next]);
1264                }
1265                );
1266
1267       threadPaused(t);
1268
1269       IF_DEBUG(sanity,
1270                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1271                checkTSO(t));
1272       ASSERT(t->link == END_TSO_QUEUE);
1273 #if defined(GRAN)
1274       ASSERT(!is_on_queue(t,CurrentProc));
1275
1276       IF_DEBUG(sanity,
1277                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1278                checkThreadQsSanity(rtsTrue));
1279 #endif
1280 #if defined(PAR)
1281       if (RtsFlags.ParFlags.doFairScheduling) { 
1282         /* this does round-robin scheduling; good for concurrency */
1283         APPEND_TO_RUN_QUEUE(t);
1284       } else {
1285         /* this does unfair scheduling; good for parallelism */
1286         PUSH_ON_RUN_QUEUE(t);
1287       }
1288 #else
1289       /* this does round-robin scheduling; good for concurrency */
1290       APPEND_TO_RUN_QUEUE(t);
1291 #endif
1292 #if defined(GRAN)
1293       /* add a ContinueThread event to actually process the thread */
1294       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1295                 ContinueThread,
1296                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1297       IF_GRAN_DEBUG(bq, 
1298                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1299                G_EVENTQ(0);
1300                G_CURR_THREADQ(0));
1301 #endif /* GRAN */
1302       break;
1303       
1304     case ThreadBlocked:
1305 #if defined(GRAN)
1306       IF_DEBUG(scheduler,
1307                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1308                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1309                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1310
1311       // ??? needed; should emit block before
1312       IF_DEBUG(gran, 
1313                DumpGranEvent(GR_DESCHEDULE, t)); 
1314       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1315       /*
1316         ngoq Dogh!
1317       ASSERT(procStatus[CurrentProc]==Busy || 
1318               ((procStatus[CurrentProc]==Fetching) && 
1319               (t->block_info.closure!=(StgClosure*)NULL)));
1320       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1321           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1322             procStatus[CurrentProc]==Fetching)) 
1323         procStatus[CurrentProc] = Idle;
1324       */
1325 #elif defined(PAR)
1326       IF_DEBUG(scheduler,
1327                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1328                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1329       IF_PAR_DEBUG(bq,
1330
1331                    if (t->block_info.closure!=(StgClosure*)NULL) 
1332                      print_bq(t->block_info.closure));
1333
1334       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1335       blockThread(t);
1336
1337       /* whatever we schedule next, we must log that schedule */
1338       emitSchedule = rtsTrue;
1339
1340 #else /* !GRAN */
1341       /* don't need to do anything.  Either the thread is blocked on
1342        * I/O, in which case we'll have called addToBlockedQueue
1343        * previously, or it's blocked on an MVar or Blackhole, in which
1344        * case it'll be on the relevant queue already.
1345        */
1346       IF_DEBUG(scheduler,
1347                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1348                printThreadBlockage(t);
1349                fprintf(stderr, "\n"));
1350
1351       /* Only for dumping event to log file 
1352          ToDo: do I need this in GranSim, too?
1353       blockThread(t);
1354       */
1355 #endif
1356       threadPaused(t);
1357       break;
1358       
1359     case ThreadFinished:
1360       /* Need to check whether this was a main thread, and if so, signal
1361        * the task that started it with the return value.  If we have no
1362        * more main threads, we probably need to stop all the tasks until
1363        * we get a new one.
1364        */
1365       /* We also end up here if the thread kills itself with an
1366        * uncaught exception, see Exception.hc.
1367        */
1368       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1369 #if defined(GRAN)
1370       endThread(t, CurrentProc); // clean-up the thread
1371 #elif defined(PAR)
1372       /* For now all are advisory -- HWL */
1373       //if(t->priority==AdvisoryPriority) ??
1374       advisory_thread_count--;
1375       
1376 # ifdef DIST
1377       if(t->dist.priority==RevalPriority)
1378         FinishReval(t);
1379 # endif
1380       
1381       if (RtsFlags.ParFlags.ParStats.Full &&
1382           !RtsFlags.ParFlags.ParStats.Suppressed) 
1383         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1384 #endif
1385       break;
1386       
1387     default:
1388       barf("schedule: invalid thread return code %d", (int)ret);
1389     }
1390     
1391 #ifdef SMP
1392     grabCapability(&cap);
1393 #endif
1394
1395 #ifdef PROFILING
1396     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1397         GarbageCollect(GetRoots, rtsTrue);
1398         heapCensus();
1399         performHeapProfile = rtsFalse;
1400         ready_to_gc = rtsFalse; // we already GC'd
1401     }
1402 #endif
1403
1404 #ifdef SMP
1405     if (ready_to_gc && allFreeCapabilities() )
1406 #else
1407     if (ready_to_gc) 
1408 #endif
1409       {
1410       /* everybody back, start the GC.
1411        * Could do it in this thread, or signal a condition var
1412        * to do it in another thread.  Either way, we need to
1413        * broadcast on gc_pending_cond afterward.
1414        */
1415 #if defined(RTS_SUPPORTS_THREADS)
1416       IF_DEBUG(scheduler,sched_belch("doing GC"));
1417 #endif
1418       GarbageCollect(GetRoots,rtsFalse);
1419       ready_to_gc = rtsFalse;
1420 #ifdef SMP
1421       broadcastCondition(&gc_pending_cond);
1422 #endif
1423 #if defined(GRAN)
1424       /* add a ContinueThread event to continue execution of current thread */
1425       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1426                 ContinueThread,
1427                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1428       IF_GRAN_DEBUG(bq, 
1429                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1430                G_EVENTQ(0);
1431                G_CURR_THREADQ(0));
1432 #endif /* GRAN */
1433     }
1434
1435 #if defined(GRAN)
1436   next_thread:
1437     IF_GRAN_DEBUG(unused,
1438                   print_eventq(EventHd));
1439
1440     event = get_next_event();
1441 #elif defined(PAR)
1442   next_thread:
1443     /* ToDo: wait for next message to arrive rather than busy wait */
1444 #endif /* GRAN */
1445
1446   } /* end of while(1) */
1447
1448   IF_PAR_DEBUG(verbose,
1449                belch("== Leaving schedule() after having received Finish"));
1450 }
1451
1452 /* ---------------------------------------------------------------------------
1453  * deleteAllThreads():  kill all the live threads.
1454  *
1455  * This is used when we catch a user interrupt (^C), before performing
1456  * any necessary cleanups and running finalizers.
1457  * ------------------------------------------------------------------------- */
1458    
1459 void deleteAllThreads ( void )
1460 {
1461   StgTSO* t, *next;
1462   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1463   for (t = run_queue_hd; t != END_TSO_QUEUE; t = next) {
1464       next = t->link;
1465       deleteThread(t);
1466   }
1467   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = next) {
1468       next = t->link;
1469       deleteThread(t);
1470   }
1471   for (t = sleeping_queue; t != END_TSO_QUEUE; t = next) {
1472       next = t->link;
1473       deleteThread(t);
1474   }
1475   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1476   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1477   sleeping_queue = END_TSO_QUEUE;
1478 }
1479
1480 /* startThread and  insertThread are now in GranSim.c -- HWL */
1481
1482
1483 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1484 //@subsection Suspend and Resume
1485
1486 /* ---------------------------------------------------------------------------
1487  * Suspending & resuming Haskell threads.
1488  * 
1489  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1490  * its capability before calling the C function.  This allows another
1491  * task to pick up the capability and carry on running Haskell
1492  * threads.  It also means that if the C call blocks, it won't lock
1493  * the whole system.
1494  *
1495  * The Haskell thread making the C call is put to sleep for the
1496  * duration of the call, on the susepended_ccalling_threads queue.  We
1497  * give out a token to the task, which it can use to resume the thread
1498  * on return from the C function.
1499  * ------------------------------------------------------------------------- */
1500    
1501 StgInt
1502 suspendThread( StgRegTable *reg )
1503 {
1504   nat tok;
1505   Capability *cap;
1506
1507   /* assume that *reg is a pointer to the StgRegTable part
1508    * of a Capability.
1509    */
1510   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1511
1512   ACQUIRE_LOCK(&sched_mutex);
1513
1514   IF_DEBUG(scheduler,
1515            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1516
1517   threadPaused(cap->r.rCurrentTSO);
1518   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1519   suspended_ccalling_threads = cap->r.rCurrentTSO;
1520
1521   /* Use the thread ID as the token; it should be unique */
1522   tok = cap->r.rCurrentTSO->id;
1523
1524   /* Hand back capability */
1525   releaseCapability(cap);
1526   
1527 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
1528   /* Preparing to leave the RTS, so ensure there's a native thread/task
1529      waiting to take over.
1530      
1531      ToDo: optimise this and only create a new task if there's a need
1532      for one (i.e., if there's only one Concurrent Haskell thread alive,
1533      there's no need to create a new task).
1534   */
1535   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS\n", tok));
1536   startTask(taskStart);
1537
1538 #endif
1539
1540   THREAD_RUNNABLE();
1541   RELEASE_LOCK(&sched_mutex);
1542   //  RELEASE_LOCK(&rts_mutex);
1543   return tok; 
1544 }
1545
1546 StgRegTable *
1547 resumeThread( StgInt tok )
1548 {
1549   StgTSO *tso, **prev;
1550   Capability *cap;
1551
1552 #if defined(THREADED_RTS)
1553   IF_DEBUG(scheduler, sched_belch("thread %d returning, waiting for sched. lock.\n", tok));
1554   ACQUIRE_LOCK(&sched_mutex);
1555   threads_waiting++;
1556   IF_DEBUG(scheduler, sched_belch("thread %d returning, threads waiting: %d.\n", tok, threads_waiting));
1557   RELEASE_LOCK(&sched_mutex);
1558   
1559   IF_DEBUG(scheduler, sched_belch("thread %d waiting for RTS lock...\n", tok));
1560   ACQUIRE_LOCK(&rts_mutex);
1561   threads_waiting--;
1562   taskNotAvailable();
1563   IF_DEBUG(scheduler, sched_belch("thread %d acquired RTS lock...\n", tok));
1564 #endif
1565
1566 #if defined(THREADED_RTS)
1567   /* Free up any RTS-blocked threads. */
1568   broadcastCondition(&thread_ready_cond);
1569 #endif
1570
1571   /* Remove the thread off of the suspended list */
1572   prev = &suspended_ccalling_threads;
1573   for (tso = suspended_ccalling_threads; 
1574        tso != END_TSO_QUEUE; 
1575        prev = &tso->link, tso = tso->link) {
1576     if (tso->id == (StgThreadID)tok) {
1577       *prev = tso->link;
1578       break;
1579     }
1580   }
1581   if (tso == END_TSO_QUEUE) {
1582     barf("resumeThread: thread not found");
1583   }
1584   tso->link = END_TSO_QUEUE;
1585
1586 #if defined(RTS_SUPPORTS_THREADS)
1587   while ( noCapabilities() ) {
1588     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1589     waitCondition(&thread_ready_cond, &sched_mutex);
1590     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1591   }
1592 #endif
1593
1594   grabCapability(&cap);
1595
1596   cap->r.rCurrentTSO = tso;
1597
1598   return &cap->r;
1599 }
1600
1601
1602 /* ---------------------------------------------------------------------------
1603  * Static functions
1604  * ------------------------------------------------------------------------ */
1605 static void unblockThread(StgTSO *tso);
1606
1607 /* ---------------------------------------------------------------------------
1608  * Comparing Thread ids.
1609  *
1610  * This is used from STG land in the implementation of the
1611  * instances of Eq/Ord for ThreadIds.
1612  * ------------------------------------------------------------------------ */
1613
1614 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1615
1616   StgThreadID id1 = tso1->id; 
1617   StgThreadID id2 = tso2->id;
1618  
1619   if (id1 < id2) return (-1);
1620   if (id1 > id2) return 1;
1621   return 0;
1622 }
1623
1624 /* ---------------------------------------------------------------------------
1625  * Fetching the ThreadID from an StgTSO.
1626  *
1627  * This is used in the implementation of Show for ThreadIds.
1628  * ------------------------------------------------------------------------ */
1629 int rts_getThreadId(const StgTSO *tso) 
1630 {
1631   return tso->id;
1632 }
1633
1634 /* ---------------------------------------------------------------------------
1635    Create a new thread.
1636
1637    The new thread starts with the given stack size.  Before the
1638    scheduler can run, however, this thread needs to have a closure
1639    (and possibly some arguments) pushed on its stack.  See
1640    pushClosure() in Schedule.h.
1641
1642    createGenThread() and createIOThread() (in SchedAPI.h) are
1643    convenient packaged versions of this function.
1644
1645    currently pri (priority) is only used in a GRAN setup -- HWL
1646    ------------------------------------------------------------------------ */
1647 //@cindex createThread
1648 #if defined(GRAN)
1649 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1650 StgTSO *
1651 createThread(nat stack_size, StgInt pri)
1652 {
1653   return createThread_(stack_size, rtsFalse, pri);
1654 }
1655
1656 static StgTSO *
1657 createThread_(nat size, rtsBool have_lock, StgInt pri)
1658 {
1659 #else
1660 StgTSO *
1661 createThread(nat stack_size)
1662 {
1663   return createThread_(stack_size, rtsFalse);
1664 }
1665
1666 static StgTSO *
1667 createThread_(nat size, rtsBool have_lock)
1668 {
1669 #endif
1670
1671     StgTSO *tso;
1672     nat stack_size;
1673
1674     /* First check whether we should create a thread at all */
1675 #if defined(PAR)
1676   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1677   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1678     threadsIgnored++;
1679     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1680           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1681     return END_TSO_QUEUE;
1682   }
1683   threadsCreated++;
1684 #endif
1685
1686 #if defined(GRAN)
1687   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1688 #endif
1689
1690   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1691
1692   /* catch ridiculously small stack sizes */
1693   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1694     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1695   }
1696
1697   stack_size = size - TSO_STRUCT_SIZEW;
1698
1699   tso = (StgTSO *)allocate(size);
1700   TICK_ALLOC_TSO(stack_size, 0);
1701
1702   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1703 #if defined(GRAN)
1704   SET_GRAN_HDR(tso, ThisPE);
1705 #endif
1706   tso->what_next     = ThreadEnterGHC;
1707
1708   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1709    * protect the increment operation on next_thread_id.
1710    * In future, we could use an atomic increment instead.
1711    */
1712   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1713   tso->id = next_thread_id++; 
1714   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1715
1716   tso->why_blocked  = NotBlocked;
1717   tso->blocked_exceptions = NULL;
1718
1719   tso->stack_size   = stack_size;
1720   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1721                               - TSO_STRUCT_SIZEW;
1722   tso->sp           = (P_)&(tso->stack) + stack_size;
1723
1724 #ifdef PROFILING
1725   tso->prof.CCCS = CCS_MAIN;
1726 #endif
1727
1728   /* put a stop frame on the stack */
1729   tso->sp -= sizeofW(StgStopFrame);
1730   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1731   tso->su = (StgUpdateFrame*)tso->sp;
1732
1733   // ToDo: check this
1734 #if defined(GRAN)
1735   tso->link = END_TSO_QUEUE;
1736   /* uses more flexible routine in GranSim */
1737   insertThread(tso, CurrentProc);
1738 #else
1739   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1740    * from its creation
1741    */
1742 #endif
1743
1744 #if defined(GRAN) 
1745   if (RtsFlags.GranFlags.GranSimStats.Full) 
1746     DumpGranEvent(GR_START,tso);
1747 #elif defined(PAR)
1748   if (RtsFlags.ParFlags.ParStats.Full) 
1749     DumpGranEvent(GR_STARTQ,tso);
1750   /* HACk to avoid SCHEDULE 
1751      LastTSO = tso; */
1752 #endif
1753
1754   /* Link the new thread on the global thread list.
1755    */
1756   tso->global_link = all_threads;
1757   all_threads = tso;
1758
1759 #if defined(DIST)
1760   tso->dist.priority = MandatoryPriority; //by default that is...
1761 #endif
1762
1763 #if defined(GRAN)
1764   tso->gran.pri = pri;
1765 # if defined(DEBUG)
1766   tso->gran.magic = TSO_MAGIC; // debugging only
1767 # endif
1768   tso->gran.sparkname   = 0;
1769   tso->gran.startedat   = CURRENT_TIME; 
1770   tso->gran.exported    = 0;
1771   tso->gran.basicblocks = 0;
1772   tso->gran.allocs      = 0;
1773   tso->gran.exectime    = 0;
1774   tso->gran.fetchtime   = 0;
1775   tso->gran.fetchcount  = 0;
1776   tso->gran.blocktime   = 0;
1777   tso->gran.blockcount  = 0;
1778   tso->gran.blockedat   = 0;
1779   tso->gran.globalsparks = 0;
1780   tso->gran.localsparks  = 0;
1781   if (RtsFlags.GranFlags.Light)
1782     tso->gran.clock  = Now; /* local clock */
1783   else
1784     tso->gran.clock  = 0;
1785
1786   IF_DEBUG(gran,printTSO(tso));
1787 #elif defined(PAR)
1788 # if defined(DEBUG)
1789   tso->par.magic = TSO_MAGIC; // debugging only
1790 # endif
1791   tso->par.sparkname   = 0;
1792   tso->par.startedat   = CURRENT_TIME; 
1793   tso->par.exported    = 0;
1794   tso->par.basicblocks = 0;
1795   tso->par.allocs      = 0;
1796   tso->par.exectime    = 0;
1797   tso->par.fetchtime   = 0;
1798   tso->par.fetchcount  = 0;
1799   tso->par.blocktime   = 0;
1800   tso->par.blockcount  = 0;
1801   tso->par.blockedat   = 0;
1802   tso->par.globalsparks = 0;
1803   tso->par.localsparks  = 0;
1804 #endif
1805
1806 #if defined(GRAN)
1807   globalGranStats.tot_threads_created++;
1808   globalGranStats.threads_created_on_PE[CurrentProc]++;
1809   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1810   globalGranStats.tot_sq_probes++;
1811 #elif defined(PAR)
1812   // collect parallel global statistics (currently done together with GC stats)
1813   if (RtsFlags.ParFlags.ParStats.Global &&
1814       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1815     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1816     globalParStats.tot_threads_created++;
1817   }
1818 #endif 
1819
1820 #if defined(GRAN)
1821   IF_GRAN_DEBUG(pri,
1822                 belch("==__ schedule: Created TSO %d (%p);",
1823                       CurrentProc, tso, tso->id));
1824 #elif defined(PAR)
1825     IF_PAR_DEBUG(verbose,
1826                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1827                        tso->id, tso, advisory_thread_count));
1828 #else
1829   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1830                                  tso->id, tso->stack_size));
1831 #endif    
1832   return tso;
1833 }
1834
1835 #if defined(PAR)
1836 /* RFP:
1837    all parallel thread creation calls should fall through the following routine.
1838 */
1839 StgTSO *
1840 createSparkThread(rtsSpark spark) 
1841 { StgTSO *tso;
1842   ASSERT(spark != (rtsSpark)NULL);
1843   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1844   { threadsIgnored++;
1845     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1846           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1847     return END_TSO_QUEUE;
1848   }
1849   else
1850   { threadsCreated++;
1851     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1852     if (tso==END_TSO_QUEUE)     
1853       barf("createSparkThread: Cannot create TSO");
1854 #if defined(DIST)
1855     tso->priority = AdvisoryPriority;
1856 #endif
1857     pushClosure(tso,spark);
1858     PUSH_ON_RUN_QUEUE(tso);
1859     advisory_thread_count++;    
1860   }
1861   return tso;
1862 }
1863 #endif
1864
1865 /*
1866   Turn a spark into a thread.
1867   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1868 */
1869 #if defined(PAR)
1870 //@cindex activateSpark
1871 StgTSO *
1872 activateSpark (rtsSpark spark) 
1873 {
1874   StgTSO *tso;
1875
1876   tso = createSparkThread(spark);
1877   if (RtsFlags.ParFlags.ParStats.Full) {   
1878     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1879     IF_PAR_DEBUG(verbose,
1880                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1881                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1882   }
1883   // ToDo: fwd info on local/global spark to thread -- HWL
1884   // tso->gran.exported =  spark->exported;
1885   // tso->gran.locked =   !spark->global;
1886   // tso->gran.sparkname = spark->name;
1887
1888   return tso;
1889 }
1890 #endif
1891
1892 /* ---------------------------------------------------------------------------
1893  * scheduleThread()
1894  *
1895  * scheduleThread puts a thread on the head of the runnable queue.
1896  * This will usually be done immediately after a thread is created.
1897  * The caller of scheduleThread must create the thread using e.g.
1898  * createThread and push an appropriate closure
1899  * on this thread's stack before the scheduler is invoked.
1900  * ------------------------------------------------------------------------ */
1901
1902 void
1903 scheduleThread(StgTSO *tso)
1904 {
1905   ACQUIRE_LOCK(&sched_mutex);
1906
1907   /* Put the new thread on the head of the runnable queue.  The caller
1908    * better push an appropriate closure on this thread's stack
1909    * beforehand.  In the SMP case, the thread may start running as
1910    * soon as we release the scheduler lock below.
1911    */
1912   PUSH_ON_RUN_QUEUE(tso);
1913   THREAD_RUNNABLE();
1914
1915 #if 0
1916   IF_DEBUG(scheduler,printTSO(tso));
1917 #endif
1918   RELEASE_LOCK(&sched_mutex);
1919 }
1920
1921 /* ---------------------------------------------------------------------------
1922  * initScheduler()
1923  *
1924  * Initialise the scheduler.  This resets all the queues - if the
1925  * queues contained any threads, they'll be garbage collected at the
1926  * next pass.
1927  *
1928  * ------------------------------------------------------------------------ */
1929
1930 #ifdef SMP
1931 static void
1932 term_handler(int sig STG_UNUSED)
1933 {
1934   stat_workerStop();
1935   ACQUIRE_LOCK(&term_mutex);
1936   await_death--;
1937   RELEASE_LOCK(&term_mutex);
1938   shutdownThread();
1939 }
1940 #endif
1941
1942 void 
1943 initScheduler(void)
1944 {
1945 #if defined(GRAN)
1946   nat i;
1947
1948   for (i=0; i<=MAX_PROC; i++) {
1949     run_queue_hds[i]      = END_TSO_QUEUE;
1950     run_queue_tls[i]      = END_TSO_QUEUE;
1951     blocked_queue_hds[i]  = END_TSO_QUEUE;
1952     blocked_queue_tls[i]  = END_TSO_QUEUE;
1953     ccalling_threadss[i]  = END_TSO_QUEUE;
1954     sleeping_queue        = END_TSO_QUEUE;
1955   }
1956 #else
1957   run_queue_hd      = END_TSO_QUEUE;
1958   run_queue_tl      = END_TSO_QUEUE;
1959   blocked_queue_hd  = END_TSO_QUEUE;
1960   blocked_queue_tl  = END_TSO_QUEUE;
1961   sleeping_queue    = END_TSO_QUEUE;
1962 #endif 
1963
1964   suspended_ccalling_threads  = END_TSO_QUEUE;
1965
1966   main_threads = NULL;
1967   all_threads  = END_TSO_QUEUE;
1968
1969   context_switch = 0;
1970   interrupted    = 0;
1971
1972   RtsFlags.ConcFlags.ctxtSwitchTicks =
1973       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1974       
1975 #if defined(RTS_SUPPORTS_THREADS)
1976   /* Initialise the mutex and condition variables used by
1977    * the scheduler. */
1978   initMutex(&sched_mutex);
1979   initMutex(&term_mutex);
1980
1981   initCondition(&thread_ready_cond);
1982 #if defined(THREADED_RTS)
1983   initMutex(&rts_mutex);
1984 #endif
1985   
1986   initCondition(&gc_pending_cond);
1987 #endif
1988
1989 #if defined(THREADED_RTS)
1990   /* Grab big lock */
1991   ACQUIRE_LOCK(&rts_mutex);
1992   IF_DEBUG(scheduler, 
1993            sched_belch("worker thread (%d): acquired RTS lock\n", osThreadId()));
1994 #endif
1995
1996   /* Install the SIGHUP handler */
1997 #ifdef SMP
1998   {
1999     struct sigaction action,oact;
2000
2001     action.sa_handler = term_handler;
2002     sigemptyset(&action.sa_mask);
2003     action.sa_flags = 0;
2004     if (sigaction(SIGTERM, &action, &oact) != 0) {
2005       barf("can't install TERM handler");
2006     }
2007   }
2008 #endif
2009
2010   /* A capability holds the state a native thread needs in
2011    * order to execute STG code. At least one capability is
2012    * floating around (only SMP builds have more than one).
2013    */
2014   initCapabilities();
2015   
2016 #if defined(RTS_SUPPORTS_THREADS)
2017     /* start our haskell execution tasks */
2018 # if defined(SMP)
2019     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2020 # else
2021     startTaskManager(0,taskStart);
2022 # endif
2023 #endif
2024
2025 #if /* defined(SMP) ||*/ defined(PAR)
2026   initSparkPools();
2027 #endif
2028 }
2029
2030 void
2031 exitScheduler( void )
2032 {
2033 #if defined(RTS_SUPPORTS_THREADS)
2034   stopTaskManager();
2035 #endif
2036 }
2037
2038 /* -----------------------------------------------------------------------------
2039    Managing the per-task allocation areas.
2040    
2041    Each capability comes with an allocation area.  These are
2042    fixed-length block lists into which allocation can be done.
2043
2044    ToDo: no support for two-space collection at the moment???
2045    -------------------------------------------------------------------------- */
2046
2047 /* -----------------------------------------------------------------------------
2048  * waitThread is the external interface for running a new computation
2049  * and waiting for the result.
2050  *
2051  * In the non-SMP case, we create a new main thread, push it on the 
2052  * main-thread stack, and invoke the scheduler to run it.  The
2053  * scheduler will return when the top main thread on the stack has
2054  * completed or died, and fill in the necessary fields of the
2055  * main_thread structure.
2056  *
2057  * In the SMP case, we create a main thread as before, but we then
2058  * create a new condition variable and sleep on it.  When our new
2059  * main thread has completed, we'll be woken up and the status/result
2060  * will be in the main_thread struct.
2061  * -------------------------------------------------------------------------- */
2062
2063 int 
2064 howManyThreadsAvail ( void )
2065 {
2066    int i = 0;
2067    StgTSO* q;
2068    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2069       i++;
2070    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2071       i++;
2072    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2073       i++;
2074    return i;
2075 }
2076
2077 void
2078 finishAllThreads ( void )
2079 {
2080    do {
2081       while (run_queue_hd != END_TSO_QUEUE) {
2082          waitThread ( run_queue_hd, NULL );
2083       }
2084       while (blocked_queue_hd != END_TSO_QUEUE) {
2085          waitThread ( blocked_queue_hd, NULL );
2086       }
2087       while (sleeping_queue != END_TSO_QUEUE) {
2088          waitThread ( blocked_queue_hd, NULL );
2089       }
2090    } while 
2091       (blocked_queue_hd != END_TSO_QUEUE || 
2092        run_queue_hd     != END_TSO_QUEUE ||
2093        sleeping_queue   != END_TSO_QUEUE);
2094 }
2095
2096 SchedulerStatus
2097 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2098 {
2099   StgMainThread *m;
2100   SchedulerStatus stat;
2101
2102   ACQUIRE_LOCK(&sched_mutex);
2103   
2104   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2105
2106   m->tso = tso;
2107   m->ret = ret;
2108   m->stat = NoStatus;
2109 #if defined(RTS_SUPPORTS_THREADS)
2110   initCondition(&m->wakeup);
2111 #endif
2112
2113   m->link = main_threads;
2114   main_threads = m;
2115
2116   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
2117                               m->tso->id));
2118
2119 #ifdef SMP
2120   do {
2121     waitCondition(&m->wakeup, &sched_mutex);
2122   } while (m->stat == NoStatus);
2123 #elif defined(GRAN)
2124   /* GranSim specific init */
2125   CurrentTSO = m->tso;                // the TSO to run
2126   procStatus[MainProc] = Busy;        // status of main PE
2127   CurrentProc = MainProc;             // PE to run it on
2128
2129   schedule();
2130 #else
2131   RELEASE_LOCK(&sched_mutex);
2132   schedule();
2133   ASSERT(m->stat != NoStatus);
2134 #endif
2135
2136   stat = m->stat;
2137
2138 #if defined(RTS_SUPPORTS_THREADS)
2139   closeCondition(&m->wakeup);
2140 #endif
2141
2142   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2143                               m->tso->id));
2144   free(m);
2145
2146   RELEASE_LOCK(&sched_mutex);
2147
2148   return stat;
2149 }
2150
2151 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2152 //@subsection Run queue code 
2153
2154 #if 0
2155 /* 
2156    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2157        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2158        implicit global variable that has to be correct when calling these
2159        fcts -- HWL 
2160 */
2161
2162 /* Put the new thread on the head of the runnable queue.
2163  * The caller of createThread better push an appropriate closure
2164  * on this thread's stack before the scheduler is invoked.
2165  */
2166 static /* inline */ void
2167 add_to_run_queue(tso)
2168 StgTSO* tso; 
2169 {
2170   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2171   tso->link = run_queue_hd;
2172   run_queue_hd = tso;
2173   if (run_queue_tl == END_TSO_QUEUE) {
2174     run_queue_tl = tso;
2175   }
2176 }
2177
2178 /* Put the new thread at the end of the runnable queue. */
2179 static /* inline */ void
2180 push_on_run_queue(tso)
2181 StgTSO* tso; 
2182 {
2183   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2184   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2185   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2186   if (run_queue_hd == END_TSO_QUEUE) {
2187     run_queue_hd = tso;
2188   } else {
2189     run_queue_tl->link = tso;
2190   }
2191   run_queue_tl = tso;
2192 }
2193
2194 /* 
2195    Should be inlined because it's used very often in schedule.  The tso
2196    argument is actually only needed in GranSim, where we want to have the
2197    possibility to schedule *any* TSO on the run queue, irrespective of the
2198    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2199    the run queue and dequeue the tso, adjusting the links in the queue. 
2200 */
2201 //@cindex take_off_run_queue
2202 static /* inline */ StgTSO*
2203 take_off_run_queue(StgTSO *tso) {
2204   StgTSO *t, *prev;
2205
2206   /* 
2207      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2208
2209      if tso is specified, unlink that tso from the run_queue (doesn't have
2210      to be at the beginning of the queue); GranSim only 
2211   */
2212   if (tso!=END_TSO_QUEUE) {
2213     /* find tso in queue */
2214     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2215          t!=END_TSO_QUEUE && t!=tso;
2216          prev=t, t=t->link) 
2217       /* nothing */ ;
2218     ASSERT(t==tso);
2219     /* now actually dequeue the tso */
2220     if (prev!=END_TSO_QUEUE) {
2221       ASSERT(run_queue_hd!=t);
2222       prev->link = t->link;
2223     } else {
2224       /* t is at beginning of thread queue */
2225       ASSERT(run_queue_hd==t);
2226       run_queue_hd = t->link;
2227     }
2228     /* t is at end of thread queue */
2229     if (t->link==END_TSO_QUEUE) {
2230       ASSERT(t==run_queue_tl);
2231       run_queue_tl = prev;
2232     } else {
2233       ASSERT(run_queue_tl!=t);
2234     }
2235     t->link = END_TSO_QUEUE;
2236   } else {
2237     /* take tso from the beginning of the queue; std concurrent code */
2238     t = run_queue_hd;
2239     if (t != END_TSO_QUEUE) {
2240       run_queue_hd = t->link;
2241       t->link = END_TSO_QUEUE;
2242       if (run_queue_hd == END_TSO_QUEUE) {
2243         run_queue_tl = END_TSO_QUEUE;
2244       }
2245     }
2246   }
2247   return t;
2248 }
2249
2250 #endif /* 0 */
2251
2252 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2253 //@subsection Garbage Collextion Routines
2254
2255 /* ---------------------------------------------------------------------------
2256    Where are the roots that we know about?
2257
2258         - all the threads on the runnable queue
2259         - all the threads on the blocked queue
2260         - all the threads on the sleeping queue
2261         - all the thread currently executing a _ccall_GC
2262         - all the "main threads"
2263      
2264    ------------------------------------------------------------------------ */
2265
2266 /* This has to be protected either by the scheduler monitor, or by the
2267         garbage collection monitor (probably the latter).
2268         KH @ 25/10/99
2269 */
2270
2271 void
2272 GetRoots(evac_fn evac)
2273 {
2274   StgMainThread *m;
2275
2276 #if defined(GRAN)
2277   {
2278     nat i;
2279     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2280       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2281           evac((StgClosure **)&run_queue_hds[i]);
2282       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2283           evac((StgClosure **)&run_queue_tls[i]);
2284       
2285       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2286           evac((StgClosure **)&blocked_queue_hds[i]);
2287       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2288           evac((StgClosure **)&blocked_queue_tls[i]);
2289       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2290           evac((StgClosure **)&ccalling_threads[i]);
2291     }
2292   }
2293
2294   markEventQueue();
2295
2296 #else /* !GRAN */
2297   if (run_queue_hd != END_TSO_QUEUE) {
2298       ASSERT(run_queue_tl != END_TSO_QUEUE);
2299       evac((StgClosure **)&run_queue_hd);
2300       evac((StgClosure **)&run_queue_tl);
2301   }
2302   
2303   if (blocked_queue_hd != END_TSO_QUEUE) {
2304       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2305       evac((StgClosure **)&blocked_queue_hd);
2306       evac((StgClosure **)&blocked_queue_tl);
2307   }
2308   
2309   if (sleeping_queue != END_TSO_QUEUE) {
2310       evac((StgClosure **)&sleeping_queue);
2311   }
2312 #endif 
2313
2314   for (m = main_threads; m != NULL; m = m->link) {
2315       evac((StgClosure **)&m->tso);
2316   }
2317   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2318       evac((StgClosure **)&suspended_ccalling_threads);
2319   }
2320
2321 #if defined(PAR) || defined(GRAN)
2322   markSparkQueue(evac);
2323 #endif
2324 }
2325
2326 /* -----------------------------------------------------------------------------
2327    performGC
2328
2329    This is the interface to the garbage collector from Haskell land.
2330    We provide this so that external C code can allocate and garbage
2331    collect when called from Haskell via _ccall_GC.
2332
2333    It might be useful to provide an interface whereby the programmer
2334    can specify more roots (ToDo).
2335    
2336    This needs to be protected by the GC condition variable above.  KH.
2337    -------------------------------------------------------------------------- */
2338
2339 void (*extra_roots)(evac_fn);
2340
2341 void
2342 performGC(void)
2343 {
2344   GarbageCollect(GetRoots,rtsFalse);
2345 }
2346
2347 void
2348 performMajorGC(void)
2349 {
2350   GarbageCollect(GetRoots,rtsTrue);
2351 }
2352
2353 static void
2354 AllRoots(evac_fn evac)
2355 {
2356     GetRoots(evac);             // the scheduler's roots
2357     extra_roots(evac);          // the user's roots
2358 }
2359
2360 void
2361 performGCWithRoots(void (*get_roots)(evac_fn))
2362 {
2363   extra_roots = get_roots;
2364   GarbageCollect(AllRoots,rtsFalse);
2365 }
2366
2367 /* -----------------------------------------------------------------------------
2368    Stack overflow
2369
2370    If the thread has reached its maximum stack size, then raise the
2371    StackOverflow exception in the offending thread.  Otherwise
2372    relocate the TSO into a larger chunk of memory and adjust its stack
2373    size appropriately.
2374    -------------------------------------------------------------------------- */
2375
2376 static StgTSO *
2377 threadStackOverflow(StgTSO *tso)
2378 {
2379   nat new_stack_size, new_tso_size, diff, stack_words;
2380   StgPtr new_sp;
2381   StgTSO *dest;
2382
2383   IF_DEBUG(sanity,checkTSO(tso));
2384   if (tso->stack_size >= tso->max_stack_size) {
2385
2386     IF_DEBUG(gc,
2387              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2388                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2389              /* If we're debugging, just print out the top of the stack */
2390              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2391                                               tso->sp+64)));
2392
2393     /* Send this thread the StackOverflow exception */
2394     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2395     return tso;
2396   }
2397
2398   /* Try to double the current stack size.  If that takes us over the
2399    * maximum stack size for this thread, then use the maximum instead.
2400    * Finally round up so the TSO ends up as a whole number of blocks.
2401    */
2402   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2403   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2404                                        TSO_STRUCT_SIZE)/sizeof(W_);
2405   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2406   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2407
2408   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2409
2410   dest = (StgTSO *)allocate(new_tso_size);
2411   TICK_ALLOC_TSO(new_stack_size,0);
2412
2413   /* copy the TSO block and the old stack into the new area */
2414   memcpy(dest,tso,TSO_STRUCT_SIZE);
2415   stack_words = tso->stack + tso->stack_size - tso->sp;
2416   new_sp = (P_)dest + new_tso_size - stack_words;
2417   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2418
2419   /* relocate the stack pointers... */
2420   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2421   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2422   dest->sp    = new_sp;
2423   dest->stack_size = new_stack_size;
2424         
2425   /* and relocate the update frame list */
2426   relocate_stack(dest, diff);
2427
2428   /* Mark the old TSO as relocated.  We have to check for relocated
2429    * TSOs in the garbage collector and any primops that deal with TSOs.
2430    *
2431    * It's important to set the sp and su values to just beyond the end
2432    * of the stack, so we don't attempt to scavenge any part of the
2433    * dead TSO's stack.
2434    */
2435   tso->what_next = ThreadRelocated;
2436   tso->link = dest;
2437   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2438   tso->su = (StgUpdateFrame *)tso->sp;
2439   tso->why_blocked = NotBlocked;
2440   dest->mut_link = NULL;
2441
2442   IF_PAR_DEBUG(verbose,
2443                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2444                      tso->id, tso, tso->stack_size);
2445                /* If we're debugging, just print out the top of the stack */
2446                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2447                                                 tso->sp+64)));
2448   
2449   IF_DEBUG(sanity,checkTSO(tso));
2450 #if 0
2451   IF_DEBUG(scheduler,printTSO(dest));
2452 #endif
2453
2454   return dest;
2455 }
2456
2457 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2458 //@subsection Blocking Queue Routines
2459
2460 /* ---------------------------------------------------------------------------
2461    Wake up a queue that was blocked on some resource.
2462    ------------------------------------------------------------------------ */
2463
2464 #if defined(GRAN)
2465 static inline void
2466 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2467 {
2468 }
2469 #elif defined(PAR)
2470 static inline void
2471 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2472 {
2473   /* write RESUME events to log file and
2474      update blocked and fetch time (depending on type of the orig closure) */
2475   if (RtsFlags.ParFlags.ParStats.Full) {
2476     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2477                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2478                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2479     if (EMPTY_RUN_QUEUE())
2480       emitSchedule = rtsTrue;
2481
2482     switch (get_itbl(node)->type) {
2483         case FETCH_ME_BQ:
2484           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2485           break;
2486         case RBH:
2487         case FETCH_ME:
2488         case BLACKHOLE_BQ:
2489           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2490           break;
2491 #ifdef DIST
2492         case MVAR:
2493           break;
2494 #endif    
2495         default:
2496           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2497         }
2498       }
2499 }
2500 #endif
2501
2502 #if defined(GRAN)
2503 static StgBlockingQueueElement *
2504 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2505 {
2506     StgTSO *tso;
2507     PEs node_loc, tso_loc;
2508
2509     node_loc = where_is(node); // should be lifted out of loop
2510     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2511     tso_loc = where_is((StgClosure *)tso);
2512     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2513       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2514       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2515       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2516       // insertThread(tso, node_loc);
2517       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2518                 ResumeThread,
2519                 tso, node, (rtsSpark*)NULL);
2520       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2521       // len_local++;
2522       // len++;
2523     } else { // TSO is remote (actually should be FMBQ)
2524       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2525                                   RtsFlags.GranFlags.Costs.gunblocktime +
2526                                   RtsFlags.GranFlags.Costs.latency;
2527       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2528                 UnblockThread,
2529                 tso, node, (rtsSpark*)NULL);
2530       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2531       // len++;
2532     }
2533     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2534     IF_GRAN_DEBUG(bq,
2535                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2536                           (node_loc==tso_loc ? "Local" : "Global"), 
2537                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2538     tso->block_info.closure = NULL;
2539     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2540                              tso->id, tso));
2541 }
2542 #elif defined(PAR)
2543 static StgBlockingQueueElement *
2544 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2545 {
2546     StgBlockingQueueElement *next;
2547
2548     switch (get_itbl(bqe)->type) {
2549     case TSO:
2550       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2551       /* if it's a TSO just push it onto the run_queue */
2552       next = bqe->link;
2553       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2554       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2555       THREAD_RUNNABLE();
2556       unblockCount(bqe, node);
2557       /* reset blocking status after dumping event */
2558       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2559       break;
2560
2561     case BLOCKED_FETCH:
2562       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2563       next = bqe->link;
2564       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2565       PendingFetches = (StgBlockedFetch *)bqe;
2566       break;
2567
2568 # if defined(DEBUG)
2569       /* can ignore this case in a non-debugging setup; 
2570          see comments on RBHSave closures above */
2571     case CONSTR:
2572       /* check that the closure is an RBHSave closure */
2573       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2574              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2575              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2576       break;
2577
2578     default:
2579       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2580            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2581            (StgClosure *)bqe);
2582 # endif
2583     }
2584   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2585   return next;
2586 }
2587
2588 #else /* !GRAN && !PAR */
2589 static StgTSO *
2590 unblockOneLocked(StgTSO *tso)
2591 {
2592   StgTSO *next;
2593
2594   ASSERT(get_itbl(tso)->type == TSO);
2595   ASSERT(tso->why_blocked != NotBlocked);
2596   tso->why_blocked = NotBlocked;
2597   next = tso->link;
2598   PUSH_ON_RUN_QUEUE(tso);
2599   THREAD_RUNNABLE();
2600   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2601   return next;
2602 }
2603 #endif
2604
2605 #if defined(GRAN) || defined(PAR)
2606 inline StgBlockingQueueElement *
2607 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2608 {
2609   ACQUIRE_LOCK(&sched_mutex);
2610   bqe = unblockOneLocked(bqe, node);
2611   RELEASE_LOCK(&sched_mutex);
2612   return bqe;
2613 }
2614 #else
2615 inline StgTSO *
2616 unblockOne(StgTSO *tso)
2617 {
2618   ACQUIRE_LOCK(&sched_mutex);
2619   tso = unblockOneLocked(tso);
2620   RELEASE_LOCK(&sched_mutex);
2621   return tso;
2622 }
2623 #endif
2624
2625 #if defined(GRAN)
2626 void 
2627 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2628 {
2629   StgBlockingQueueElement *bqe;
2630   PEs node_loc;
2631   nat len = 0; 
2632
2633   IF_GRAN_DEBUG(bq, 
2634                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2635                       node, CurrentProc, CurrentTime[CurrentProc], 
2636                       CurrentTSO->id, CurrentTSO));
2637
2638   node_loc = where_is(node);
2639
2640   ASSERT(q == END_BQ_QUEUE ||
2641          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2642          get_itbl(q)->type == CONSTR); // closure (type constructor)
2643   ASSERT(is_unique(node));
2644
2645   /* FAKE FETCH: magically copy the node to the tso's proc;
2646      no Fetch necessary because in reality the node should not have been 
2647      moved to the other PE in the first place
2648   */
2649   if (CurrentProc!=node_loc) {
2650     IF_GRAN_DEBUG(bq, 
2651                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2652                         node, node_loc, CurrentProc, CurrentTSO->id, 
2653                         // CurrentTSO, where_is(CurrentTSO),
2654                         node->header.gran.procs));
2655     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2656     IF_GRAN_DEBUG(bq, 
2657                   belch("## new bitmask of node %p is %#x",
2658                         node, node->header.gran.procs));
2659     if (RtsFlags.GranFlags.GranSimStats.Global) {
2660       globalGranStats.tot_fake_fetches++;
2661     }
2662   }
2663
2664   bqe = q;
2665   // ToDo: check: ASSERT(CurrentProc==node_loc);
2666   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2667     //next = bqe->link;
2668     /* 
2669        bqe points to the current element in the queue
2670        next points to the next element in the queue
2671     */
2672     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2673     //tso_loc = where_is(tso);
2674     len++;
2675     bqe = unblockOneLocked(bqe, node);
2676   }
2677
2678   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2679      the closure to make room for the anchor of the BQ */
2680   if (bqe!=END_BQ_QUEUE) {
2681     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2682     /*
2683     ASSERT((info_ptr==&RBH_Save_0_info) ||
2684            (info_ptr==&RBH_Save_1_info) ||
2685            (info_ptr==&RBH_Save_2_info));
2686     */
2687     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2688     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2689     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2690
2691     IF_GRAN_DEBUG(bq,
2692                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2693                         node, info_type(node)));
2694   }
2695
2696   /* statistics gathering */
2697   if (RtsFlags.GranFlags.GranSimStats.Global) {
2698     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2699     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2700     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2701     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2702   }
2703   IF_GRAN_DEBUG(bq,
2704                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2705                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2706 }
2707 #elif defined(PAR)
2708 void 
2709 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2710 {
2711   StgBlockingQueueElement *bqe;
2712
2713   ACQUIRE_LOCK(&sched_mutex);
2714
2715   IF_PAR_DEBUG(verbose, 
2716                belch("##-_ AwBQ for node %p on [%x]: ",
2717                      node, mytid));
2718 #ifdef DIST  
2719   //RFP
2720   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2721     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2722     return;
2723   }
2724 #endif
2725   
2726   ASSERT(q == END_BQ_QUEUE ||
2727          get_itbl(q)->type == TSO ||           
2728          get_itbl(q)->type == BLOCKED_FETCH || 
2729          get_itbl(q)->type == CONSTR); 
2730
2731   bqe = q;
2732   while (get_itbl(bqe)->type==TSO || 
2733          get_itbl(bqe)->type==BLOCKED_FETCH) {
2734     bqe = unblockOneLocked(bqe, node);
2735   }
2736   RELEASE_LOCK(&sched_mutex);
2737 }
2738
2739 #else   /* !GRAN && !PAR */
2740 void
2741 awakenBlockedQueue(StgTSO *tso)
2742 {
2743   ACQUIRE_LOCK(&sched_mutex);
2744   while (tso != END_TSO_QUEUE) {
2745     tso = unblockOneLocked(tso);
2746   }
2747   RELEASE_LOCK(&sched_mutex);
2748 }
2749 #endif
2750
2751 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2752 //@subsection Exception Handling Routines
2753
2754 /* ---------------------------------------------------------------------------
2755    Interrupt execution
2756    - usually called inside a signal handler so it mustn't do anything fancy.   
2757    ------------------------------------------------------------------------ */
2758
2759 void
2760 interruptStgRts(void)
2761 {
2762     interrupted    = 1;
2763     context_switch = 1;
2764 }
2765
2766 /* -----------------------------------------------------------------------------
2767    Unblock a thread
2768
2769    This is for use when we raise an exception in another thread, which
2770    may be blocked.
2771    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2772    -------------------------------------------------------------------------- */
2773
2774 #if defined(GRAN) || defined(PAR)
2775 /*
2776   NB: only the type of the blocking queue is different in GranSim and GUM
2777       the operations on the queue-elements are the same
2778       long live polymorphism!
2779 */
2780 static void
2781 unblockThread(StgTSO *tso)
2782 {
2783   StgBlockingQueueElement *t, **last;
2784
2785   ACQUIRE_LOCK(&sched_mutex);
2786   switch (tso->why_blocked) {
2787
2788   case NotBlocked:
2789     return;  /* not blocked */
2790
2791   case BlockedOnMVar:
2792     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2793     {
2794       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2795       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2796
2797       last = (StgBlockingQueueElement **)&mvar->head;
2798       for (t = (StgBlockingQueueElement *)mvar->head; 
2799            t != END_BQ_QUEUE; 
2800            last = &t->link, last_tso = t, t = t->link) {
2801         if (t == (StgBlockingQueueElement *)tso) {
2802           *last = (StgBlockingQueueElement *)tso->link;
2803           if (mvar->tail == tso) {
2804             mvar->tail = (StgTSO *)last_tso;
2805           }
2806           goto done;
2807         }
2808       }
2809       barf("unblockThread (MVAR): TSO not found");
2810     }
2811
2812   case BlockedOnBlackHole:
2813     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2814     {
2815       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2816
2817       last = &bq->blocking_queue;
2818       for (t = bq->blocking_queue; 
2819            t != END_BQ_QUEUE; 
2820            last = &t->link, t = t->link) {
2821         if (t == (StgBlockingQueueElement *)tso) {
2822           *last = (StgBlockingQueueElement *)tso->link;
2823           goto done;
2824         }
2825       }
2826       barf("unblockThread (BLACKHOLE): TSO not found");
2827     }
2828
2829   case BlockedOnException:
2830     {
2831       StgTSO *target  = tso->block_info.tso;
2832
2833       ASSERT(get_itbl(target)->type == TSO);
2834
2835       if (target->what_next == ThreadRelocated) {
2836           target = target->link;
2837           ASSERT(get_itbl(target)->type == TSO);
2838       }
2839
2840       ASSERT(target->blocked_exceptions != NULL);
2841
2842       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2843       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2844            t != END_BQ_QUEUE; 
2845            last = &t->link, t = t->link) {
2846         ASSERT(get_itbl(t)->type == TSO);
2847         if (t == (StgBlockingQueueElement *)tso) {
2848           *last = (StgBlockingQueueElement *)tso->link;
2849           goto done;
2850         }
2851       }
2852       barf("unblockThread (Exception): TSO not found");
2853     }
2854
2855   case BlockedOnRead:
2856   case BlockedOnWrite:
2857     {
2858       /* take TSO off blocked_queue */
2859       StgBlockingQueueElement *prev = NULL;
2860       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2861            prev = t, t = t->link) {
2862         if (t == (StgBlockingQueueElement *)tso) {
2863           if (prev == NULL) {
2864             blocked_queue_hd = (StgTSO *)t->link;
2865             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2866               blocked_queue_tl = END_TSO_QUEUE;
2867             }
2868           } else {
2869             prev->link = t->link;
2870             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2871               blocked_queue_tl = (StgTSO *)prev;
2872             }
2873           }
2874           goto done;
2875         }
2876       }
2877       barf("unblockThread (I/O): TSO not found");
2878     }
2879
2880   case BlockedOnDelay:
2881     {
2882       /* take TSO off sleeping_queue */
2883       StgBlockingQueueElement *prev = NULL;
2884       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2885            prev = t, t = t->link) {
2886         if (t == (StgBlockingQueueElement *)tso) {
2887           if (prev == NULL) {
2888             sleeping_queue = (StgTSO *)t->link;
2889           } else {
2890             prev->link = t->link;
2891           }
2892           goto done;
2893         }
2894       }
2895       barf("unblockThread (I/O): TSO not found");
2896     }
2897
2898   default:
2899     barf("unblockThread");
2900   }
2901
2902  done:
2903   tso->link = END_TSO_QUEUE;
2904   tso->why_blocked = NotBlocked;
2905   tso->block_info.closure = NULL;
2906   PUSH_ON_RUN_QUEUE(tso);
2907   RELEASE_LOCK(&sched_mutex);
2908 }
2909 #else
2910 static void
2911 unblockThread(StgTSO *tso)
2912 {
2913   StgTSO *t, **last;
2914
2915   ACQUIRE_LOCK(&sched_mutex);
2916   switch (tso->why_blocked) {
2917
2918   case NotBlocked:
2919     return;  /* not blocked */
2920
2921   case BlockedOnMVar:
2922     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2923     {
2924       StgTSO *last_tso = END_TSO_QUEUE;
2925       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2926
2927       last = &mvar->head;
2928       for (t = mvar->head; t != END_TSO_QUEUE; 
2929            last = &t->link, last_tso = t, t = t->link) {
2930         if (t == tso) {
2931           *last = tso->link;
2932           if (mvar->tail == tso) {
2933             mvar->tail = last_tso;
2934           }
2935           goto done;
2936         }
2937       }
2938       barf("unblockThread (MVAR): TSO not found");
2939     }
2940
2941   case BlockedOnBlackHole:
2942     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2943     {
2944       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2945
2946       last = &bq->blocking_queue;
2947       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2948            last = &t->link, t = t->link) {
2949         if (t == tso) {
2950           *last = tso->link;
2951           goto done;
2952         }
2953       }
2954       barf("unblockThread (BLACKHOLE): TSO not found");
2955     }
2956
2957   case BlockedOnException:
2958     {
2959       StgTSO *target  = tso->block_info.tso;
2960
2961       ASSERT(get_itbl(target)->type == TSO);
2962
2963       while (target->what_next == ThreadRelocated) {
2964           target = target->link;
2965           ASSERT(get_itbl(target)->type == TSO);
2966       }
2967       
2968       ASSERT(target->blocked_exceptions != NULL);
2969
2970       last = &target->blocked_exceptions;
2971       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2972            last = &t->link, t = t->link) {
2973         ASSERT(get_itbl(t)->type == TSO);
2974         if (t == tso) {
2975           *last = tso->link;
2976           goto done;
2977         }
2978       }
2979       barf("unblockThread (Exception): TSO not found");
2980     }
2981
2982   case BlockedOnRead:
2983   case BlockedOnWrite:
2984     {
2985       StgTSO *prev = NULL;
2986       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2987            prev = t, t = t->link) {
2988         if (t == tso) {
2989           if (prev == NULL) {
2990             blocked_queue_hd = t->link;
2991             if (blocked_queue_tl == t) {
2992               blocked_queue_tl = END_TSO_QUEUE;
2993             }
2994           } else {
2995             prev->link = t->link;
2996             if (blocked_queue_tl == t) {
2997               blocked_queue_tl = prev;
2998             }
2999           }
3000           goto done;
3001         }
3002       }
3003       barf("unblockThread (I/O): TSO not found");
3004     }
3005
3006   case BlockedOnDelay:
3007     {
3008       StgTSO *prev = NULL;
3009       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3010            prev = t, t = t->link) {
3011         if (t == tso) {
3012           if (prev == NULL) {
3013             sleeping_queue = t->link;
3014           } else {
3015             prev->link = t->link;
3016           }
3017           goto done;
3018         }
3019       }
3020       barf("unblockThread (I/O): TSO not found");
3021     }
3022
3023   default:
3024     barf("unblockThread");
3025   }
3026
3027  done:
3028   tso->link = END_TSO_QUEUE;
3029   tso->why_blocked = NotBlocked;
3030   tso->block_info.closure = NULL;
3031   PUSH_ON_RUN_QUEUE(tso);
3032   RELEASE_LOCK(&sched_mutex);
3033 }
3034 #endif
3035
3036 /* -----------------------------------------------------------------------------
3037  * raiseAsync()
3038  *
3039  * The following function implements the magic for raising an
3040  * asynchronous exception in an existing thread.
3041  *
3042  * We first remove the thread from any queue on which it might be
3043  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3044  *
3045  * We strip the stack down to the innermost CATCH_FRAME, building
3046  * thunks in the heap for all the active computations, so they can 
3047  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3048  * an application of the handler to the exception, and push it on
3049  * the top of the stack.
3050  * 
3051  * How exactly do we save all the active computations?  We create an
3052  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3053  * AP_UPDs pushes everything from the corresponding update frame
3054  * upwards onto the stack.  (Actually, it pushes everything up to the
3055  * next update frame plus a pointer to the next AP_UPD object.
3056  * Entering the next AP_UPD object pushes more onto the stack until we
3057  * reach the last AP_UPD object - at which point the stack should look
3058  * exactly as it did when we killed the TSO and we can continue
3059  * execution by entering the closure on top of the stack.
3060  *
3061  * We can also kill a thread entirely - this happens if either (a) the 
3062  * exception passed to raiseAsync is NULL, or (b) there's no
3063  * CATCH_FRAME on the stack.  In either case, we strip the entire
3064  * stack and replace the thread with a zombie.
3065  *
3066  * -------------------------------------------------------------------------- */
3067  
3068 void 
3069 deleteThread(StgTSO *tso)
3070 {
3071   raiseAsync(tso,NULL);
3072 }
3073
3074 void
3075 raiseAsync(StgTSO *tso, StgClosure *exception)
3076 {
3077   StgUpdateFrame* su = tso->su;
3078   StgPtr          sp = tso->sp;
3079   
3080   /* Thread already dead? */
3081   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3082     return;
3083   }
3084
3085   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3086
3087   /* Remove it from any blocking queues */
3088   unblockThread(tso);
3089
3090   /* The stack freezing code assumes there's a closure pointer on
3091    * the top of the stack.  This isn't always the case with compiled
3092    * code, so we have to push a dummy closure on the top which just
3093    * returns to the next return address on the stack.
3094    */
3095   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3096     *(--sp) = (W_)&stg_dummy_ret_closure;
3097   }
3098
3099   while (1) {
3100     nat words = ((P_)su - (P_)sp) - 1;
3101     nat i;
3102     StgAP_UPD * ap;
3103
3104     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3105      * then build PAP(handler,exception,realworld#), and leave it on
3106      * top of the stack ready to enter.
3107      */
3108     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3109       StgCatchFrame *cf = (StgCatchFrame *)su;
3110       /* we've got an exception to raise, so let's pass it to the
3111        * handler in this frame.
3112        */
3113       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3114       TICK_ALLOC_UPD_PAP(3,0);
3115       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3116               
3117       ap->n_args = 2;
3118       ap->fun = cf->handler;    /* :: Exception -> IO a */
3119       ap->payload[0] = exception;
3120       ap->payload[1] = ARG_TAG(0); /* realworld token */
3121
3122       /* throw away the stack from Sp up to and including the
3123        * CATCH_FRAME.
3124        */
3125       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
3126       tso->su = cf->link;
3127
3128       /* Restore the blocked/unblocked state for asynchronous exceptions
3129        * at the CATCH_FRAME.  
3130        *
3131        * If exceptions were unblocked at the catch, arrange that they
3132        * are unblocked again after executing the handler by pushing an
3133        * unblockAsyncExceptions_ret stack frame.
3134        */
3135       if (!cf->exceptions_blocked) {
3136         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3137       }
3138       
3139       /* Ensure that async exceptions are blocked when running the handler.
3140        */
3141       if (tso->blocked_exceptions == NULL) {
3142         tso->blocked_exceptions = END_TSO_QUEUE;
3143       }
3144       
3145       /* Put the newly-built PAP on top of the stack, ready to execute
3146        * when the thread restarts.
3147        */
3148       sp[0] = (W_)ap;
3149       tso->sp = sp;
3150       tso->what_next = ThreadEnterGHC;
3151       IF_DEBUG(sanity, checkTSO(tso));
3152       return;
3153     }
3154
3155     /* First build an AP_UPD consisting of the stack chunk above the
3156      * current update frame, with the top word on the stack as the
3157      * fun field.
3158      */
3159     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3160     
3161     ASSERT(words >= 0);
3162     
3163     ap->n_args = words;
3164     ap->fun    = (StgClosure *)sp[0];
3165     sp++;
3166     for(i=0; i < (nat)words; ++i) {
3167       ap->payload[i] = (StgClosure *)*sp++;
3168     }
3169     
3170     switch (get_itbl(su)->type) {
3171       
3172     case UPDATE_FRAME:
3173       {
3174         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3175         TICK_ALLOC_UP_THK(words+1,0);
3176         
3177         IF_DEBUG(scheduler,
3178                  fprintf(stderr,  "scheduler: Updating ");
3179                  printPtr((P_)su->updatee); 
3180                  fprintf(stderr,  " with ");
3181                  printObj((StgClosure *)ap);
3182                  );
3183         
3184         /* Replace the updatee with an indirection - happily
3185          * this will also wake up any threads currently
3186          * waiting on the result.
3187          *
3188          * Warning: if we're in a loop, more than one update frame on
3189          * the stack may point to the same object.  Be careful not to
3190          * overwrite an IND_OLDGEN in this case, because we'll screw
3191          * up the mutable lists.  To be on the safe side, don't
3192          * overwrite any kind of indirection at all.  See also
3193          * threadSqueezeStack in GC.c, where we have to make a similar
3194          * check.
3195          */
3196         if (!closure_IND(su->updatee)) {
3197             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3198         }
3199         su = su->link;
3200         sp += sizeofW(StgUpdateFrame) -1;
3201         sp[0] = (W_)ap; /* push onto stack */
3202         break;
3203       }
3204
3205     case CATCH_FRAME:
3206       {
3207         StgCatchFrame *cf = (StgCatchFrame *)su;
3208         StgClosure* o;
3209         
3210         /* We want a PAP, not an AP_UPD.  Fortunately, the
3211          * layout's the same.
3212          */
3213         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3214         TICK_ALLOC_UPD_PAP(words+1,0);
3215         
3216         /* now build o = FUN(catch,ap,handler) */
3217         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3218         TICK_ALLOC_FUN(2,0);
3219         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3220         o->payload[0] = (StgClosure *)ap;
3221         o->payload[1] = cf->handler;
3222         
3223         IF_DEBUG(scheduler,
3224                  fprintf(stderr,  "scheduler: Built ");
3225                  printObj((StgClosure *)o);
3226                  );
3227         
3228         /* pop the old handler and put o on the stack */
3229         su = cf->link;
3230         sp += sizeofW(StgCatchFrame) - 1;
3231         sp[0] = (W_)o;
3232         break;
3233       }
3234       
3235     case SEQ_FRAME:
3236       {
3237         StgSeqFrame *sf = (StgSeqFrame *)su;
3238         StgClosure* o;
3239         
3240         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3241         TICK_ALLOC_UPD_PAP(words+1,0);
3242         
3243         /* now build o = FUN(seq,ap) */
3244         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3245         TICK_ALLOC_SE_THK(1,0);
3246         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3247         o->payload[0] = (StgClosure *)ap;
3248         
3249         IF_DEBUG(scheduler,
3250                  fprintf(stderr,  "scheduler: Built ");
3251                  printObj((StgClosure *)o);
3252                  );
3253         
3254         /* pop the old handler and put o on the stack */
3255         su = sf->link;
3256         sp += sizeofW(StgSeqFrame) - 1;
3257         sp[0] = (W_)o;
3258         break;
3259       }
3260       
3261     case STOP_FRAME:
3262       /* We've stripped the entire stack, the thread is now dead. */
3263       sp += sizeofW(StgStopFrame) - 1;
3264       sp[0] = (W_)exception;    /* save the exception */
3265       tso->what_next = ThreadKilled;
3266       tso->su = (StgUpdateFrame *)(sp+1);
3267       tso->sp = sp;
3268       return;
3269
3270     default:
3271       barf("raiseAsync");
3272     }
3273   }
3274   barf("raiseAsync");
3275 }
3276
3277 /* -----------------------------------------------------------------------------
3278    resurrectThreads is called after garbage collection on the list of
3279    threads found to be garbage.  Each of these threads will be woken
3280    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3281    on an MVar, or NonTermination if the thread was blocked on a Black
3282    Hole.
3283    -------------------------------------------------------------------------- */
3284
3285 void
3286 resurrectThreads( StgTSO *threads )
3287 {
3288   StgTSO *tso, *next;
3289
3290   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3291     next = tso->global_link;
3292     tso->global_link = all_threads;
3293     all_threads = tso;
3294     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3295
3296     switch (tso->why_blocked) {
3297     case BlockedOnMVar:
3298     case BlockedOnException:
3299       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3300       break;
3301     case BlockedOnBlackHole:
3302       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3303       break;
3304     case NotBlocked:
3305       /* This might happen if the thread was blocked on a black hole
3306        * belonging to a thread that we've just woken up (raiseAsync
3307        * can wake up threads, remember...).
3308        */
3309       continue;
3310     default:
3311       barf("resurrectThreads: thread blocked in a strange way");
3312     }
3313   }
3314 }
3315
3316 /* -----------------------------------------------------------------------------
3317  * Blackhole detection: if we reach a deadlock, test whether any
3318  * threads are blocked on themselves.  Any threads which are found to
3319  * be self-blocked get sent a NonTermination exception.
3320  *
3321  * This is only done in a deadlock situation in order to avoid
3322  * performance overhead in the normal case.
3323  * -------------------------------------------------------------------------- */
3324
3325 static void
3326 detectBlackHoles( void )
3327 {
3328     StgTSO *t = all_threads;
3329     StgUpdateFrame *frame;
3330     StgClosure *blocked_on;
3331
3332     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3333
3334         while (t->what_next == ThreadRelocated) {
3335             t = t->link;
3336             ASSERT(get_itbl(t)->type == TSO);
3337         }
3338       
3339         if (t->why_blocked != BlockedOnBlackHole) {
3340             continue;
3341         }
3342
3343         blocked_on = t->block_info.closure;
3344
3345         for (frame = t->su; ; frame = frame->link) {
3346             switch (get_itbl(frame)->type) {
3347
3348             case UPDATE_FRAME:
3349                 if (frame->updatee == blocked_on) {
3350                     /* We are blocking on one of our own computations, so
3351                      * send this thread the NonTermination exception.  
3352                      */
3353                     IF_DEBUG(scheduler, 
3354                              sched_belch("thread %d is blocked on itself", t->id));
3355                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3356                     goto done;
3357                 }
3358                 else {
3359                     continue;
3360                 }
3361
3362             case CATCH_FRAME:
3363             case SEQ_FRAME:
3364                 continue;
3365                 
3366             case STOP_FRAME:
3367                 break;
3368             }
3369             break;
3370         }
3371
3372     done: ;
3373     }   
3374 }
3375
3376 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3377 //@subsection Debugging Routines
3378
3379 /* -----------------------------------------------------------------------------
3380    Debugging: why is a thread blocked
3381    -------------------------------------------------------------------------- */
3382
3383 #ifdef DEBUG
3384
3385 void
3386 printThreadBlockage(StgTSO *tso)
3387 {
3388   switch (tso->why_blocked) {
3389   case BlockedOnRead:
3390     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3391     break;
3392   case BlockedOnWrite:
3393     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3394     break;
3395   case BlockedOnDelay:
3396     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3397     break;
3398   case BlockedOnMVar:
3399     fprintf(stderr,"is blocked on an MVar");
3400     break;
3401   case BlockedOnException:
3402     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3403             tso->block_info.tso->id);
3404     break;
3405   case BlockedOnBlackHole:
3406     fprintf(stderr,"is blocked on a black hole");
3407     break;
3408   case NotBlocked:
3409     fprintf(stderr,"is not blocked");
3410     break;
3411 #if defined(PAR)
3412   case BlockedOnGA:
3413     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3414             tso->block_info.closure, info_type(tso->block_info.closure));
3415     break;
3416   case BlockedOnGA_NoSend:
3417     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3418             tso->block_info.closure, info_type(tso->block_info.closure));
3419     break;
3420 #endif
3421   default:
3422     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3423          tso->why_blocked, tso->id, tso);
3424   }
3425 }
3426
3427 void
3428 printThreadStatus(StgTSO *tso)
3429 {
3430   switch (tso->what_next) {
3431   case ThreadKilled:
3432     fprintf(stderr,"has been killed");
3433     break;
3434   case ThreadComplete:
3435     fprintf(stderr,"has completed");
3436     break;
3437   default:
3438     printThreadBlockage(tso);
3439   }
3440 }
3441
3442 void
3443 printAllThreads(void)
3444 {
3445   StgTSO *t;
3446
3447 # if defined(GRAN)
3448   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3449   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3450                        time_string, rtsFalse/*no commas!*/);
3451
3452   sched_belch("all threads at [%s]:", time_string);
3453 # elif defined(PAR)
3454   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3455   ullong_format_string(CURRENT_TIME,
3456                        time_string, rtsFalse/*no commas!*/);
3457
3458   sched_belch("all threads at [%s]:", time_string);
3459 # else
3460   sched_belch("all threads:");
3461 # endif
3462
3463   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3464     fprintf(stderr, "\tthread %d ", t->id);
3465     printThreadStatus(t);
3466     fprintf(stderr,"\n");
3467   }
3468 }
3469     
3470 /* 
3471    Print a whole blocking queue attached to node (debugging only).
3472 */
3473 //@cindex print_bq
3474 # if defined(PAR)
3475 void 
3476 print_bq (StgClosure *node)
3477 {
3478   StgBlockingQueueElement *bqe;
3479   StgTSO *tso;
3480   rtsBool end;
3481
3482   fprintf(stderr,"## BQ of closure %p (%s): ",
3483           node, info_type(node));
3484
3485   /* should cover all closures that may have a blocking queue */
3486   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3487          get_itbl(node)->type == FETCH_ME_BQ ||
3488          get_itbl(node)->type == RBH ||
3489          get_itbl(node)->type == MVAR);
3490     
3491   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3492
3493   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3494 }
3495
3496 /* 
3497    Print a whole blocking queue starting with the element bqe.
3498 */
3499 void 
3500 print_bqe (StgBlockingQueueElement *bqe)
3501 {
3502   rtsBool end;
3503
3504   /* 
3505      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3506   */
3507   for (end = (bqe==END_BQ_QUEUE);
3508        !end; // iterate until bqe points to a CONSTR
3509        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3510        bqe = end ? END_BQ_QUEUE : bqe->link) {
3511     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3512     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3513     /* types of closures that may appear in a blocking queue */
3514     ASSERT(get_itbl(bqe)->type == TSO ||           
3515            get_itbl(bqe)->type == BLOCKED_FETCH || 
3516            get_itbl(bqe)->type == CONSTR); 
3517     /* only BQs of an RBH end with an RBH_Save closure */
3518     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3519
3520     switch (get_itbl(bqe)->type) {
3521     case TSO:
3522       fprintf(stderr," TSO %u (%x),",
3523               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3524       break;
3525     case BLOCKED_FETCH:
3526       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3527               ((StgBlockedFetch *)bqe)->node, 
3528               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3529               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3530               ((StgBlockedFetch *)bqe)->ga.weight);
3531       break;
3532     case CONSTR:
3533       fprintf(stderr," %s (IP %p),",
3534               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3535                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3536                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3537                "RBH_Save_?"), get_itbl(bqe));
3538       break;
3539     default:
3540       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3541            info_type((StgClosure *)bqe)); // , node, info_type(node));
3542       break;
3543     }
3544   } /* for */
3545   fputc('\n', stderr);
3546 }
3547 # elif defined(GRAN)
3548 void 
3549 print_bq (StgClosure *node)
3550 {
3551   StgBlockingQueueElement *bqe;
3552   PEs node_loc, tso_loc;
3553   rtsBool end;
3554
3555   /* should cover all closures that may have a blocking queue */
3556   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3557          get_itbl(node)->type == FETCH_ME_BQ ||
3558          get_itbl(node)->type == RBH);
3559     
3560   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3561   node_loc = where_is(node);
3562
3563   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3564           node, info_type(node), node_loc);
3565
3566   /* 
3567      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3568   */
3569   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3570        !end; // iterate until bqe points to a CONSTR
3571        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3572     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3573     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3574     /* types of closures that may appear in a blocking queue */
3575     ASSERT(get_itbl(bqe)->type == TSO ||           
3576            get_itbl(bqe)->type == CONSTR); 
3577     /* only BQs of an RBH end with an RBH_Save closure */
3578     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3579
3580     tso_loc = where_is((StgClosure *)bqe);
3581     switch (get_itbl(bqe)->type) {
3582     case TSO:
3583       fprintf(stderr," TSO %d (%p) on [PE %d],",
3584               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3585       break;
3586     case CONSTR:
3587       fprintf(stderr," %s (IP %p),",
3588               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3589                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3590                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3591                "RBH_Save_?"), get_itbl(bqe));
3592       break;
3593     default:
3594       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3595            info_type((StgClosure *)bqe), node, info_type(node));
3596       break;
3597     }
3598   } /* for */
3599   fputc('\n', stderr);
3600 }
3601 #else
3602 /* 
3603    Nice and easy: only TSOs on the blocking queue
3604 */
3605 void 
3606 print_bq (StgClosure *node)
3607 {
3608   StgTSO *tso;
3609
3610   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3611   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3612        tso != END_TSO_QUEUE; 
3613        tso=tso->link) {
3614     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3615     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3616     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3617   }
3618   fputc('\n', stderr);
3619 }
3620 # endif
3621
3622 #if defined(PAR)
3623 static nat
3624 run_queue_len(void)
3625 {
3626   nat i;
3627   StgTSO *tso;
3628
3629   for (i=0, tso=run_queue_hd; 
3630        tso != END_TSO_QUEUE;
3631        i++, tso=tso->link)
3632     /* nothing */
3633
3634   return i;
3635 }
3636 #endif
3637
3638 static void
3639 sched_belch(char *s, ...)
3640 {
3641   va_list ap;
3642   va_start(ap,s);
3643 #ifdef SMP
3644   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3645 #elif defined(PAR)
3646   fprintf(stderr, "== ");
3647 #else
3648   fprintf(stderr, "scheduler: ");
3649 #endif
3650   vfprintf(stderr, s, ap);
3651   fprintf(stderr, "\n");
3652 }
3653
3654 #endif /* DEBUG */
3655
3656
3657 //@node Index,  , Debugging Routines, Main scheduling code
3658 //@subsection Index
3659
3660 //@index
3661 //* MainRegTable::  @cindex\s-+MainRegTable
3662 //* StgMainThread::  @cindex\s-+StgMainThread
3663 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3664 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3665 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3666 //* context_switch::  @cindex\s-+context_switch
3667 //* createThread::  @cindex\s-+createThread
3668 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3669 //* initScheduler::  @cindex\s-+initScheduler
3670 //* interrupted::  @cindex\s-+interrupted
3671 //* next_thread_id::  @cindex\s-+next_thread_id
3672 //* print_bq::  @cindex\s-+print_bq
3673 //* run_queue_hd::  @cindex\s-+run_queue_hd
3674 //* run_queue_tl::  @cindex\s-+run_queue_tl
3675 //* sched_mutex::  @cindex\s-+sched_mutex
3676 //* schedule::  @cindex\s-+schedule
3677 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3678 //* term_mutex::  @cindex\s-+term_mutex
3679 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3680 //@end index