[project @ 2002-02-07 06:33:20 by sof]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.119 2002/02/07 06:33:20 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #include <stdarg.h>
118
119 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
120 //@subsection Variables and Data structures
121
122 /* Main threads:
123  *
124  * These are the threads which clients have requested that we run.  
125  *
126  * In a 'threaded' build, we might have several concurrent clients all
127  * waiting for results, and each one will wait on a condition variable
128  * until the result is available.
129  *
130  * In non-SMP, clients are strictly nested: the first client calls
131  * into the RTS, which might call out again to C with a _ccall_GC, and
132  * eventually re-enter the RTS.
133  *
134  * Main threads information is kept in a linked list:
135  */
136 //@cindex StgMainThread
137 typedef struct StgMainThread_ {
138   StgTSO *         tso;
139   SchedulerStatus  stat;
140   StgClosure **    ret;
141 #if defined(RTS_SUPPORTS_THREADS)
142   Condition        wakeup;
143 #endif
144   struct StgMainThread_ *link;
145 } StgMainThread;
146
147 /* Main thread queue.
148  * Locks required: sched_mutex.
149  */
150 static StgMainThread *main_threads;
151
152 /* Thread queues.
153  * Locks required: sched_mutex.
154  */
155 #if defined(GRAN)
156
157 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
158 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
159
160 /* 
161    In GranSim we have a runable and a blocked queue for each processor.
162    In order to minimise code changes new arrays run_queue_hds/tls
163    are created. run_queue_hd is then a short cut (macro) for
164    run_queue_hds[CurrentProc] (see GranSim.h).
165    -- HWL
166 */
167 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
168 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
169 StgTSO *ccalling_threadss[MAX_PROC];
170 /* We use the same global list of threads (all_threads) in GranSim as in
171    the std RTS (i.e. we are cheating). However, we don't use this list in
172    the GranSim specific code at the moment (so we are only potentially
173    cheating).  */
174
175 #else /* !GRAN */
176
177 StgTSO *run_queue_hd, *run_queue_tl;
178 StgTSO *blocked_queue_hd, *blocked_queue_tl;
179 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
180
181 #endif
182
183 /* Linked list of all threads.
184  * Used for detecting garbage collected threads.
185  */
186 StgTSO *all_threads;
187
188 /* When a thread performs a safe C call (_ccall_GC, using old
189  * terminology), it gets put on the suspended_ccalling_threads
190  * list. Used by the garbage collector.
191  */
192 static StgTSO *suspended_ccalling_threads;
193
194 static StgTSO *threadStackOverflow(StgTSO *tso);
195
196 /* KH: The following two flags are shared memory locations.  There is no need
197        to lock them, since they are only unset at the end of a scheduler
198        operation.
199 */
200
201 /* flag set by signal handler to precipitate a context switch */
202 //@cindex context_switch
203 nat context_switch;
204
205 /* if this flag is set as well, give up execution */
206 //@cindex interrupted
207 rtsBool interrupted;
208
209 /* Next thread ID to allocate.
210  * Locks required: sched_mutex
211  */
212 //@cindex next_thread_id
213 StgThreadID next_thread_id = 1;
214
215 /*
216  * Pointers to the state of the current thread.
217  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
218  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
219  */
220  
221 /* The smallest stack size that makes any sense is:
222  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
223  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
224  *  + 1                       (the realworld token for an IO thread)
225  *  + 1                       (the closure to enter)
226  *
227  * A thread with this stack will bomb immediately with a stack
228  * overflow, which will increase its stack size.  
229  */
230
231 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
232
233
234 #if defined(GRAN)
235 StgTSO *CurrentTSO;
236 #endif
237
238 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
239  *  exists - earlier gccs apparently didn't.
240  *  -= chak
241  */
242 StgTSO dummy_tso;
243
244 rtsBool ready_to_gc;
245
246 void            addToBlockedQueue ( StgTSO *tso );
247
248 static void     schedule          ( void );
249        void     interruptStgRts   ( void );
250 #if defined(GRAN)
251 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
252 #else
253 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
254 #endif
255
256 static void     detectBlackHoles  ( void );
257
258 #ifdef DEBUG
259 static void sched_belch(char *s, ...);
260 #endif
261
262 #if defined(RTS_SUPPORTS_THREADS)
263 /* ToDo: carefully document the invariants that go together
264  *       with these synchronisation objects.
265  */
266 Mutex     sched_mutex       = INIT_MUTEX_VAR;
267 Mutex     term_mutex        = INIT_MUTEX_VAR;
268 #if defined(THREADED_RTS)
269 /*
270  * The rts_mutex is the 'big lock' that the active native
271  * thread within the RTS holds while executing code.
272  * It is given up when the thread makes a transition out of
273  * the RTS (e.g., to perform an external C call), hopefully
274  * for another thread to take over its chores and enter
275  * the RTS.
276  *
277  */
278 Mutex     rts_mutex         = INIT_MUTEX_VAR;
279 /*
280  * When a native thread has completed executing an external
281  * call, it needs to communicate the result back to the
282  * (Haskell) thread that made the call. Do this as follows:
283  *
284  *  - in resumeThread(), the thread increments the counter
285  *    threads_waiting, and then blocks on the 'big' RTS lock.
286  *  - upon entry to the scheduler, the thread that's currently
287  *    holding the RTS lock checks threads_waiting. If there
288  *    are native threads waiting, it gives up its RTS lock
289  *    and tries to re-grab the RTS lock [perhaps after having
290  *    waited for a bit..?]
291  *  - care must be taken to deal with the case where more than
292  *    one external thread are waiting on the lock. [ToDo: more]
293  *    
294  */
295
296 static nat threads_waiting = 0;
297 /*
298  * thread_ready_aux_mutex is used to handle the scenario where the
299  * the RTS executing thread runs out of work, but there are
300  * active external threads. The RTS executing thread gives up
301  * its RTS mutex, and blocks waiting for the thread_ready_cond.
302  * Unfortunately, a condition variable needs to be associated
303  * with a mutex in pthreads, so rts_thread_waiting_mutex is
304  * used for just this purpose.
305  * 
306  */
307 Mutex  thread_ready_aux_mutex = INIT_MUTEX_VAR;
308 #endif
309
310
311 /* thread_ready_cond: when signalled, a thread has
312  * become runnable. When used?
313  */
314 Condition thread_ready_cond = INIT_COND_VAR;
315 Condition gc_pending_cond   = INIT_COND_VAR;
316
317 nat await_death;
318 #endif
319
320 #if defined(PAR)
321 StgTSO *LastTSO;
322 rtsTime TimeOfLastYield;
323 rtsBool emitSchedule = rtsTrue;
324 #endif
325
326 #if DEBUG
327 char *whatNext_strs[] = {
328   "ThreadEnterGHC",
329   "ThreadRunGHC",
330   "ThreadEnterInterp",
331   "ThreadKilled",
332   "ThreadComplete"
333 };
334
335 char *threadReturnCode_strs[] = {
336   "HeapOverflow",                       /* might also be StackOverflow */
337   "StackOverflow",
338   "ThreadYielding",
339   "ThreadBlocked",
340   "ThreadFinished"
341 };
342 #endif
343
344 #if defined(PAR)
345 StgTSO * createSparkThread(rtsSpark spark);
346 StgTSO * activateSpark (rtsSpark spark);  
347 #endif
348
349 /*
350  * The thread state for the main thread.
351 // ToDo: check whether not needed any more
352 StgTSO   *MainTSO;
353  */
354
355 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
356 static void taskStart(void);
357 static void
358 taskStart(void)
359 {
360   /* threads start up using 'taskStart', so make them
361      them grab the RTS lock. */
362 #if defined(THREADED_RTS)
363   ACQUIRE_LOCK(&rts_mutex);
364   taskNotAvailable();
365 #endif
366   schedule();
367 }
368 #endif
369
370
371
372
373 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
374 //@subsection Main scheduling loop
375
376 /* ---------------------------------------------------------------------------
377    Main scheduling loop.
378
379    We use round-robin scheduling, each thread returning to the
380    scheduler loop when one of these conditions is detected:
381
382       * out of heap space
383       * timer expires (thread yields)
384       * thread blocks
385       * thread ends
386       * stack overflow
387
388    Locking notes:  we acquire the scheduler lock once at the beginning
389    of the scheduler loop, and release it when
390     
391       * running a thread, or
392       * waiting for work, or
393       * waiting for a GC to complete.
394
395    GRAN version:
396      In a GranSim setup this loop iterates over the global event queue.
397      This revolves around the global event queue, which determines what 
398      to do next. Therefore, it's more complicated than either the 
399      concurrent or the parallel (GUM) setup.
400
401    GUM version:
402      GUM iterates over incoming messages.
403      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
404      and sends out a fish whenever it has nothing to do; in-between
405      doing the actual reductions (shared code below) it processes the
406      incoming messages and deals with delayed operations 
407      (see PendingFetches).
408      This is not the ugliest code you could imagine, but it's bloody close.
409
410    ------------------------------------------------------------------------ */
411 //@cindex schedule
412 static void
413 schedule( void )
414 {
415   StgTSO *t;
416   Capability *cap;
417   StgThreadReturnCode ret;
418 #if defined(GRAN)
419   rtsEvent *event;
420 #elif defined(PAR)
421   StgSparkPool *pool;
422   rtsSpark spark;
423   StgTSO *tso;
424   GlobalTaskId pe;
425   rtsBool receivedFinish = rtsFalse;
426 # if defined(DEBUG)
427   nat tp_size, sp_size; // stats only
428 # endif
429 #endif
430   rtsBool was_interrupted = rtsFalse;
431   
432   ACQUIRE_LOCK(&sched_mutex);
433   
434 #if defined(THREADED_RTS)
435   /* ToDo: consider SMP support */
436   if (threads_waiting > 0) {
437     /* (At least) one native thread is waiting to
438      * deposit the result of an external call. So,
439      * give up our RTS executing privileges and let
440      * one of them continue.
441      * 
442      */
443     taskAvailable();
444     RELEASE_LOCK(&sched_mutex);
445     IF_DEBUG(scheduler, sched_belch("worker thread (%d): giving up RTS token (threads_waiting=%d)\n", osThreadId(), threads_waiting));
446     RELEASE_LOCK(&rts_mutex);
447     /* ToDo: come up with mechanism that guarantees that
448      * the main thread doesn't loop here.
449      */
450     yieldThread();
451     /* ToDo: longjmp() */
452     taskStart();
453   }
454 #endif
455
456 #if defined(GRAN)
457
458   /* set up first event to get things going */
459   /* ToDo: assign costs for system setup and init MainTSO ! */
460   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
461             ContinueThread, 
462             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
463
464   IF_DEBUG(gran,
465            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
466            G_TSO(CurrentTSO, 5));
467
468   if (RtsFlags.GranFlags.Light) {
469     /* Save current time; GranSim Light only */
470     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
471   }      
472
473   event = get_next_event();
474
475   while (event!=(rtsEvent*)NULL) {
476     /* Choose the processor with the next event */
477     CurrentProc = event->proc;
478     CurrentTSO = event->tso;
479
480 #elif defined(PAR)
481
482   while (!receivedFinish) {    /* set by processMessages */
483                                /* when receiving PP_FINISH message         */ 
484 #else
485
486   while (1) {
487
488 #endif
489
490     IF_DEBUG(scheduler, printAllThreads());
491
492     /* If we're interrupted (the user pressed ^C, or some other
493      * termination condition occurred), kill all the currently running
494      * threads.
495      */
496     if (interrupted) {
497       IF_DEBUG(scheduler, sched_belch("interrupted"));
498       deleteAllThreads();
499       interrupted = rtsFalse;
500       was_interrupted = rtsTrue;
501     }
502
503     /* Go through the list of main threads and wake up any
504      * clients whose computations have finished.  ToDo: this
505      * should be done more efficiently without a linear scan
506      * of the main threads list, somehow...
507      */
508 #if defined(RTS_SUPPORTS_THREADS)
509     { 
510       StgMainThread *m, **prev;
511       prev = &main_threads;
512       for (m = main_threads; m != NULL; m = m->link) {
513         switch (m->tso->what_next) {
514         case ThreadComplete:
515           if (m->ret) {
516             *(m->ret) = (StgClosure *)m->tso->sp[0];
517           }
518           *prev = m->link;
519           m->stat = Success;
520           broadcastCondition(&m->wakeup);
521           break;
522         case ThreadKilled:
523           if (m->ret) *(m->ret) = NULL;
524           *prev = m->link;
525           if (was_interrupted) {
526             m->stat = Interrupted;
527           } else {
528             m->stat = Killed;
529           }
530           broadcastCondition(&m->wakeup);
531           break;
532         default:
533           break;
534         }
535       }
536     }
537
538 #else /* not threaded */
539
540 # if defined(PAR)
541     /* in GUM do this only on the Main PE */
542     if (IAmMainThread)
543 # endif
544     /* If our main thread has finished or been killed, return.
545      */
546     {
547       StgMainThread *m = main_threads;
548       if (m->tso->what_next == ThreadComplete
549           || m->tso->what_next == ThreadKilled) {
550         main_threads = main_threads->link;
551         if (m->tso->what_next == ThreadComplete) {
552           /* we finished successfully, fill in the return value */
553           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
554           m->stat = Success;
555           return;
556         } else {
557           if (m->ret) { *(m->ret) = NULL; };
558           if (was_interrupted) {
559             m->stat = Interrupted;
560           } else {
561             m->stat = Killed;
562           }
563           return;
564         }
565       }
566     }
567 #endif
568
569     /* Top up the run queue from our spark pool.  We try to make the
570      * number of threads in the run queue equal to the number of
571      * free capabilities.
572      *
573      * Disable spark support in SMP for now, non-essential & requires
574      * a little bit of work to make it compile cleanly. -- sof 1/02.
575      */
576 #if 0 /* defined(SMP) */
577     {
578       nat n = getFreeCapabilities();
579       StgTSO *tso = run_queue_hd;
580
581       /* Count the run queue */
582       while (n > 0 && tso != END_TSO_QUEUE) {
583         tso = tso->link;
584         n--;
585       }
586
587       for (; n > 0; n--) {
588         StgClosure *spark;
589         spark = findSpark(rtsFalse);
590         if (spark == NULL) {
591           break; /* no more sparks in the pool */
592         } else {
593           /* I'd prefer this to be done in activateSpark -- HWL */
594           /* tricky - it needs to hold the scheduler lock and
595            * not try to re-acquire it -- SDM */
596           createSparkThread(spark);       
597           IF_DEBUG(scheduler,
598                    sched_belch("==^^ turning spark of closure %p into a thread",
599                                (StgClosure *)spark));
600         }
601       }
602       /* We need to wake up the other tasks if we just created some
603        * work for them.
604        */
605       if (getFreeCapabilities() - n > 1) {
606           signalCondition( &thread_ready_cond );
607       }
608     }
609 #endif // SMP
610
611     /* check for signals each time around the scheduler */
612 #ifndef mingw32_TARGET_OS
613     if (signals_pending()) {
614       startSignalHandlers();
615     }
616 #endif
617
618     /* Check whether any waiting threads need to be woken up.  If the
619      * run queue is empty, and there are no other tasks running, we
620      * can wait indefinitely for something to happen.
621      * ToDo: what if another client comes along & requests another
622      * main thread?
623      */
624     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
625       awaitEvent(
626            (run_queue_hd == END_TSO_QUEUE)
627 #if defined(SMP)
628         && allFreeCapabilities()
629 #endif
630         );
631     }
632     /* we can be interrupted while waiting for I/O... */
633     if (interrupted) continue;
634
635     /* 
636      * Detect deadlock: when we have no threads to run, there are no
637      * threads waiting on I/O or sleeping, and all the other tasks are
638      * waiting for work, we must have a deadlock of some description.
639      *
640      * We first try to find threads blocked on themselves (ie. black
641      * holes), and generate NonTermination exceptions where necessary.
642      *
643      * If no threads are black holed, we have a deadlock situation, so
644      * inform all the main threads.
645      */
646 #ifndef PAR
647     if (blocked_queue_hd == END_TSO_QUEUE
648         && run_queue_hd == END_TSO_QUEUE
649         && sleeping_queue == END_TSO_QUEUE
650 #if defined(SMP)
651         && allFreeCapabilities()
652 #elif defined(THREADED_RTS)
653         && suspended_ccalling_threads == END_TSO_QUEUE
654 #endif
655         )
656     {
657         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
658         RELEASE_LOCK(&sched_mutex);
659         GarbageCollect(GetRoots,rtsTrue);
660         ACQUIRE_LOCK(&sched_mutex);
661         IF_DEBUG(scheduler, sched_belch("GC done."));
662         if (blocked_queue_hd == END_TSO_QUEUE
663             && run_queue_hd == END_TSO_QUEUE
664             && sleeping_queue == END_TSO_QUEUE) {
665
666             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
667             detectBlackHoles();
668
669             /* No black holes, so probably a real deadlock.  Send the
670              * current main thread the Deadlock exception (or in the SMP
671              * build, send *all* main threads the deadlock exception,
672              * since none of them can make progress).
673              */
674             if (run_queue_hd == END_TSO_QUEUE) {
675                 StgMainThread *m;
676 #if defined(RTS_SUPPORTS_THREADS)
677                 for (m = main_threads; m != NULL; m = m->link) {
678                     switch (m->tso->why_blocked) {
679                     case BlockedOnBlackHole:
680                         raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
681                         break;
682                     case BlockedOnException:
683                     case BlockedOnMVar:
684                         raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
685                         break;
686                     default:
687                         barf("deadlock: main thread blocked in a strange way");
688                     }
689                 }
690 #else
691                 m = main_threads;
692                 switch (m->tso->why_blocked) {
693                 case BlockedOnBlackHole:
694                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
695                     break;
696                 case BlockedOnException:
697                 case BlockedOnMVar:
698                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
699                     break;
700                 default:
701                     barf("deadlock: main thread blocked in a strange way");
702                 }
703 #endif
704             }
705 #if defined(RTS_SUPPORTS_THREADS)
706             if ( run_queue_hd == END_TSO_QUEUE ) {
707               IF_DEBUG(scheduler, sched_belch("all done, it seems...shut down."));
708               shutdownHaskellAndExit(0);
709             
710             }
711 #endif
712             ASSERT( run_queue_hd != END_TSO_QUEUE );
713         }
714     }
715 #elif defined(PAR)
716     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
717 #endif
718
719 #if defined(SMP)
720     /* If there's a GC pending, don't do anything until it has
721      * completed.
722      */
723     if (ready_to_gc) {
724       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
725       waitCondition( &gc_pending_cond, &sched_mutex );
726     }
727 #endif    
728
729 #if defined(SMP)
730     /* block until we've got a thread on the run queue and a free
731      * capability.
732      */
733     while ( run_queue_hd == END_TSO_QUEUE 
734             || noFreeCapabilities() 
735             ) {
736       IF_DEBUG(scheduler, sched_belch("waiting for work"));
737       waitCondition( &thread_ready_cond, &sched_mutex );
738       IF_DEBUG(scheduler, sched_belch("work now available"));
739     }
740 #elif defined(THREADED_RTS)
741    if ( run_queue_hd == END_TSO_QUEUE ) {
742      /* no work available, wait for external calls to complete. */
743      IF_DEBUG(scheduler, sched_belch("worker thread (%d): waiting for external thread to complete..", osThreadId()));
744      taskAvailable();
745      RELEASE_LOCK(&sched_mutex);
746      RELEASE_LOCK(&rts_mutex);
747
748      /* Sigh - need to have a mutex locked in order to wait on the
749         condition variable. */
750      ACQUIRE_LOCK(&thread_ready_aux_mutex);
751      waitCondition(&thread_ready_cond, &thread_ready_aux_mutex);
752      RELEASE_LOCK(&thread_ready_aux_mutex);
753
754      IF_DEBUG(scheduler, sched_belch("worker thread (%d): re-awakened from no-work slumber..\n", osThreadId()));
755      /* ToDo: longjmp() */
756      taskStart();
757    }
758 #endif
759
760 #if defined(GRAN)
761
762     if (RtsFlags.GranFlags.Light)
763       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
764
765     /* adjust time based on time-stamp */
766     if (event->time > CurrentTime[CurrentProc] &&
767         event->evttype != ContinueThread)
768       CurrentTime[CurrentProc] = event->time;
769     
770     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
771     if (!RtsFlags.GranFlags.Light)
772       handleIdlePEs();
773
774     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
775
776     /* main event dispatcher in GranSim */
777     switch (event->evttype) {
778       /* Should just be continuing execution */
779     case ContinueThread:
780       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
781       /* ToDo: check assertion
782       ASSERT(run_queue_hd != (StgTSO*)NULL &&
783              run_queue_hd != END_TSO_QUEUE);
784       */
785       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
786       if (!RtsFlags.GranFlags.DoAsyncFetch &&
787           procStatus[CurrentProc]==Fetching) {
788         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
789               CurrentTSO->id, CurrentTSO, CurrentProc);
790         goto next_thread;
791       } 
792       /* Ignore ContinueThreads for completed threads */
793       if (CurrentTSO->what_next == ThreadComplete) {
794         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
795               CurrentTSO->id, CurrentTSO, CurrentProc);
796         goto next_thread;
797       } 
798       /* Ignore ContinueThreads for threads that are being migrated */
799       if (PROCS(CurrentTSO)==Nowhere) { 
800         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
801               CurrentTSO->id, CurrentTSO, CurrentProc);
802         goto next_thread;
803       }
804       /* The thread should be at the beginning of the run queue */
805       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
806         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
807               CurrentTSO->id, CurrentTSO, CurrentProc);
808         break; // run the thread anyway
809       }
810       /*
811       new_event(proc, proc, CurrentTime[proc],
812                 FindWork,
813                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
814       goto next_thread; 
815       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
816       break; // now actually run the thread; DaH Qu'vam yImuHbej 
817
818     case FetchNode:
819       do_the_fetchnode(event);
820       goto next_thread;             /* handle next event in event queue  */
821       
822     case GlobalBlock:
823       do_the_globalblock(event);
824       goto next_thread;             /* handle next event in event queue  */
825       
826     case FetchReply:
827       do_the_fetchreply(event);
828       goto next_thread;             /* handle next event in event queue  */
829       
830     case UnblockThread:   /* Move from the blocked queue to the tail of */
831       do_the_unblock(event);
832       goto next_thread;             /* handle next event in event queue  */
833       
834     case ResumeThread:  /* Move from the blocked queue to the tail of */
835       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
836       event->tso->gran.blocktime += 
837         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
838       do_the_startthread(event);
839       goto next_thread;             /* handle next event in event queue  */
840       
841     case StartThread:
842       do_the_startthread(event);
843       goto next_thread;             /* handle next event in event queue  */
844       
845     case MoveThread:
846       do_the_movethread(event);
847       goto next_thread;             /* handle next event in event queue  */
848       
849     case MoveSpark:
850       do_the_movespark(event);
851       goto next_thread;             /* handle next event in event queue  */
852       
853     case FindWork:
854       do_the_findwork(event);
855       goto next_thread;             /* handle next event in event queue  */
856       
857     default:
858       barf("Illegal event type %u\n", event->evttype);
859     }  /* switch */
860     
861     /* This point was scheduler_loop in the old RTS */
862
863     IF_DEBUG(gran, belch("GRAN: after main switch"));
864
865     TimeOfLastEvent = CurrentTime[CurrentProc];
866     TimeOfNextEvent = get_time_of_next_event();
867     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
868     // CurrentTSO = ThreadQueueHd;
869
870     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
871                          TimeOfNextEvent));
872
873     if (RtsFlags.GranFlags.Light) 
874       GranSimLight_leave_system(event, &ActiveTSO); 
875
876     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
877
878     IF_DEBUG(gran, 
879              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
880
881     /* in a GranSim setup the TSO stays on the run queue */
882     t = CurrentTSO;
883     /* Take a thread from the run queue. */
884     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
885
886     IF_DEBUG(gran, 
887              fprintf(stderr, "GRAN: About to run current thread, which is\n");
888              G_TSO(t,5));
889
890     context_switch = 0; // turned on via GranYield, checking events and time slice
891
892     IF_DEBUG(gran, 
893              DumpGranEvent(GR_SCHEDULE, t));
894
895     procStatus[CurrentProc] = Busy;
896
897 #elif defined(PAR)
898     if (PendingFetches != END_BF_QUEUE) {
899         processFetches();
900     }
901
902     /* ToDo: phps merge with spark activation above */
903     /* check whether we have local work and send requests if we have none */
904     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
905       /* :-[  no local threads => look out for local sparks */
906       /* the spark pool for the current PE */
907       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
908       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
909           pool->hd < pool->tl) {
910         /* 
911          * ToDo: add GC code check that we really have enough heap afterwards!!
912          * Old comment:
913          * If we're here (no runnable threads) and we have pending
914          * sparks, we must have a space problem.  Get enough space
915          * to turn one of those pending sparks into a
916          * thread... 
917          */
918
919         spark = findSpark(rtsFalse);                /* get a spark */
920         if (spark != (rtsSpark) NULL) {
921           tso = activateSpark(spark);       /* turn the spark into a thread */
922           IF_PAR_DEBUG(schedule,
923                        belch("==== schedule: Created TSO %d (%p); %d threads active",
924                              tso->id, tso, advisory_thread_count));
925
926           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
927             belch("==^^ failed to activate spark");
928             goto next_thread;
929           }               /* otherwise fall through & pick-up new tso */
930         } else {
931           IF_PAR_DEBUG(verbose,
932                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
933                              spark_queue_len(pool)));
934           goto next_thread;
935         }
936       }
937
938       /* If we still have no work we need to send a FISH to get a spark
939          from another PE 
940       */
941       if (EMPTY_RUN_QUEUE()) {
942       /* =8-[  no local sparks => look for work on other PEs */
943         /*
944          * We really have absolutely no work.  Send out a fish
945          * (there may be some out there already), and wait for
946          * something to arrive.  We clearly can't run any threads
947          * until a SCHEDULE or RESUME arrives, and so that's what
948          * we're hoping to see.  (Of course, we still have to
949          * respond to other types of messages.)
950          */
951         TIME now = msTime() /*CURRENT_TIME*/;
952         IF_PAR_DEBUG(verbose, 
953                      belch("--  now=%ld", now));
954         IF_PAR_DEBUG(verbose,
955                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
956                          (last_fish_arrived_at!=0 &&
957                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
958                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
959                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
960                              last_fish_arrived_at,
961                              RtsFlags.ParFlags.fishDelay, now);
962                      });
963         
964         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
965             (last_fish_arrived_at==0 ||
966              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
967           /* outstandingFishes is set in sendFish, processFish;
968              avoid flooding system with fishes via delay */
969           pe = choosePE();
970           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
971                    NEW_FISH_HUNGER);
972
973           // Global statistics: count no. of fishes
974           if (RtsFlags.ParFlags.ParStats.Global &&
975               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
976             globalParStats.tot_fish_mess++;
977           }
978         }
979       
980         receivedFinish = processMessages();
981         goto next_thread;
982       }
983     } else if (PacketsWaiting()) {  /* Look for incoming messages */
984       receivedFinish = processMessages();
985     }
986
987     /* Now we are sure that we have some work available */
988     ASSERT(run_queue_hd != END_TSO_QUEUE);
989
990     /* Take a thread from the run queue, if we have work */
991     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
992     IF_DEBUG(sanity,checkTSO(t));
993
994     /* ToDo: write something to the log-file
995     if (RTSflags.ParFlags.granSimStats && !sameThread)
996         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
997
998     CurrentTSO = t;
999     */
1000     /* the spark pool for the current PE */
1001     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
1002
1003     IF_DEBUG(scheduler, 
1004              belch("--=^ %d threads, %d sparks on [%#x]", 
1005                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1006
1007 #if 1
1008     if (0 && RtsFlags.ParFlags.ParStats.Full && 
1009         t && LastTSO && t->id != LastTSO->id && 
1010         LastTSO->why_blocked == NotBlocked && 
1011         LastTSO->what_next != ThreadComplete) {
1012       // if previously scheduled TSO not blocked we have to record the context switch
1013       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
1014                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
1015     }
1016
1017     if (RtsFlags.ParFlags.ParStats.Full && 
1018         (emitSchedule /* forced emit */ ||
1019         (t && LastTSO && t->id != LastTSO->id))) {
1020       /* 
1021          we are running a different TSO, so write a schedule event to log file
1022          NB: If we use fair scheduling we also have to write  a deschedule 
1023              event for LastTSO; with unfair scheduling we know that the
1024              previous tso has blocked whenever we switch to another tso, so
1025              we don't need it in GUM for now
1026       */
1027       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1028                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1029       emitSchedule = rtsFalse;
1030     }
1031      
1032 #endif
1033 #else /* !GRAN && !PAR */
1034   
1035     /* grab a thread from the run queue
1036      */
1037     ASSERT(run_queue_hd != END_TSO_QUEUE);
1038     t = POP_RUN_QUEUE();
1039     // Sanity check the thread we're about to run.  This can be
1040     // expensive if there is lots of thread switching going on...
1041     IF_DEBUG(sanity,checkTSO(t));
1042 #endif
1043     
1044     grabCapability(&cap);
1045     cap->r.rCurrentTSO = t;
1046     
1047     /* context switches are now initiated by the timer signal, unless
1048      * the user specified "context switch as often as possible", with
1049      * +RTS -C0
1050      */
1051     if (
1052 #ifdef PROFILING
1053         RtsFlags.ProfFlags.profileInterval == 0 ||
1054 #endif
1055         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1056          && (run_queue_hd != END_TSO_QUEUE
1057              || blocked_queue_hd != END_TSO_QUEUE
1058              || sleeping_queue != END_TSO_QUEUE)))
1059         context_switch = 1;
1060     else
1061         context_switch = 0;
1062
1063     RELEASE_LOCK(&sched_mutex);
1064
1065     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
1066                               t->id, t, whatNext_strs[t->what_next]));
1067
1068 #ifdef PROFILING
1069     startHeapProfTimer();
1070 #endif
1071
1072     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1073     /* Run the current thread 
1074      */
1075     switch (cap->r.rCurrentTSO->what_next) {
1076     case ThreadKilled:
1077     case ThreadComplete:
1078         /* Thread already finished, return to scheduler. */
1079         ret = ThreadFinished;
1080         break;
1081     case ThreadEnterGHC:
1082         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1083         break;
1084     case ThreadRunGHC:
1085         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1086         break;
1087     case ThreadEnterInterp:
1088         ret = interpretBCO(cap);
1089         break;
1090     default:
1091       barf("schedule: invalid what_next field");
1092     }
1093     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1094     
1095     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1096 #ifdef PROFILING
1097     stopHeapProfTimer();
1098     CCCS = CCS_SYSTEM;
1099 #endif
1100     
1101     ACQUIRE_LOCK(&sched_mutex);
1102
1103 #ifdef SMP
1104     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1105 #elif !defined(GRAN) && !defined(PAR)
1106     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1107 #endif
1108     t = cap->r.rCurrentTSO;
1109     
1110 #if defined(PAR)
1111     /* HACK 675: if the last thread didn't yield, make sure to print a 
1112        SCHEDULE event to the log file when StgRunning the next thread, even
1113        if it is the same one as before */
1114     LastTSO = t; 
1115     TimeOfLastYield = CURRENT_TIME;
1116 #endif
1117
1118     switch (ret) {
1119     case HeapOverflow:
1120 #if defined(GRAN)
1121       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1122       globalGranStats.tot_heapover++;
1123 #elif defined(PAR)
1124       globalParStats.tot_heapover++;
1125 #endif
1126
1127       // did the task ask for a large block?
1128       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1129           // if so, get one and push it on the front of the nursery.
1130           bdescr *bd;
1131           nat blocks;
1132           
1133           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1134
1135           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1136                                    t->id, t,
1137                                    whatNext_strs[t->what_next], blocks));
1138
1139           // don't do this if it would push us over the
1140           // alloc_blocks_lim limit; we'll GC first.
1141           if (alloc_blocks + blocks < alloc_blocks_lim) {
1142
1143               alloc_blocks += blocks;
1144               bd = allocGroup( blocks );
1145
1146               // link the new group into the list
1147               bd->link = cap->r.rCurrentNursery;
1148               bd->u.back = cap->r.rCurrentNursery->u.back;
1149               if (cap->r.rCurrentNursery->u.back != NULL) {
1150                   cap->r.rCurrentNursery->u.back->link = bd;
1151               } else {
1152                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1153                          g0s0->blocks == cap->r.rNursery);
1154                   cap->r.rNursery = g0s0->blocks = bd;
1155               }           
1156               cap->r.rCurrentNursery->u.back = bd;
1157
1158               // initialise it as a nursery block
1159               bd->step = g0s0;
1160               bd->gen_no = 0;
1161               bd->flags = 0;
1162               bd->free = bd->start;
1163
1164               // don't forget to update the block count in g0s0.
1165               g0s0->n_blocks += blocks;
1166               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1167
1168               // now update the nursery to point to the new block
1169               cap->r.rCurrentNursery = bd;
1170
1171               // we might be unlucky and have another thread get on the
1172               // run queue before us and steal the large block, but in that
1173               // case the thread will just end up requesting another large
1174               // block.
1175               PUSH_ON_RUN_QUEUE(t);
1176               break;
1177           }
1178       }
1179
1180       /* make all the running tasks block on a condition variable,
1181        * maybe set context_switch and wait till they all pile in,
1182        * then have them wait on a GC condition variable.
1183        */
1184       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1185                                t->id, t, whatNext_strs[t->what_next]));
1186       threadPaused(t);
1187 #if defined(GRAN)
1188       ASSERT(!is_on_queue(t,CurrentProc));
1189 #elif defined(PAR)
1190       /* Currently we emit a DESCHEDULE event before GC in GUM.
1191          ToDo: either add separate event to distinguish SYSTEM time from rest
1192                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1193       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1194         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1195                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1196         emitSchedule = rtsTrue;
1197       }
1198 #endif
1199       
1200       ready_to_gc = rtsTrue;
1201       context_switch = 1;               /* stop other threads ASAP */
1202       PUSH_ON_RUN_QUEUE(t);
1203       /* actual GC is done at the end of the while loop */
1204       break;
1205       
1206     case StackOverflow:
1207 #if defined(GRAN)
1208       IF_DEBUG(gran, 
1209                DumpGranEvent(GR_DESCHEDULE, t));
1210       globalGranStats.tot_stackover++;
1211 #elif defined(PAR)
1212       // IF_DEBUG(par, 
1213       // DumpGranEvent(GR_DESCHEDULE, t);
1214       globalParStats.tot_stackover++;
1215 #endif
1216       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1217                                t->id, t, whatNext_strs[t->what_next]));
1218       /* just adjust the stack for this thread, then pop it back
1219        * on the run queue.
1220        */
1221       threadPaused(t);
1222       { 
1223         StgMainThread *m;
1224         /* enlarge the stack */
1225         StgTSO *new_t = threadStackOverflow(t);
1226         
1227         /* This TSO has moved, so update any pointers to it from the
1228          * main thread stack.  It better not be on any other queues...
1229          * (it shouldn't be).
1230          */
1231         for (m = main_threads; m != NULL; m = m->link) {
1232           if (m->tso == t) {
1233             m->tso = new_t;
1234           }
1235         }
1236         threadPaused(new_t);
1237         PUSH_ON_RUN_QUEUE(new_t);
1238       }
1239       break;
1240
1241     case ThreadYielding:
1242 #if defined(GRAN)
1243       IF_DEBUG(gran, 
1244                DumpGranEvent(GR_DESCHEDULE, t));
1245       globalGranStats.tot_yields++;
1246 #elif defined(PAR)
1247       // IF_DEBUG(par, 
1248       // DumpGranEvent(GR_DESCHEDULE, t);
1249       globalParStats.tot_yields++;
1250 #endif
1251       /* put the thread back on the run queue.  Then, if we're ready to
1252        * GC, check whether this is the last task to stop.  If so, wake
1253        * up the GC thread.  getThread will block during a GC until the
1254        * GC is finished.
1255        */
1256       IF_DEBUG(scheduler,
1257                if (t->what_next == ThreadEnterInterp) {
1258                    /* ToDo: or maybe a timer expired when we were in Hugs?
1259                     * or maybe someone hit ctrl-C
1260                     */
1261                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1262                          t->id, t, whatNext_strs[t->what_next]);
1263                } else {
1264                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1265                          t->id, t, whatNext_strs[t->what_next]);
1266                }
1267                );
1268
1269       threadPaused(t);
1270
1271       IF_DEBUG(sanity,
1272                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1273                checkTSO(t));
1274       ASSERT(t->link == END_TSO_QUEUE);
1275 #if defined(GRAN)
1276       ASSERT(!is_on_queue(t,CurrentProc));
1277
1278       IF_DEBUG(sanity,
1279                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1280                checkThreadQsSanity(rtsTrue));
1281 #endif
1282 #if defined(PAR)
1283       if (RtsFlags.ParFlags.doFairScheduling) { 
1284         /* this does round-robin scheduling; good for concurrency */
1285         APPEND_TO_RUN_QUEUE(t);
1286       } else {
1287         /* this does unfair scheduling; good for parallelism */
1288         PUSH_ON_RUN_QUEUE(t);
1289       }
1290 #else
1291       /* this does round-robin scheduling; good for concurrency */
1292       APPEND_TO_RUN_QUEUE(t);
1293 #endif
1294 #if defined(GRAN)
1295       /* add a ContinueThread event to actually process the thread */
1296       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1297                 ContinueThread,
1298                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1299       IF_GRAN_DEBUG(bq, 
1300                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1301                G_EVENTQ(0);
1302                G_CURR_THREADQ(0));
1303 #endif /* GRAN */
1304       break;
1305       
1306     case ThreadBlocked:
1307 #if defined(GRAN)
1308       IF_DEBUG(scheduler,
1309                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1310                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1311                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1312
1313       // ??? needed; should emit block before
1314       IF_DEBUG(gran, 
1315                DumpGranEvent(GR_DESCHEDULE, t)); 
1316       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1317       /*
1318         ngoq Dogh!
1319       ASSERT(procStatus[CurrentProc]==Busy || 
1320               ((procStatus[CurrentProc]==Fetching) && 
1321               (t->block_info.closure!=(StgClosure*)NULL)));
1322       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1323           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1324             procStatus[CurrentProc]==Fetching)) 
1325         procStatus[CurrentProc] = Idle;
1326       */
1327 #elif defined(PAR)
1328       IF_DEBUG(scheduler,
1329                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1330                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1331       IF_PAR_DEBUG(bq,
1332
1333                    if (t->block_info.closure!=(StgClosure*)NULL) 
1334                      print_bq(t->block_info.closure));
1335
1336       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1337       blockThread(t);
1338
1339       /* whatever we schedule next, we must log that schedule */
1340       emitSchedule = rtsTrue;
1341
1342 #else /* !GRAN */
1343       /* don't need to do anything.  Either the thread is blocked on
1344        * I/O, in which case we'll have called addToBlockedQueue
1345        * previously, or it's blocked on an MVar or Blackhole, in which
1346        * case it'll be on the relevant queue already.
1347        */
1348       IF_DEBUG(scheduler,
1349                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1350                printThreadBlockage(t);
1351                fprintf(stderr, "\n"));
1352
1353       /* Only for dumping event to log file 
1354          ToDo: do I need this in GranSim, too?
1355       blockThread(t);
1356       */
1357 #endif
1358       threadPaused(t);
1359       break;
1360       
1361     case ThreadFinished:
1362       /* Need to check whether this was a main thread, and if so, signal
1363        * the task that started it with the return value.  If we have no
1364        * more main threads, we probably need to stop all the tasks until
1365        * we get a new one.
1366        */
1367       /* We also end up here if the thread kills itself with an
1368        * uncaught exception, see Exception.hc.
1369        */
1370       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1371 #if defined(GRAN)
1372       endThread(t, CurrentProc); // clean-up the thread
1373 #elif defined(PAR)
1374       /* For now all are advisory -- HWL */
1375       //if(t->priority==AdvisoryPriority) ??
1376       advisory_thread_count--;
1377       
1378 # ifdef DIST
1379       if(t->dist.priority==RevalPriority)
1380         FinishReval(t);
1381 # endif
1382       
1383       if (RtsFlags.ParFlags.ParStats.Full &&
1384           !RtsFlags.ParFlags.ParStats.Suppressed) 
1385         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1386 #endif
1387       break;
1388       
1389     default:
1390       barf("schedule: invalid thread return code %d", (int)ret);
1391     }
1392     
1393 #ifdef SMP
1394     grabCapability(&cap);
1395 #endif
1396
1397 #ifdef PROFILING
1398     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1399         GarbageCollect(GetRoots, rtsTrue);
1400         heapCensus();
1401         performHeapProfile = rtsFalse;
1402         ready_to_gc = rtsFalse; // we already GC'd
1403     }
1404 #endif
1405
1406 #ifdef SMP
1407     if (ready_to_gc && allFreeCapabilities() )
1408 #else
1409     if (ready_to_gc) 
1410 #endif
1411       {
1412       /* everybody back, start the GC.
1413        * Could do it in this thread, or signal a condition var
1414        * to do it in another thread.  Either way, we need to
1415        * broadcast on gc_pending_cond afterward.
1416        */
1417 #if defined(RTS_SUPPORTS_THREADS)
1418       IF_DEBUG(scheduler,sched_belch("doing GC"));
1419 #endif
1420       GarbageCollect(GetRoots,rtsFalse);
1421       ready_to_gc = rtsFalse;
1422 #ifdef SMP
1423       broadcastCondition(&gc_pending_cond);
1424 #endif
1425 #if defined(GRAN)
1426       /* add a ContinueThread event to continue execution of current thread */
1427       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1428                 ContinueThread,
1429                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1430       IF_GRAN_DEBUG(bq, 
1431                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1432                G_EVENTQ(0);
1433                G_CURR_THREADQ(0));
1434 #endif /* GRAN */
1435     }
1436
1437 #if defined(GRAN)
1438   next_thread:
1439     IF_GRAN_DEBUG(unused,
1440                   print_eventq(EventHd));
1441
1442     event = get_next_event();
1443 #elif defined(PAR)
1444   next_thread:
1445     /* ToDo: wait for next message to arrive rather than busy wait */
1446 #endif /* GRAN */
1447
1448   } /* end of while(1) */
1449
1450   IF_PAR_DEBUG(verbose,
1451                belch("== Leaving schedule() after having received Finish"));
1452 }
1453
1454 /* ---------------------------------------------------------------------------
1455  * deleteAllThreads():  kill all the live threads.
1456  *
1457  * This is used when we catch a user interrupt (^C), before performing
1458  * any necessary cleanups and running finalizers.
1459  * ------------------------------------------------------------------------- */
1460    
1461 void deleteAllThreads ( void )
1462 {
1463   StgTSO* t, *next;
1464   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1465   for (t = run_queue_hd; t != END_TSO_QUEUE; t = next) {
1466       next = t->link;
1467       deleteThread(t);
1468   }
1469   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = next) {
1470       next = t->link;
1471       deleteThread(t);
1472   }
1473   for (t = sleeping_queue; t != END_TSO_QUEUE; t = next) {
1474       next = t->link;
1475       deleteThread(t);
1476   }
1477   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1478   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1479   sleeping_queue = END_TSO_QUEUE;
1480 }
1481
1482 /* startThread and  insertThread are now in GranSim.c -- HWL */
1483
1484
1485 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1486 //@subsection Suspend and Resume
1487
1488 /* ---------------------------------------------------------------------------
1489  * Suspending & resuming Haskell threads.
1490  * 
1491  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1492  * its capability before calling the C function.  This allows another
1493  * task to pick up the capability and carry on running Haskell
1494  * threads.  It also means that if the C call blocks, it won't lock
1495  * the whole system.
1496  *
1497  * The Haskell thread making the C call is put to sleep for the
1498  * duration of the call, on the susepended_ccalling_threads queue.  We
1499  * give out a token to the task, which it can use to resume the thread
1500  * on return from the C function.
1501  * ------------------------------------------------------------------------- */
1502    
1503 StgInt
1504 suspendThread( StgRegTable *reg )
1505 {
1506   nat tok;
1507   Capability *cap;
1508
1509   /* assume that *reg is a pointer to the StgRegTable part
1510    * of a Capability.
1511    */
1512   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1513
1514   ACQUIRE_LOCK(&sched_mutex);
1515
1516   IF_DEBUG(scheduler,
1517            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1518
1519   threadPaused(cap->r.rCurrentTSO);
1520   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1521   suspended_ccalling_threads = cap->r.rCurrentTSO;
1522
1523   /* Use the thread ID as the token; it should be unique */
1524   tok = cap->r.rCurrentTSO->id;
1525
1526   /* Hand back capability */
1527   releaseCapability(&cap);
1528   
1529 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
1530   /* Preparing to leave the RTS, so ensure there's a native thread/task
1531      waiting to take over.
1532      
1533      ToDo: optimise this and only create a new task if there's a need
1534      for one (i.e., if there's only one Concurrent Haskell thread alive,
1535      there's no need to create a new task).
1536   */
1537   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS\n", tok));
1538   startTask(taskStart);
1539
1540 #endif
1541   
1542   RELEASE_LOCK(&sched_mutex);
1543   RELEASE_LOCK(&rts_mutex);
1544   return tok; 
1545 }
1546
1547 StgRegTable *
1548 resumeThread( StgInt tok )
1549 {
1550   StgTSO *tso, **prev;
1551   Capability *cap;
1552
1553 #if defined(THREADED_RTS)
1554   IF_DEBUG(scheduler, sched_belch("thread %d returning, waiting for sched. lock.\n", tok));
1555   ACQUIRE_LOCK(&sched_mutex);
1556   threads_waiting++;
1557   IF_DEBUG(scheduler, sched_belch("thread %d returning, threads waiting: %d.\n", tok, threads_waiting));
1558   RELEASE_LOCK(&sched_mutex);
1559   
1560   IF_DEBUG(scheduler, sched_belch("thread %d waiting for RTS lock...\n", tok));
1561   ACQUIRE_LOCK(&rts_mutex);
1562   threads_waiting--;
1563   taskNotAvailable();
1564   IF_DEBUG(scheduler, sched_belch("thread %d acquired RTS lock...\n", tok));
1565 #endif
1566
1567 #if defined(THREADED_RTS)
1568   /* Free up any RTS-blocked threads. */
1569   broadcastCondition(&thread_ready_cond);
1570 #endif
1571
1572   /* Remove the thread off of the suspended list */
1573   prev = &suspended_ccalling_threads;
1574   for (tso = suspended_ccalling_threads; 
1575        tso != END_TSO_QUEUE; 
1576        prev = &tso->link, tso = tso->link) {
1577     if (tso->id == (StgThreadID)tok) {
1578       *prev = tso->link;
1579       break;
1580     }
1581   }
1582   if (tso == END_TSO_QUEUE) {
1583     barf("resumeThread: thread not found");
1584   }
1585   tso->link = END_TSO_QUEUE;
1586
1587 #if defined(SMP)
1588   while ( noFreeCapabilities() ) {
1589     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1590     waitCondition(&thread_ready_cond, &sched_mutex);
1591     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1592   }
1593 #endif
1594
1595   grabCapability(&cap);
1596
1597   cap->r.rCurrentTSO = tso;
1598
1599   return &cap->r;
1600 }
1601
1602
1603 /* ---------------------------------------------------------------------------
1604  * Static functions
1605  * ------------------------------------------------------------------------ */
1606 static void unblockThread(StgTSO *tso);
1607
1608 /* ---------------------------------------------------------------------------
1609  * Comparing Thread ids.
1610  *
1611  * This is used from STG land in the implementation of the
1612  * instances of Eq/Ord for ThreadIds.
1613  * ------------------------------------------------------------------------ */
1614
1615 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1616
1617   StgThreadID id1 = tso1->id; 
1618   StgThreadID id2 = tso2->id;
1619  
1620   if (id1 < id2) return (-1);
1621   if (id1 > id2) return 1;
1622   return 0;
1623 }
1624
1625 /* ---------------------------------------------------------------------------
1626  * Fetching the ThreadID from an StgTSO.
1627  *
1628  * This is used in the implementation of Show for ThreadIds.
1629  * ------------------------------------------------------------------------ */
1630 int rts_getThreadId(const StgTSO *tso) 
1631 {
1632   return tso->id;
1633 }
1634
1635 /* ---------------------------------------------------------------------------
1636    Create a new thread.
1637
1638    The new thread starts with the given stack size.  Before the
1639    scheduler can run, however, this thread needs to have a closure
1640    (and possibly some arguments) pushed on its stack.  See
1641    pushClosure() in Schedule.h.
1642
1643    createGenThread() and createIOThread() (in SchedAPI.h) are
1644    convenient packaged versions of this function.
1645
1646    currently pri (priority) is only used in a GRAN setup -- HWL
1647    ------------------------------------------------------------------------ */
1648 //@cindex createThread
1649 #if defined(GRAN)
1650 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1651 StgTSO *
1652 createThread(nat stack_size, StgInt pri)
1653 {
1654   return createThread_(stack_size, rtsFalse, pri);
1655 }
1656
1657 static StgTSO *
1658 createThread_(nat size, rtsBool have_lock, StgInt pri)
1659 {
1660 #else
1661 StgTSO *
1662 createThread(nat stack_size)
1663 {
1664   return createThread_(stack_size, rtsFalse);
1665 }
1666
1667 static StgTSO *
1668 createThread_(nat size, rtsBool have_lock)
1669 {
1670 #endif
1671
1672     StgTSO *tso;
1673     nat stack_size;
1674
1675     /* First check whether we should create a thread at all */
1676 #if defined(PAR)
1677   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1678   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1679     threadsIgnored++;
1680     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1681           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1682     return END_TSO_QUEUE;
1683   }
1684   threadsCreated++;
1685 #endif
1686
1687 #if defined(GRAN)
1688   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1689 #endif
1690
1691   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1692
1693   /* catch ridiculously small stack sizes */
1694   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1695     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1696   }
1697
1698   stack_size = size - TSO_STRUCT_SIZEW;
1699
1700   tso = (StgTSO *)allocate(size);
1701   TICK_ALLOC_TSO(stack_size, 0);
1702
1703   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1704 #if defined(GRAN)
1705   SET_GRAN_HDR(tso, ThisPE);
1706 #endif
1707   tso->what_next     = ThreadEnterGHC;
1708
1709   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1710    * protect the increment operation on next_thread_id.
1711    * In future, we could use an atomic increment instead.
1712    */
1713   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1714   tso->id = next_thread_id++; 
1715   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1716
1717   tso->why_blocked  = NotBlocked;
1718   tso->blocked_exceptions = NULL;
1719
1720   tso->stack_size   = stack_size;
1721   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1722                               - TSO_STRUCT_SIZEW;
1723   tso->sp           = (P_)&(tso->stack) + stack_size;
1724
1725 #ifdef PROFILING
1726   tso->prof.CCCS = CCS_MAIN;
1727 #endif
1728
1729   /* put a stop frame on the stack */
1730   tso->sp -= sizeofW(StgStopFrame);
1731   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1732   tso->su = (StgUpdateFrame*)tso->sp;
1733
1734   // ToDo: check this
1735 #if defined(GRAN)
1736   tso->link = END_TSO_QUEUE;
1737   /* uses more flexible routine in GranSim */
1738   insertThread(tso, CurrentProc);
1739 #else
1740   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1741    * from its creation
1742    */
1743 #endif
1744
1745 #if defined(GRAN) 
1746   if (RtsFlags.GranFlags.GranSimStats.Full) 
1747     DumpGranEvent(GR_START,tso);
1748 #elif defined(PAR)
1749   if (RtsFlags.ParFlags.ParStats.Full) 
1750     DumpGranEvent(GR_STARTQ,tso);
1751   /* HACk to avoid SCHEDULE 
1752      LastTSO = tso; */
1753 #endif
1754
1755   /* Link the new thread on the global thread list.
1756    */
1757   tso->global_link = all_threads;
1758   all_threads = tso;
1759
1760 #if defined(DIST)
1761   tso->dist.priority = MandatoryPriority; //by default that is...
1762 #endif
1763
1764 #if defined(GRAN)
1765   tso->gran.pri = pri;
1766 # if defined(DEBUG)
1767   tso->gran.magic = TSO_MAGIC; // debugging only
1768 # endif
1769   tso->gran.sparkname   = 0;
1770   tso->gran.startedat   = CURRENT_TIME; 
1771   tso->gran.exported    = 0;
1772   tso->gran.basicblocks = 0;
1773   tso->gran.allocs      = 0;
1774   tso->gran.exectime    = 0;
1775   tso->gran.fetchtime   = 0;
1776   tso->gran.fetchcount  = 0;
1777   tso->gran.blocktime   = 0;
1778   tso->gran.blockcount  = 0;
1779   tso->gran.blockedat   = 0;
1780   tso->gran.globalsparks = 0;
1781   tso->gran.localsparks  = 0;
1782   if (RtsFlags.GranFlags.Light)
1783     tso->gran.clock  = Now; /* local clock */
1784   else
1785     tso->gran.clock  = 0;
1786
1787   IF_DEBUG(gran,printTSO(tso));
1788 #elif defined(PAR)
1789 # if defined(DEBUG)
1790   tso->par.magic = TSO_MAGIC; // debugging only
1791 # endif
1792   tso->par.sparkname   = 0;
1793   tso->par.startedat   = CURRENT_TIME; 
1794   tso->par.exported    = 0;
1795   tso->par.basicblocks = 0;
1796   tso->par.allocs      = 0;
1797   tso->par.exectime    = 0;
1798   tso->par.fetchtime   = 0;
1799   tso->par.fetchcount  = 0;
1800   tso->par.blocktime   = 0;
1801   tso->par.blockcount  = 0;
1802   tso->par.blockedat   = 0;
1803   tso->par.globalsparks = 0;
1804   tso->par.localsparks  = 0;
1805 #endif
1806
1807 #if defined(GRAN)
1808   globalGranStats.tot_threads_created++;
1809   globalGranStats.threads_created_on_PE[CurrentProc]++;
1810   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1811   globalGranStats.tot_sq_probes++;
1812 #elif defined(PAR)
1813   // collect parallel global statistics (currently done together with GC stats)
1814   if (RtsFlags.ParFlags.ParStats.Global &&
1815       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1816     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1817     globalParStats.tot_threads_created++;
1818   }
1819 #endif 
1820
1821 #if defined(GRAN)
1822   IF_GRAN_DEBUG(pri,
1823                 belch("==__ schedule: Created TSO %d (%p);",
1824                       CurrentProc, tso, tso->id));
1825 #elif defined(PAR)
1826     IF_PAR_DEBUG(verbose,
1827                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1828                        tso->id, tso, advisory_thread_count));
1829 #else
1830   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1831                                  tso->id, tso->stack_size));
1832 #endif    
1833   return tso;
1834 }
1835
1836 #if defined(PAR)
1837 /* RFP:
1838    all parallel thread creation calls should fall through the following routine.
1839 */
1840 StgTSO *
1841 createSparkThread(rtsSpark spark) 
1842 { StgTSO *tso;
1843   ASSERT(spark != (rtsSpark)NULL);
1844   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1845   { threadsIgnored++;
1846     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1847           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1848     return END_TSO_QUEUE;
1849   }
1850   else
1851   { threadsCreated++;
1852     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1853     if (tso==END_TSO_QUEUE)     
1854       barf("createSparkThread: Cannot create TSO");
1855 #if defined(DIST)
1856     tso->priority = AdvisoryPriority;
1857 #endif
1858     pushClosure(tso,spark);
1859     PUSH_ON_RUN_QUEUE(tso);
1860     advisory_thread_count++;    
1861   }
1862   return tso;
1863 }
1864 #endif
1865
1866 /*
1867   Turn a spark into a thread.
1868   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1869 */
1870 #if defined(PAR)
1871 //@cindex activateSpark
1872 StgTSO *
1873 activateSpark (rtsSpark spark) 
1874 {
1875   StgTSO *tso;
1876
1877   tso = createSparkThread(spark);
1878   if (RtsFlags.ParFlags.ParStats.Full) {   
1879     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1880     IF_PAR_DEBUG(verbose,
1881                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1882                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1883   }
1884   // ToDo: fwd info on local/global spark to thread -- HWL
1885   // tso->gran.exported =  spark->exported;
1886   // tso->gran.locked =   !spark->global;
1887   // tso->gran.sparkname = spark->name;
1888
1889   return tso;
1890 }
1891 #endif
1892
1893 /* ---------------------------------------------------------------------------
1894  * scheduleThread()
1895  *
1896  * scheduleThread puts a thread on the head of the runnable queue.
1897  * This will usually be done immediately after a thread is created.
1898  * The caller of scheduleThread must create the thread using e.g.
1899  * createThread and push an appropriate closure
1900  * on this thread's stack before the scheduler is invoked.
1901  * ------------------------------------------------------------------------ */
1902
1903 void
1904 scheduleThread(StgTSO *tso)
1905 {
1906   ACQUIRE_LOCK(&sched_mutex);
1907
1908   /* Put the new thread on the head of the runnable queue.  The caller
1909    * better push an appropriate closure on this thread's stack
1910    * beforehand.  In the SMP case, the thread may start running as
1911    * soon as we release the scheduler lock below.
1912    */
1913   PUSH_ON_RUN_QUEUE(tso);
1914   THREAD_RUNNABLE();
1915
1916 #if 0
1917   IF_DEBUG(scheduler,printTSO(tso));
1918 #endif
1919   RELEASE_LOCK(&sched_mutex);
1920 }
1921
1922 /* ---------------------------------------------------------------------------
1923  * initScheduler()
1924  *
1925  * Initialise the scheduler.  This resets all the queues - if the
1926  * queues contained any threads, they'll be garbage collected at the
1927  * next pass.
1928  *
1929  * ------------------------------------------------------------------------ */
1930
1931 #ifdef SMP
1932 static void
1933 term_handler(int sig STG_UNUSED)
1934 {
1935   stat_workerStop();
1936   ACQUIRE_LOCK(&term_mutex);
1937   await_death--;
1938   RELEASE_LOCK(&term_mutex);
1939   shutdownThread();
1940 }
1941 #endif
1942
1943 void 
1944 initScheduler(void)
1945 {
1946 #if defined(GRAN)
1947   nat i;
1948
1949   for (i=0; i<=MAX_PROC; i++) {
1950     run_queue_hds[i]      = END_TSO_QUEUE;
1951     run_queue_tls[i]      = END_TSO_QUEUE;
1952     blocked_queue_hds[i]  = END_TSO_QUEUE;
1953     blocked_queue_tls[i]  = END_TSO_QUEUE;
1954     ccalling_threadss[i]  = END_TSO_QUEUE;
1955     sleeping_queue        = END_TSO_QUEUE;
1956   }
1957 #else
1958   run_queue_hd      = END_TSO_QUEUE;
1959   run_queue_tl      = END_TSO_QUEUE;
1960   blocked_queue_hd  = END_TSO_QUEUE;
1961   blocked_queue_tl  = END_TSO_QUEUE;
1962   sleeping_queue    = END_TSO_QUEUE;
1963 #endif 
1964
1965   suspended_ccalling_threads  = END_TSO_QUEUE;
1966
1967   main_threads = NULL;
1968   all_threads  = END_TSO_QUEUE;
1969
1970   context_switch = 0;
1971   interrupted    = 0;
1972
1973   RtsFlags.ConcFlags.ctxtSwitchTicks =
1974       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1975       
1976 #if defined(RTS_SUPPORTS_THREADS)
1977   /* Initialise the mutex and condition variables used by
1978    * the scheduler. */
1979   initMutex(&rts_mutex);
1980   initMutex(&sched_mutex);
1981   initMutex(&term_mutex);
1982 #if defined(THREADED_RTS)
1983   initMutex(&thread_ready_aux_mutex);
1984 #endif
1985   
1986   initCondition(&thread_ready_cond);
1987   initCondition(&gc_pending_cond);
1988 #endif
1989
1990 #if defined(THREADED_RTS)
1991   /* Grab big lock */
1992   ACQUIRE_LOCK(&rts_mutex);
1993   IF_DEBUG(scheduler, 
1994            sched_belch("worker thread (%d): acquired RTS lock\n", osThreadId()));
1995 #endif
1996
1997   /* Install the SIGHUP handler */
1998 #ifdef SMP
1999   {
2000     struct sigaction action,oact;
2001
2002     action.sa_handler = term_handler;
2003     sigemptyset(&action.sa_mask);
2004     action.sa_flags = 0;
2005     if (sigaction(SIGTERM, &action, &oact) != 0) {
2006       barf("can't install TERM handler");
2007     }
2008   }
2009 #endif
2010
2011   /* A capability holds the state a native thread needs in
2012    * order to execute STG code. At least one capability is
2013    * floating around (only SMP builds have more than one).
2014    */
2015   initCapabilities();
2016   
2017 #if defined(RTS_SUPPORTS_THREADS)
2018     /* start our haskell execution tasks */
2019 # if defined(SMP)
2020     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2021 # else
2022     startTaskManager(0,taskStart);
2023 # endif
2024 #endif
2025
2026 #if /* defined(SMP) ||*/ defined(PAR)
2027   initSparkPools();
2028 #endif
2029 }
2030
2031 void
2032 exitScheduler( void )
2033 {
2034 #if defined(RTS_SUPPORTS_THREADS)
2035   stopTaskManager();
2036 #endif
2037 }
2038
2039 /* -----------------------------------------------------------------------------
2040    Managing the per-task allocation areas.
2041    
2042    Each capability comes with an allocation area.  These are
2043    fixed-length block lists into which allocation can be done.
2044
2045    ToDo: no support for two-space collection at the moment???
2046    -------------------------------------------------------------------------- */
2047
2048 /* -----------------------------------------------------------------------------
2049  * waitThread is the external interface for running a new computation
2050  * and waiting for the result.
2051  *
2052  * In the non-SMP case, we create a new main thread, push it on the 
2053  * main-thread stack, and invoke the scheduler to run it.  The
2054  * scheduler will return when the top main thread on the stack has
2055  * completed or died, and fill in the necessary fields of the
2056  * main_thread structure.
2057  *
2058  * In the SMP case, we create a main thread as before, but we then
2059  * create a new condition variable and sleep on it.  When our new
2060  * main thread has completed, we'll be woken up and the status/result
2061  * will be in the main_thread struct.
2062  * -------------------------------------------------------------------------- */
2063
2064 int 
2065 howManyThreadsAvail ( void )
2066 {
2067    int i = 0;
2068    StgTSO* q;
2069    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2070       i++;
2071    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2072       i++;
2073    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2074       i++;
2075    return i;
2076 }
2077
2078 void
2079 finishAllThreads ( void )
2080 {
2081    do {
2082       while (run_queue_hd != END_TSO_QUEUE) {
2083          waitThread ( run_queue_hd, NULL );
2084       }
2085       while (blocked_queue_hd != END_TSO_QUEUE) {
2086          waitThread ( blocked_queue_hd, NULL );
2087       }
2088       while (sleeping_queue != END_TSO_QUEUE) {
2089          waitThread ( blocked_queue_hd, NULL );
2090       }
2091    } while 
2092       (blocked_queue_hd != END_TSO_QUEUE || 
2093        run_queue_hd     != END_TSO_QUEUE ||
2094        sleeping_queue   != END_TSO_QUEUE);
2095 }
2096
2097 SchedulerStatus
2098 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2099 {
2100   StgMainThread *m;
2101   SchedulerStatus stat;
2102
2103   ACQUIRE_LOCK(&sched_mutex);
2104   
2105   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2106
2107   m->tso = tso;
2108   m->ret = ret;
2109   m->stat = NoStatus;
2110 #if defined(RTS_SUPPORTS_THREADS)
2111   initCondition(&m->wakeup);
2112 #endif
2113
2114   m->link = main_threads;
2115   main_threads = m;
2116
2117   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
2118                               m->tso->id));
2119
2120 #ifdef SMP
2121   do {
2122     waitCondition(&m->wakeup, &sched_mutex);
2123   } while (m->stat == NoStatus);
2124 #elif defined(GRAN)
2125   /* GranSim specific init */
2126   CurrentTSO = m->tso;                // the TSO to run
2127   procStatus[MainProc] = Busy;        // status of main PE
2128   CurrentProc = MainProc;             // PE to run it on
2129
2130   schedule();
2131 #else
2132   RELEASE_LOCK(&sched_mutex);
2133   schedule();
2134   ASSERT(m->stat != NoStatus);
2135 #endif
2136
2137   stat = m->stat;
2138
2139 #if defined(RTS_SUPPORTS_THREADS)
2140   closeCondition(&m->wakeup);
2141 #endif
2142
2143   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2144                               m->tso->id));
2145   free(m);
2146
2147   RELEASE_LOCK(&sched_mutex);
2148
2149   return stat;
2150 }
2151
2152 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2153 //@subsection Run queue code 
2154
2155 #if 0
2156 /* 
2157    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2158        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2159        implicit global variable that has to be correct when calling these
2160        fcts -- HWL 
2161 */
2162
2163 /* Put the new thread on the head of the runnable queue.
2164  * The caller of createThread better push an appropriate closure
2165  * on this thread's stack before the scheduler is invoked.
2166  */
2167 static /* inline */ void
2168 add_to_run_queue(tso)
2169 StgTSO* tso; 
2170 {
2171   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2172   tso->link = run_queue_hd;
2173   run_queue_hd = tso;
2174   if (run_queue_tl == END_TSO_QUEUE) {
2175     run_queue_tl = tso;
2176   }
2177 }
2178
2179 /* Put the new thread at the end of the runnable queue. */
2180 static /* inline */ void
2181 push_on_run_queue(tso)
2182 StgTSO* tso; 
2183 {
2184   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2185   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2186   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2187   if (run_queue_hd == END_TSO_QUEUE) {
2188     run_queue_hd = tso;
2189   } else {
2190     run_queue_tl->link = tso;
2191   }
2192   run_queue_tl = tso;
2193 }
2194
2195 /* 
2196    Should be inlined because it's used very often in schedule.  The tso
2197    argument is actually only needed in GranSim, where we want to have the
2198    possibility to schedule *any* TSO on the run queue, irrespective of the
2199    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2200    the run queue and dequeue the tso, adjusting the links in the queue. 
2201 */
2202 //@cindex take_off_run_queue
2203 static /* inline */ StgTSO*
2204 take_off_run_queue(StgTSO *tso) {
2205   StgTSO *t, *prev;
2206
2207   /* 
2208      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2209
2210      if tso is specified, unlink that tso from the run_queue (doesn't have
2211      to be at the beginning of the queue); GranSim only 
2212   */
2213   if (tso!=END_TSO_QUEUE) {
2214     /* find tso in queue */
2215     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2216          t!=END_TSO_QUEUE && t!=tso;
2217          prev=t, t=t->link) 
2218       /* nothing */ ;
2219     ASSERT(t==tso);
2220     /* now actually dequeue the tso */
2221     if (prev!=END_TSO_QUEUE) {
2222       ASSERT(run_queue_hd!=t);
2223       prev->link = t->link;
2224     } else {
2225       /* t is at beginning of thread queue */
2226       ASSERT(run_queue_hd==t);
2227       run_queue_hd = t->link;
2228     }
2229     /* t is at end of thread queue */
2230     if (t->link==END_TSO_QUEUE) {
2231       ASSERT(t==run_queue_tl);
2232       run_queue_tl = prev;
2233     } else {
2234       ASSERT(run_queue_tl!=t);
2235     }
2236     t->link = END_TSO_QUEUE;
2237   } else {
2238     /* take tso from the beginning of the queue; std concurrent code */
2239     t = run_queue_hd;
2240     if (t != END_TSO_QUEUE) {
2241       run_queue_hd = t->link;
2242       t->link = END_TSO_QUEUE;
2243       if (run_queue_hd == END_TSO_QUEUE) {
2244         run_queue_tl = END_TSO_QUEUE;
2245       }
2246     }
2247   }
2248   return t;
2249 }
2250
2251 #endif /* 0 */
2252
2253 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2254 //@subsection Garbage Collextion Routines
2255
2256 /* ---------------------------------------------------------------------------
2257    Where are the roots that we know about?
2258
2259         - all the threads on the runnable queue
2260         - all the threads on the blocked queue
2261         - all the threads on the sleeping queue
2262         - all the thread currently executing a _ccall_GC
2263         - all the "main threads"
2264      
2265    ------------------------------------------------------------------------ */
2266
2267 /* This has to be protected either by the scheduler monitor, or by the
2268         garbage collection monitor (probably the latter).
2269         KH @ 25/10/99
2270 */
2271
2272 void
2273 GetRoots(evac_fn evac)
2274 {
2275   StgMainThread *m;
2276
2277 #if defined(GRAN)
2278   {
2279     nat i;
2280     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2281       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2282           evac((StgClosure **)&run_queue_hds[i]);
2283       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2284           evac((StgClosure **)&run_queue_tls[i]);
2285       
2286       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2287           evac((StgClosure **)&blocked_queue_hds[i]);
2288       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2289           evac((StgClosure **)&blocked_queue_tls[i]);
2290       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2291           evac((StgClosure **)&ccalling_threads[i]);
2292     }
2293   }
2294
2295   markEventQueue();
2296
2297 #else /* !GRAN */
2298   if (run_queue_hd != END_TSO_QUEUE) {
2299       ASSERT(run_queue_tl != END_TSO_QUEUE);
2300       evac((StgClosure **)&run_queue_hd);
2301       evac((StgClosure **)&run_queue_tl);
2302   }
2303   
2304   if (blocked_queue_hd != END_TSO_QUEUE) {
2305       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2306       evac((StgClosure **)&blocked_queue_hd);
2307       evac((StgClosure **)&blocked_queue_tl);
2308   }
2309   
2310   if (sleeping_queue != END_TSO_QUEUE) {
2311       evac((StgClosure **)&sleeping_queue);
2312   }
2313 #endif 
2314
2315   for (m = main_threads; m != NULL; m = m->link) {
2316       evac((StgClosure **)&m->tso);
2317   }
2318   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2319       evac((StgClosure **)&suspended_ccalling_threads);
2320   }
2321
2322 #if defined(PAR) || defined(GRAN)
2323   markSparkQueue(evac);
2324 #endif
2325 }
2326
2327 /* -----------------------------------------------------------------------------
2328    performGC
2329
2330    This is the interface to the garbage collector from Haskell land.
2331    We provide this so that external C code can allocate and garbage
2332    collect when called from Haskell via _ccall_GC.
2333
2334    It might be useful to provide an interface whereby the programmer
2335    can specify more roots (ToDo).
2336    
2337    This needs to be protected by the GC condition variable above.  KH.
2338    -------------------------------------------------------------------------- */
2339
2340 void (*extra_roots)(evac_fn);
2341
2342 void
2343 performGC(void)
2344 {
2345   GarbageCollect(GetRoots,rtsFalse);
2346 }
2347
2348 void
2349 performMajorGC(void)
2350 {
2351   GarbageCollect(GetRoots,rtsTrue);
2352 }
2353
2354 static void
2355 AllRoots(evac_fn evac)
2356 {
2357     GetRoots(evac);             // the scheduler's roots
2358     extra_roots(evac);          // the user's roots
2359 }
2360
2361 void
2362 performGCWithRoots(void (*get_roots)(evac_fn))
2363 {
2364   extra_roots = get_roots;
2365   GarbageCollect(AllRoots,rtsFalse);
2366 }
2367
2368 /* -----------------------------------------------------------------------------
2369    Stack overflow
2370
2371    If the thread has reached its maximum stack size, then raise the
2372    StackOverflow exception in the offending thread.  Otherwise
2373    relocate the TSO into a larger chunk of memory and adjust its stack
2374    size appropriately.
2375    -------------------------------------------------------------------------- */
2376
2377 static StgTSO *
2378 threadStackOverflow(StgTSO *tso)
2379 {
2380   nat new_stack_size, new_tso_size, diff, stack_words;
2381   StgPtr new_sp;
2382   StgTSO *dest;
2383
2384   IF_DEBUG(sanity,checkTSO(tso));
2385   if (tso->stack_size >= tso->max_stack_size) {
2386
2387     IF_DEBUG(gc,
2388              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2389                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2390              /* If we're debugging, just print out the top of the stack */
2391              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2392                                               tso->sp+64)));
2393
2394     /* Send this thread the StackOverflow exception */
2395     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2396     return tso;
2397   }
2398
2399   /* Try to double the current stack size.  If that takes us over the
2400    * maximum stack size for this thread, then use the maximum instead.
2401    * Finally round up so the TSO ends up as a whole number of blocks.
2402    */
2403   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2404   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2405                                        TSO_STRUCT_SIZE)/sizeof(W_);
2406   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2407   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2408
2409   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2410
2411   dest = (StgTSO *)allocate(new_tso_size);
2412   TICK_ALLOC_TSO(new_stack_size,0);
2413
2414   /* copy the TSO block and the old stack into the new area */
2415   memcpy(dest,tso,TSO_STRUCT_SIZE);
2416   stack_words = tso->stack + tso->stack_size - tso->sp;
2417   new_sp = (P_)dest + new_tso_size - stack_words;
2418   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2419
2420   /* relocate the stack pointers... */
2421   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2422   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2423   dest->sp    = new_sp;
2424   dest->stack_size = new_stack_size;
2425         
2426   /* and relocate the update frame list */
2427   relocate_stack(dest, diff);
2428
2429   /* Mark the old TSO as relocated.  We have to check for relocated
2430    * TSOs in the garbage collector and any primops that deal with TSOs.
2431    *
2432    * It's important to set the sp and su values to just beyond the end
2433    * of the stack, so we don't attempt to scavenge any part of the
2434    * dead TSO's stack.
2435    */
2436   tso->what_next = ThreadRelocated;
2437   tso->link = dest;
2438   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2439   tso->su = (StgUpdateFrame *)tso->sp;
2440   tso->why_blocked = NotBlocked;
2441   dest->mut_link = NULL;
2442
2443   IF_PAR_DEBUG(verbose,
2444                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2445                      tso->id, tso, tso->stack_size);
2446                /* If we're debugging, just print out the top of the stack */
2447                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2448                                                 tso->sp+64)));
2449   
2450   IF_DEBUG(sanity,checkTSO(tso));
2451 #if 0
2452   IF_DEBUG(scheduler,printTSO(dest));
2453 #endif
2454
2455   return dest;
2456 }
2457
2458 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2459 //@subsection Blocking Queue Routines
2460
2461 /* ---------------------------------------------------------------------------
2462    Wake up a queue that was blocked on some resource.
2463    ------------------------------------------------------------------------ */
2464
2465 #if defined(GRAN)
2466 static inline void
2467 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2468 {
2469 }
2470 #elif defined(PAR)
2471 static inline void
2472 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2473 {
2474   /* write RESUME events to log file and
2475      update blocked and fetch time (depending on type of the orig closure) */
2476   if (RtsFlags.ParFlags.ParStats.Full) {
2477     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2478                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2479                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2480     if (EMPTY_RUN_QUEUE())
2481       emitSchedule = rtsTrue;
2482
2483     switch (get_itbl(node)->type) {
2484         case FETCH_ME_BQ:
2485           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2486           break;
2487         case RBH:
2488         case FETCH_ME:
2489         case BLACKHOLE_BQ:
2490           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2491           break;
2492 #ifdef DIST
2493         case MVAR:
2494           break;
2495 #endif    
2496         default:
2497           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2498         }
2499       }
2500 }
2501 #endif
2502
2503 #if defined(GRAN)
2504 static StgBlockingQueueElement *
2505 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2506 {
2507     StgTSO *tso;
2508     PEs node_loc, tso_loc;
2509
2510     node_loc = where_is(node); // should be lifted out of loop
2511     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2512     tso_loc = where_is((StgClosure *)tso);
2513     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2514       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2515       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2516       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2517       // insertThread(tso, node_loc);
2518       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2519                 ResumeThread,
2520                 tso, node, (rtsSpark*)NULL);
2521       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2522       // len_local++;
2523       // len++;
2524     } else { // TSO is remote (actually should be FMBQ)
2525       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2526                                   RtsFlags.GranFlags.Costs.gunblocktime +
2527                                   RtsFlags.GranFlags.Costs.latency;
2528       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2529                 UnblockThread,
2530                 tso, node, (rtsSpark*)NULL);
2531       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2532       // len++;
2533     }
2534     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2535     IF_GRAN_DEBUG(bq,
2536                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2537                           (node_loc==tso_loc ? "Local" : "Global"), 
2538                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2539     tso->block_info.closure = NULL;
2540     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2541                              tso->id, tso));
2542 }
2543 #elif defined(PAR)
2544 static StgBlockingQueueElement *
2545 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2546 {
2547     StgBlockingQueueElement *next;
2548
2549     switch (get_itbl(bqe)->type) {
2550     case TSO:
2551       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2552       /* if it's a TSO just push it onto the run_queue */
2553       next = bqe->link;
2554       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2555       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2556       THREAD_RUNNABLE();
2557       unblockCount(bqe, node);
2558       /* reset blocking status after dumping event */
2559       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2560       break;
2561
2562     case BLOCKED_FETCH:
2563       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2564       next = bqe->link;
2565       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2566       PendingFetches = (StgBlockedFetch *)bqe;
2567       break;
2568
2569 # if defined(DEBUG)
2570       /* can ignore this case in a non-debugging setup; 
2571          see comments on RBHSave closures above */
2572     case CONSTR:
2573       /* check that the closure is an RBHSave closure */
2574       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2575              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2576              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2577       break;
2578
2579     default:
2580       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2581            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2582            (StgClosure *)bqe);
2583 # endif
2584     }
2585   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2586   return next;
2587 }
2588
2589 #else /* !GRAN && !PAR */
2590 static StgTSO *
2591 unblockOneLocked(StgTSO *tso)
2592 {
2593   StgTSO *next;
2594
2595   ASSERT(get_itbl(tso)->type == TSO);
2596   ASSERT(tso->why_blocked != NotBlocked);
2597   tso->why_blocked = NotBlocked;
2598   next = tso->link;
2599   PUSH_ON_RUN_QUEUE(tso);
2600   THREAD_RUNNABLE();
2601   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2602   return next;
2603 }
2604 #endif
2605
2606 #if defined(GRAN) || defined(PAR)
2607 inline StgBlockingQueueElement *
2608 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2609 {
2610   ACQUIRE_LOCK(&sched_mutex);
2611   bqe = unblockOneLocked(bqe, node);
2612   RELEASE_LOCK(&sched_mutex);
2613   return bqe;
2614 }
2615 #else
2616 inline StgTSO *
2617 unblockOne(StgTSO *tso)
2618 {
2619   ACQUIRE_LOCK(&sched_mutex);
2620   tso = unblockOneLocked(tso);
2621   RELEASE_LOCK(&sched_mutex);
2622   return tso;
2623 }
2624 #endif
2625
2626 #if defined(GRAN)
2627 void 
2628 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2629 {
2630   StgBlockingQueueElement *bqe;
2631   PEs node_loc;
2632   nat len = 0; 
2633
2634   IF_GRAN_DEBUG(bq, 
2635                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2636                       node, CurrentProc, CurrentTime[CurrentProc], 
2637                       CurrentTSO->id, CurrentTSO));
2638
2639   node_loc = where_is(node);
2640
2641   ASSERT(q == END_BQ_QUEUE ||
2642          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2643          get_itbl(q)->type == CONSTR); // closure (type constructor)
2644   ASSERT(is_unique(node));
2645
2646   /* FAKE FETCH: magically copy the node to the tso's proc;
2647      no Fetch necessary because in reality the node should not have been 
2648      moved to the other PE in the first place
2649   */
2650   if (CurrentProc!=node_loc) {
2651     IF_GRAN_DEBUG(bq, 
2652                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2653                         node, node_loc, CurrentProc, CurrentTSO->id, 
2654                         // CurrentTSO, where_is(CurrentTSO),
2655                         node->header.gran.procs));
2656     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2657     IF_GRAN_DEBUG(bq, 
2658                   belch("## new bitmask of node %p is %#x",
2659                         node, node->header.gran.procs));
2660     if (RtsFlags.GranFlags.GranSimStats.Global) {
2661       globalGranStats.tot_fake_fetches++;
2662     }
2663   }
2664
2665   bqe = q;
2666   // ToDo: check: ASSERT(CurrentProc==node_loc);
2667   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2668     //next = bqe->link;
2669     /* 
2670        bqe points to the current element in the queue
2671        next points to the next element in the queue
2672     */
2673     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2674     //tso_loc = where_is(tso);
2675     len++;
2676     bqe = unblockOneLocked(bqe, node);
2677   }
2678
2679   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2680      the closure to make room for the anchor of the BQ */
2681   if (bqe!=END_BQ_QUEUE) {
2682     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2683     /*
2684     ASSERT((info_ptr==&RBH_Save_0_info) ||
2685            (info_ptr==&RBH_Save_1_info) ||
2686            (info_ptr==&RBH_Save_2_info));
2687     */
2688     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2689     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2690     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2691
2692     IF_GRAN_DEBUG(bq,
2693                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2694                         node, info_type(node)));
2695   }
2696
2697   /* statistics gathering */
2698   if (RtsFlags.GranFlags.GranSimStats.Global) {
2699     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2700     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2701     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2702     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2703   }
2704   IF_GRAN_DEBUG(bq,
2705                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2706                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2707 }
2708 #elif defined(PAR)
2709 void 
2710 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2711 {
2712   StgBlockingQueueElement *bqe;
2713
2714   ACQUIRE_LOCK(&sched_mutex);
2715
2716   IF_PAR_DEBUG(verbose, 
2717                belch("##-_ AwBQ for node %p on [%x]: ",
2718                      node, mytid));
2719 #ifdef DIST  
2720   //RFP
2721   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2722     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2723     return;
2724   }
2725 #endif
2726   
2727   ASSERT(q == END_BQ_QUEUE ||
2728          get_itbl(q)->type == TSO ||           
2729          get_itbl(q)->type == BLOCKED_FETCH || 
2730          get_itbl(q)->type == CONSTR); 
2731
2732   bqe = q;
2733   while (get_itbl(bqe)->type==TSO || 
2734          get_itbl(bqe)->type==BLOCKED_FETCH) {
2735     bqe = unblockOneLocked(bqe, node);
2736   }
2737   RELEASE_LOCK(&sched_mutex);
2738 }
2739
2740 #else   /* !GRAN && !PAR */
2741 void
2742 awakenBlockedQueue(StgTSO *tso)
2743 {
2744   ACQUIRE_LOCK(&sched_mutex);
2745   while (tso != END_TSO_QUEUE) {
2746     tso = unblockOneLocked(tso);
2747   }
2748   RELEASE_LOCK(&sched_mutex);
2749 }
2750 #endif
2751
2752 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2753 //@subsection Exception Handling Routines
2754
2755 /* ---------------------------------------------------------------------------
2756    Interrupt execution
2757    - usually called inside a signal handler so it mustn't do anything fancy.   
2758    ------------------------------------------------------------------------ */
2759
2760 void
2761 interruptStgRts(void)
2762 {
2763     interrupted    = 1;
2764     context_switch = 1;
2765 }
2766
2767 /* -----------------------------------------------------------------------------
2768    Unblock a thread
2769
2770    This is for use when we raise an exception in another thread, which
2771    may be blocked.
2772    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2773    -------------------------------------------------------------------------- */
2774
2775 #if defined(GRAN) || defined(PAR)
2776 /*
2777   NB: only the type of the blocking queue is different in GranSim and GUM
2778       the operations on the queue-elements are the same
2779       long live polymorphism!
2780 */
2781 static void
2782 unblockThread(StgTSO *tso)
2783 {
2784   StgBlockingQueueElement *t, **last;
2785
2786   ACQUIRE_LOCK(&sched_mutex);
2787   switch (tso->why_blocked) {
2788
2789   case NotBlocked:
2790     return;  /* not blocked */
2791
2792   case BlockedOnMVar:
2793     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2794     {
2795       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2796       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2797
2798       last = (StgBlockingQueueElement **)&mvar->head;
2799       for (t = (StgBlockingQueueElement *)mvar->head; 
2800            t != END_BQ_QUEUE; 
2801            last = &t->link, last_tso = t, t = t->link) {
2802         if (t == (StgBlockingQueueElement *)tso) {
2803           *last = (StgBlockingQueueElement *)tso->link;
2804           if (mvar->tail == tso) {
2805             mvar->tail = (StgTSO *)last_tso;
2806           }
2807           goto done;
2808         }
2809       }
2810       barf("unblockThread (MVAR): TSO not found");
2811     }
2812
2813   case BlockedOnBlackHole:
2814     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2815     {
2816       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2817
2818       last = &bq->blocking_queue;
2819       for (t = bq->blocking_queue; 
2820            t != END_BQ_QUEUE; 
2821            last = &t->link, t = t->link) {
2822         if (t == (StgBlockingQueueElement *)tso) {
2823           *last = (StgBlockingQueueElement *)tso->link;
2824           goto done;
2825         }
2826       }
2827       barf("unblockThread (BLACKHOLE): TSO not found");
2828     }
2829
2830   case BlockedOnException:
2831     {
2832       StgTSO *target  = tso->block_info.tso;
2833
2834       ASSERT(get_itbl(target)->type == TSO);
2835
2836       if (target->what_next == ThreadRelocated) {
2837           target = target->link;
2838           ASSERT(get_itbl(target)->type == TSO);
2839       }
2840
2841       ASSERT(target->blocked_exceptions != NULL);
2842
2843       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2844       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2845            t != END_BQ_QUEUE; 
2846            last = &t->link, t = t->link) {
2847         ASSERT(get_itbl(t)->type == TSO);
2848         if (t == (StgBlockingQueueElement *)tso) {
2849           *last = (StgBlockingQueueElement *)tso->link;
2850           goto done;
2851         }
2852       }
2853       barf("unblockThread (Exception): TSO not found");
2854     }
2855
2856   case BlockedOnRead:
2857   case BlockedOnWrite:
2858     {
2859       /* take TSO off blocked_queue */
2860       StgBlockingQueueElement *prev = NULL;
2861       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2862            prev = t, t = t->link) {
2863         if (t == (StgBlockingQueueElement *)tso) {
2864           if (prev == NULL) {
2865             blocked_queue_hd = (StgTSO *)t->link;
2866             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2867               blocked_queue_tl = END_TSO_QUEUE;
2868             }
2869           } else {
2870             prev->link = t->link;
2871             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2872               blocked_queue_tl = (StgTSO *)prev;
2873             }
2874           }
2875           goto done;
2876         }
2877       }
2878       barf("unblockThread (I/O): TSO not found");
2879     }
2880
2881   case BlockedOnDelay:
2882     {
2883       /* take TSO off sleeping_queue */
2884       StgBlockingQueueElement *prev = NULL;
2885       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2886            prev = t, t = t->link) {
2887         if (t == (StgBlockingQueueElement *)tso) {
2888           if (prev == NULL) {
2889             sleeping_queue = (StgTSO *)t->link;
2890           } else {
2891             prev->link = t->link;
2892           }
2893           goto done;
2894         }
2895       }
2896       barf("unblockThread (I/O): TSO not found");
2897     }
2898
2899   default:
2900     barf("unblockThread");
2901   }
2902
2903  done:
2904   tso->link = END_TSO_QUEUE;
2905   tso->why_blocked = NotBlocked;
2906   tso->block_info.closure = NULL;
2907   PUSH_ON_RUN_QUEUE(tso);
2908   RELEASE_LOCK(&sched_mutex);
2909 }
2910 #else
2911 static void
2912 unblockThread(StgTSO *tso)
2913 {
2914   StgTSO *t, **last;
2915
2916   ACQUIRE_LOCK(&sched_mutex);
2917   switch (tso->why_blocked) {
2918
2919   case NotBlocked:
2920     return;  /* not blocked */
2921
2922   case BlockedOnMVar:
2923     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2924     {
2925       StgTSO *last_tso = END_TSO_QUEUE;
2926       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2927
2928       last = &mvar->head;
2929       for (t = mvar->head; t != END_TSO_QUEUE; 
2930            last = &t->link, last_tso = t, t = t->link) {
2931         if (t == tso) {
2932           *last = tso->link;
2933           if (mvar->tail == tso) {
2934             mvar->tail = last_tso;
2935           }
2936           goto done;
2937         }
2938       }
2939       barf("unblockThread (MVAR): TSO not found");
2940     }
2941
2942   case BlockedOnBlackHole:
2943     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2944     {
2945       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2946
2947       last = &bq->blocking_queue;
2948       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2949            last = &t->link, t = t->link) {
2950         if (t == tso) {
2951           *last = tso->link;
2952           goto done;
2953         }
2954       }
2955       barf("unblockThread (BLACKHOLE): TSO not found");
2956     }
2957
2958   case BlockedOnException:
2959     {
2960       StgTSO *target  = tso->block_info.tso;
2961
2962       ASSERT(get_itbl(target)->type == TSO);
2963
2964       while (target->what_next == ThreadRelocated) {
2965           target = target->link;
2966           ASSERT(get_itbl(target)->type == TSO);
2967       }
2968       
2969       ASSERT(target->blocked_exceptions != NULL);
2970
2971       last = &target->blocked_exceptions;
2972       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2973            last = &t->link, t = t->link) {
2974         ASSERT(get_itbl(t)->type == TSO);
2975         if (t == tso) {
2976           *last = tso->link;
2977           goto done;
2978         }
2979       }
2980       barf("unblockThread (Exception): TSO not found");
2981     }
2982
2983   case BlockedOnRead:
2984   case BlockedOnWrite:
2985     {
2986       StgTSO *prev = NULL;
2987       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2988            prev = t, t = t->link) {
2989         if (t == tso) {
2990           if (prev == NULL) {
2991             blocked_queue_hd = t->link;
2992             if (blocked_queue_tl == t) {
2993               blocked_queue_tl = END_TSO_QUEUE;
2994             }
2995           } else {
2996             prev->link = t->link;
2997             if (blocked_queue_tl == t) {
2998               blocked_queue_tl = prev;
2999             }
3000           }
3001           goto done;
3002         }
3003       }
3004       barf("unblockThread (I/O): TSO not found");
3005     }
3006
3007   case BlockedOnDelay:
3008     {
3009       StgTSO *prev = NULL;
3010       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3011            prev = t, t = t->link) {
3012         if (t == tso) {
3013           if (prev == NULL) {
3014             sleeping_queue = t->link;
3015           } else {
3016             prev->link = t->link;
3017           }
3018           goto done;
3019         }
3020       }
3021       barf("unblockThread (I/O): TSO not found");
3022     }
3023
3024   default:
3025     barf("unblockThread");
3026   }
3027
3028  done:
3029   tso->link = END_TSO_QUEUE;
3030   tso->why_blocked = NotBlocked;
3031   tso->block_info.closure = NULL;
3032   PUSH_ON_RUN_QUEUE(tso);
3033   RELEASE_LOCK(&sched_mutex);
3034 }
3035 #endif
3036
3037 /* -----------------------------------------------------------------------------
3038  * raiseAsync()
3039  *
3040  * The following function implements the magic for raising an
3041  * asynchronous exception in an existing thread.
3042  *
3043  * We first remove the thread from any queue on which it might be
3044  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3045  *
3046  * We strip the stack down to the innermost CATCH_FRAME, building
3047  * thunks in the heap for all the active computations, so they can 
3048  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3049  * an application of the handler to the exception, and push it on
3050  * the top of the stack.
3051  * 
3052  * How exactly do we save all the active computations?  We create an
3053  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3054  * AP_UPDs pushes everything from the corresponding update frame
3055  * upwards onto the stack.  (Actually, it pushes everything up to the
3056  * next update frame plus a pointer to the next AP_UPD object.
3057  * Entering the next AP_UPD object pushes more onto the stack until we
3058  * reach the last AP_UPD object - at which point the stack should look
3059  * exactly as it did when we killed the TSO and we can continue
3060  * execution by entering the closure on top of the stack.
3061  *
3062  * We can also kill a thread entirely - this happens if either (a) the 
3063  * exception passed to raiseAsync is NULL, or (b) there's no
3064  * CATCH_FRAME on the stack.  In either case, we strip the entire
3065  * stack and replace the thread with a zombie.
3066  *
3067  * -------------------------------------------------------------------------- */
3068  
3069 void 
3070 deleteThread(StgTSO *tso)
3071 {
3072   raiseAsync(tso,NULL);
3073 }
3074
3075 void
3076 raiseAsync(StgTSO *tso, StgClosure *exception)
3077 {
3078   StgUpdateFrame* su = tso->su;
3079   StgPtr          sp = tso->sp;
3080   
3081   /* Thread already dead? */
3082   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3083     return;
3084   }
3085
3086   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3087
3088   /* Remove it from any blocking queues */
3089   unblockThread(tso);
3090
3091   /* The stack freezing code assumes there's a closure pointer on
3092    * the top of the stack.  This isn't always the case with compiled
3093    * code, so we have to push a dummy closure on the top which just
3094    * returns to the next return address on the stack.
3095    */
3096   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3097     *(--sp) = (W_)&stg_dummy_ret_closure;
3098   }
3099
3100   while (1) {
3101     nat words = ((P_)su - (P_)sp) - 1;
3102     nat i;
3103     StgAP_UPD * ap;
3104
3105     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3106      * then build PAP(handler,exception,realworld#), and leave it on
3107      * top of the stack ready to enter.
3108      */
3109     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3110       StgCatchFrame *cf = (StgCatchFrame *)su;
3111       /* we've got an exception to raise, so let's pass it to the
3112        * handler in this frame.
3113        */
3114       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3115       TICK_ALLOC_UPD_PAP(3,0);
3116       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3117               
3118       ap->n_args = 2;
3119       ap->fun = cf->handler;    /* :: Exception -> IO a */
3120       ap->payload[0] = exception;
3121       ap->payload[1] = ARG_TAG(0); /* realworld token */
3122
3123       /* throw away the stack from Sp up to and including the
3124        * CATCH_FRAME.
3125        */
3126       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
3127       tso->su = cf->link;
3128
3129       /* Restore the blocked/unblocked state for asynchronous exceptions
3130        * at the CATCH_FRAME.  
3131        *
3132        * If exceptions were unblocked at the catch, arrange that they
3133        * are unblocked again after executing the handler by pushing an
3134        * unblockAsyncExceptions_ret stack frame.
3135        */
3136       if (!cf->exceptions_blocked) {
3137         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3138       }
3139       
3140       /* Ensure that async exceptions are blocked when running the handler.
3141        */
3142       if (tso->blocked_exceptions == NULL) {
3143         tso->blocked_exceptions = END_TSO_QUEUE;
3144       }
3145       
3146       /* Put the newly-built PAP on top of the stack, ready to execute
3147        * when the thread restarts.
3148        */
3149       sp[0] = (W_)ap;
3150       tso->sp = sp;
3151       tso->what_next = ThreadEnterGHC;
3152       IF_DEBUG(sanity, checkTSO(tso));
3153       return;
3154     }
3155
3156     /* First build an AP_UPD consisting of the stack chunk above the
3157      * current update frame, with the top word on the stack as the
3158      * fun field.
3159      */
3160     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3161     
3162     ASSERT(words >= 0);
3163     
3164     ap->n_args = words;
3165     ap->fun    = (StgClosure *)sp[0];
3166     sp++;
3167     for(i=0; i < (nat)words; ++i) {
3168       ap->payload[i] = (StgClosure *)*sp++;
3169     }
3170     
3171     switch (get_itbl(su)->type) {
3172       
3173     case UPDATE_FRAME:
3174       {
3175         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3176         TICK_ALLOC_UP_THK(words+1,0);
3177         
3178         IF_DEBUG(scheduler,
3179                  fprintf(stderr,  "scheduler: Updating ");
3180                  printPtr((P_)su->updatee); 
3181                  fprintf(stderr,  " with ");
3182                  printObj((StgClosure *)ap);
3183                  );
3184         
3185         /* Replace the updatee with an indirection - happily
3186          * this will also wake up any threads currently
3187          * waiting on the result.
3188          *
3189          * Warning: if we're in a loop, more than one update frame on
3190          * the stack may point to the same object.  Be careful not to
3191          * overwrite an IND_OLDGEN in this case, because we'll screw
3192          * up the mutable lists.  To be on the safe side, don't
3193          * overwrite any kind of indirection at all.  See also
3194          * threadSqueezeStack in GC.c, where we have to make a similar
3195          * check.
3196          */
3197         if (!closure_IND(su->updatee)) {
3198             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3199         }
3200         su = su->link;
3201         sp += sizeofW(StgUpdateFrame) -1;
3202         sp[0] = (W_)ap; /* push onto stack */
3203         break;
3204       }
3205
3206     case CATCH_FRAME:
3207       {
3208         StgCatchFrame *cf = (StgCatchFrame *)su;
3209         StgClosure* o;
3210         
3211         /* We want a PAP, not an AP_UPD.  Fortunately, the
3212          * layout's the same.
3213          */
3214         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3215         TICK_ALLOC_UPD_PAP(words+1,0);
3216         
3217         /* now build o = FUN(catch,ap,handler) */
3218         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3219         TICK_ALLOC_FUN(2,0);
3220         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3221         o->payload[0] = (StgClosure *)ap;
3222         o->payload[1] = cf->handler;
3223         
3224         IF_DEBUG(scheduler,
3225                  fprintf(stderr,  "scheduler: Built ");
3226                  printObj((StgClosure *)o);
3227                  );
3228         
3229         /* pop the old handler and put o on the stack */
3230         su = cf->link;
3231         sp += sizeofW(StgCatchFrame) - 1;
3232         sp[0] = (W_)o;
3233         break;
3234       }
3235       
3236     case SEQ_FRAME:
3237       {
3238         StgSeqFrame *sf = (StgSeqFrame *)su;
3239         StgClosure* o;
3240         
3241         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3242         TICK_ALLOC_UPD_PAP(words+1,0);
3243         
3244         /* now build o = FUN(seq,ap) */
3245         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3246         TICK_ALLOC_SE_THK(1,0);
3247         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3248         o->payload[0] = (StgClosure *)ap;
3249         
3250         IF_DEBUG(scheduler,
3251                  fprintf(stderr,  "scheduler: Built ");
3252                  printObj((StgClosure *)o);
3253                  );
3254         
3255         /* pop the old handler and put o on the stack */
3256         su = sf->link;
3257         sp += sizeofW(StgSeqFrame) - 1;
3258         sp[0] = (W_)o;
3259         break;
3260       }
3261       
3262     case STOP_FRAME:
3263       /* We've stripped the entire stack, the thread is now dead. */
3264       sp += sizeofW(StgStopFrame) - 1;
3265       sp[0] = (W_)exception;    /* save the exception */
3266       tso->what_next = ThreadKilled;
3267       tso->su = (StgUpdateFrame *)(sp+1);
3268       tso->sp = sp;
3269       return;
3270
3271     default:
3272       barf("raiseAsync");
3273     }
3274   }
3275   barf("raiseAsync");
3276 }
3277
3278 /* -----------------------------------------------------------------------------
3279    resurrectThreads is called after garbage collection on the list of
3280    threads found to be garbage.  Each of these threads will be woken
3281    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3282    on an MVar, or NonTermination if the thread was blocked on a Black
3283    Hole.
3284    -------------------------------------------------------------------------- */
3285
3286 void
3287 resurrectThreads( StgTSO *threads )
3288 {
3289   StgTSO *tso, *next;
3290
3291   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3292     next = tso->global_link;
3293     tso->global_link = all_threads;
3294     all_threads = tso;
3295     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3296
3297     switch (tso->why_blocked) {
3298     case BlockedOnMVar:
3299     case BlockedOnException:
3300       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3301       break;
3302     case BlockedOnBlackHole:
3303       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3304       break;
3305     case NotBlocked:
3306       /* This might happen if the thread was blocked on a black hole
3307        * belonging to a thread that we've just woken up (raiseAsync
3308        * can wake up threads, remember...).
3309        */
3310       continue;
3311     default:
3312       barf("resurrectThreads: thread blocked in a strange way");
3313     }
3314   }
3315 }
3316
3317 /* -----------------------------------------------------------------------------
3318  * Blackhole detection: if we reach a deadlock, test whether any
3319  * threads are blocked on themselves.  Any threads which are found to
3320  * be self-blocked get sent a NonTermination exception.
3321  *
3322  * This is only done in a deadlock situation in order to avoid
3323  * performance overhead in the normal case.
3324  * -------------------------------------------------------------------------- */
3325
3326 static void
3327 detectBlackHoles( void )
3328 {
3329     StgTSO *t = all_threads;
3330     StgUpdateFrame *frame;
3331     StgClosure *blocked_on;
3332
3333     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3334
3335         while (t->what_next == ThreadRelocated) {
3336             t = t->link;
3337             ASSERT(get_itbl(t)->type == TSO);
3338         }
3339       
3340         if (t->why_blocked != BlockedOnBlackHole) {
3341             continue;
3342         }
3343
3344         blocked_on = t->block_info.closure;
3345
3346         for (frame = t->su; ; frame = frame->link) {
3347             switch (get_itbl(frame)->type) {
3348
3349             case UPDATE_FRAME:
3350                 if (frame->updatee == blocked_on) {
3351                     /* We are blocking on one of our own computations, so
3352                      * send this thread the NonTermination exception.  
3353                      */
3354                     IF_DEBUG(scheduler, 
3355                              sched_belch("thread %d is blocked on itself", t->id));
3356                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3357                     goto done;
3358                 }
3359                 else {
3360                     continue;
3361                 }
3362
3363             case CATCH_FRAME:
3364             case SEQ_FRAME:
3365                 continue;
3366                 
3367             case STOP_FRAME:
3368                 break;
3369             }
3370             break;
3371         }
3372
3373     done: ;
3374     }   
3375 }
3376
3377 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3378 //@subsection Debugging Routines
3379
3380 /* -----------------------------------------------------------------------------
3381    Debugging: why is a thread blocked
3382    -------------------------------------------------------------------------- */
3383
3384 #ifdef DEBUG
3385
3386 void
3387 printThreadBlockage(StgTSO *tso)
3388 {
3389   switch (tso->why_blocked) {
3390   case BlockedOnRead:
3391     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3392     break;
3393   case BlockedOnWrite:
3394     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3395     break;
3396   case BlockedOnDelay:
3397     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3398     break;
3399   case BlockedOnMVar:
3400     fprintf(stderr,"is blocked on an MVar");
3401     break;
3402   case BlockedOnException:
3403     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3404             tso->block_info.tso->id);
3405     break;
3406   case BlockedOnBlackHole:
3407     fprintf(stderr,"is blocked on a black hole");
3408     break;
3409   case NotBlocked:
3410     fprintf(stderr,"is not blocked");
3411     break;
3412 #if defined(PAR)
3413   case BlockedOnGA:
3414     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3415             tso->block_info.closure, info_type(tso->block_info.closure));
3416     break;
3417   case BlockedOnGA_NoSend:
3418     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3419             tso->block_info.closure, info_type(tso->block_info.closure));
3420     break;
3421 #endif
3422   default:
3423     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3424          tso->why_blocked, tso->id, tso);
3425   }
3426 }
3427
3428 void
3429 printThreadStatus(StgTSO *tso)
3430 {
3431   switch (tso->what_next) {
3432   case ThreadKilled:
3433     fprintf(stderr,"has been killed");
3434     break;
3435   case ThreadComplete:
3436     fprintf(stderr,"has completed");
3437     break;
3438   default:
3439     printThreadBlockage(tso);
3440   }
3441 }
3442
3443 void
3444 printAllThreads(void)
3445 {
3446   StgTSO *t;
3447
3448 # if defined(GRAN)
3449   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3450   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3451                        time_string, rtsFalse/*no commas!*/);
3452
3453   sched_belch("all threads at [%s]:", time_string);
3454 # elif defined(PAR)
3455   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3456   ullong_format_string(CURRENT_TIME,
3457                        time_string, rtsFalse/*no commas!*/);
3458
3459   sched_belch("all threads at [%s]:", time_string);
3460 # else
3461   sched_belch("all threads:");
3462 # endif
3463
3464   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3465     fprintf(stderr, "\tthread %d ", t->id);
3466     printThreadStatus(t);
3467     fprintf(stderr,"\n");
3468   }
3469 }
3470     
3471 /* 
3472    Print a whole blocking queue attached to node (debugging only).
3473 */
3474 //@cindex print_bq
3475 # if defined(PAR)
3476 void 
3477 print_bq (StgClosure *node)
3478 {
3479   StgBlockingQueueElement *bqe;
3480   StgTSO *tso;
3481   rtsBool end;
3482
3483   fprintf(stderr,"## BQ of closure %p (%s): ",
3484           node, info_type(node));
3485
3486   /* should cover all closures that may have a blocking queue */
3487   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3488          get_itbl(node)->type == FETCH_ME_BQ ||
3489          get_itbl(node)->type == RBH ||
3490          get_itbl(node)->type == MVAR);
3491     
3492   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3493
3494   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3495 }
3496
3497 /* 
3498    Print a whole blocking queue starting with the element bqe.
3499 */
3500 void 
3501 print_bqe (StgBlockingQueueElement *bqe)
3502 {
3503   rtsBool end;
3504
3505   /* 
3506      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3507   */
3508   for (end = (bqe==END_BQ_QUEUE);
3509        !end; // iterate until bqe points to a CONSTR
3510        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3511        bqe = end ? END_BQ_QUEUE : bqe->link) {
3512     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3513     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3514     /* types of closures that may appear in a blocking queue */
3515     ASSERT(get_itbl(bqe)->type == TSO ||           
3516            get_itbl(bqe)->type == BLOCKED_FETCH || 
3517            get_itbl(bqe)->type == CONSTR); 
3518     /* only BQs of an RBH end with an RBH_Save closure */
3519     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3520
3521     switch (get_itbl(bqe)->type) {
3522     case TSO:
3523       fprintf(stderr," TSO %u (%x),",
3524               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3525       break;
3526     case BLOCKED_FETCH:
3527       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3528               ((StgBlockedFetch *)bqe)->node, 
3529               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3530               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3531               ((StgBlockedFetch *)bqe)->ga.weight);
3532       break;
3533     case CONSTR:
3534       fprintf(stderr," %s (IP %p),",
3535               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3536                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3537                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3538                "RBH_Save_?"), get_itbl(bqe));
3539       break;
3540     default:
3541       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3542            info_type((StgClosure *)bqe)); // , node, info_type(node));
3543       break;
3544     }
3545   } /* for */
3546   fputc('\n', stderr);
3547 }
3548 # elif defined(GRAN)
3549 void 
3550 print_bq (StgClosure *node)
3551 {
3552   StgBlockingQueueElement *bqe;
3553   PEs node_loc, tso_loc;
3554   rtsBool end;
3555
3556   /* should cover all closures that may have a blocking queue */
3557   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3558          get_itbl(node)->type == FETCH_ME_BQ ||
3559          get_itbl(node)->type == RBH);
3560     
3561   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3562   node_loc = where_is(node);
3563
3564   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3565           node, info_type(node), node_loc);
3566
3567   /* 
3568      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3569   */
3570   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3571        !end; // iterate until bqe points to a CONSTR
3572        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3573     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3574     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3575     /* types of closures that may appear in a blocking queue */
3576     ASSERT(get_itbl(bqe)->type == TSO ||           
3577            get_itbl(bqe)->type == CONSTR); 
3578     /* only BQs of an RBH end with an RBH_Save closure */
3579     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3580
3581     tso_loc = where_is((StgClosure *)bqe);
3582     switch (get_itbl(bqe)->type) {
3583     case TSO:
3584       fprintf(stderr," TSO %d (%p) on [PE %d],",
3585               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3586       break;
3587     case CONSTR:
3588       fprintf(stderr," %s (IP %p),",
3589               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3590                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3591                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3592                "RBH_Save_?"), get_itbl(bqe));
3593       break;
3594     default:
3595       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3596            info_type((StgClosure *)bqe), node, info_type(node));
3597       break;
3598     }
3599   } /* for */
3600   fputc('\n', stderr);
3601 }
3602 #else
3603 /* 
3604    Nice and easy: only TSOs on the blocking queue
3605 */
3606 void 
3607 print_bq (StgClosure *node)
3608 {
3609   StgTSO *tso;
3610
3611   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3612   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3613        tso != END_TSO_QUEUE; 
3614        tso=tso->link) {
3615     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3616     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3617     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3618   }
3619   fputc('\n', stderr);
3620 }
3621 # endif
3622
3623 #if defined(PAR)
3624 static nat
3625 run_queue_len(void)
3626 {
3627   nat i;
3628   StgTSO *tso;
3629
3630   for (i=0, tso=run_queue_hd; 
3631        tso != END_TSO_QUEUE;
3632        i++, tso=tso->link)
3633     /* nothing */
3634
3635   return i;
3636 }
3637 #endif
3638
3639 static void
3640 sched_belch(char *s, ...)
3641 {
3642   va_list ap;
3643   va_start(ap,s);
3644 #ifdef SMP
3645   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3646 #elif defined(PAR)
3647   fprintf(stderr, "== ");
3648 #else
3649   fprintf(stderr, "scheduler: ");
3650 #endif
3651   vfprintf(stderr, s, ap);
3652   fprintf(stderr, "\n");
3653 }
3654
3655 #endif /* DEBUG */
3656
3657
3658 //@node Index,  , Debugging Routines, Main scheduling code
3659 //@subsection Index
3660
3661 //@index
3662 //* MainRegTable::  @cindex\s-+MainRegTable
3663 //* StgMainThread::  @cindex\s-+StgMainThread
3664 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3665 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3666 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3667 //* context_switch::  @cindex\s-+context_switch
3668 //* createThread::  @cindex\s-+createThread
3669 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3670 //* initScheduler::  @cindex\s-+initScheduler
3671 //* interrupted::  @cindex\s-+interrupted
3672 //* next_thread_id::  @cindex\s-+next_thread_id
3673 //* print_bq::  @cindex\s-+print_bq
3674 //* run_queue_hd::  @cindex\s-+run_queue_hd
3675 //* run_queue_tl::  @cindex\s-+run_queue_tl
3676 //* sched_mutex::  @cindex\s-+sched_mutex
3677 //* schedule::  @cindex\s-+schedule
3678 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3679 //* term_mutex::  @cindex\s-+term_mutex
3680 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3681 //@end index