[project @ 2002-04-23 06:34:26 by sof]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.138 2002/04/23 06:34:27 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #ifdef HAVE_SYS_TYPES_H
118 #include <sys/types.h>
119 #endif
120 #ifdef HAVE_UNISTD_H
121 #include <unistd.h>
122 #endif
123
124 #include <stdarg.h>
125
126 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
127 //@subsection Variables and Data structures
128
129 /* Main thread queue.
130  * Locks required: sched_mutex.
131  */
132 StgMainThread *main_threads;
133
134 /* Thread queues.
135  * Locks required: sched_mutex.
136  */
137 #if defined(GRAN)
138
139 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
140 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
141
142 /* 
143    In GranSim we have a runnable and a blocked queue for each processor.
144    In order to minimise code changes new arrays run_queue_hds/tls
145    are created. run_queue_hd is then a short cut (macro) for
146    run_queue_hds[CurrentProc] (see GranSim.h).
147    -- HWL
148 */
149 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
150 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
151 StgTSO *ccalling_threadss[MAX_PROC];
152 /* We use the same global list of threads (all_threads) in GranSim as in
153    the std RTS (i.e. we are cheating). However, we don't use this list in
154    the GranSim specific code at the moment (so we are only potentially
155    cheating).  */
156
157 #else /* !GRAN */
158
159 StgTSO *run_queue_hd, *run_queue_tl;
160 StgTSO *blocked_queue_hd, *blocked_queue_tl;
161 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
162
163 #endif
164
165 /* Linked list of all threads.
166  * Used for detecting garbage collected threads.
167  */
168 StgTSO *all_threads;
169
170 /* When a thread performs a safe C call (_ccall_GC, using old
171  * terminology), it gets put on the suspended_ccalling_threads
172  * list. Used by the garbage collector.
173  */
174 static StgTSO *suspended_ccalling_threads;
175
176 static StgTSO *threadStackOverflow(StgTSO *tso);
177
178 /* KH: The following two flags are shared memory locations.  There is no need
179        to lock them, since they are only unset at the end of a scheduler
180        operation.
181 */
182
183 /* flag set by signal handler to precipitate a context switch */
184 //@cindex context_switch
185 nat context_switch;
186
187 /* if this flag is set as well, give up execution */
188 //@cindex interrupted
189 rtsBool interrupted;
190
191 /* Next thread ID to allocate.
192  * Locks required: sched_mutex
193  */
194 //@cindex next_thread_id
195 StgThreadID next_thread_id = 1;
196
197 /*
198  * Pointers to the state of the current thread.
199  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
200  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
201  */
202  
203 /* The smallest stack size that makes any sense is:
204  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
205  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
206  *  + 1                       (the realworld token for an IO thread)
207  *  + 1                       (the closure to enter)
208  *
209  * A thread with this stack will bomb immediately with a stack
210  * overflow, which will increase its stack size.  
211  */
212
213 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
214
215
216 #if defined(GRAN)
217 StgTSO *CurrentTSO;
218 #endif
219
220 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
221  *  exists - earlier gccs apparently didn't.
222  *  -= chak
223  */
224 StgTSO dummy_tso;
225
226 rtsBool ready_to_gc;
227
228 void            addToBlockedQueue ( StgTSO *tso );
229
230 static void     schedule          ( void );
231        void     interruptStgRts   ( void );
232 #if defined(GRAN)
233 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
234 #else
235 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
236 #endif
237
238 static void     detectBlackHoles  ( void );
239
240 #ifdef DEBUG
241 static void sched_belch(char *s, ...);
242 #endif
243
244 #if defined(RTS_SUPPORTS_THREADS)
245 /* ToDo: carefully document the invariants that go together
246  *       with these synchronisation objects.
247  */
248 Mutex     sched_mutex       = INIT_MUTEX_VAR;
249 Mutex     term_mutex        = INIT_MUTEX_VAR;
250
251 # if defined(SMP)
252 static Condition gc_pending_cond = INIT_COND_VAR;
253 nat await_death;
254 # endif
255
256 #endif /* RTS_SUPPORTS_THREADS */
257
258 #if defined(PAR)
259 StgTSO *LastTSO;
260 rtsTime TimeOfLastYield;
261 rtsBool emitSchedule = rtsTrue;
262 #endif
263
264 #if DEBUG
265 char *whatNext_strs[] = {
266   "ThreadEnterGHC",
267   "ThreadRunGHC",
268   "ThreadEnterInterp",
269   "ThreadKilled",
270   "ThreadComplete"
271 };
272
273 char *threadReturnCode_strs[] = {
274   "HeapOverflow",                       /* might also be StackOverflow */
275   "StackOverflow",
276   "ThreadYielding",
277   "ThreadBlocked",
278   "ThreadFinished"
279 };
280 #endif
281
282 #if defined(PAR)
283 StgTSO * createSparkThread(rtsSpark spark);
284 StgTSO * activateSpark (rtsSpark spark);  
285 #endif
286
287 /*
288  * The thread state for the main thread.
289 // ToDo: check whether not needed any more
290 StgTSO   *MainTSO;
291  */
292
293 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
294 static void taskStart(void);
295 static void
296 taskStart(void)
297 {
298   schedule();
299 }
300 #endif
301
302
303
304
305 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
306 //@subsection Main scheduling loop
307
308 /* ---------------------------------------------------------------------------
309    Main scheduling loop.
310
311    We use round-robin scheduling, each thread returning to the
312    scheduler loop when one of these conditions is detected:
313
314       * out of heap space
315       * timer expires (thread yields)
316       * thread blocks
317       * thread ends
318       * stack overflow
319
320    Locking notes:  we acquire the scheduler lock once at the beginning
321    of the scheduler loop, and release it when
322     
323       * running a thread, or
324       * waiting for work, or
325       * waiting for a GC to complete.
326
327    GRAN version:
328      In a GranSim setup this loop iterates over the global event queue.
329      This revolves around the global event queue, which determines what 
330      to do next. Therefore, it's more complicated than either the 
331      concurrent or the parallel (GUM) setup.
332
333    GUM version:
334      GUM iterates over incoming messages.
335      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
336      and sends out a fish whenever it has nothing to do; in-between
337      doing the actual reductions (shared code below) it processes the
338      incoming messages and deals with delayed operations 
339      (see PendingFetches).
340      This is not the ugliest code you could imagine, but it's bloody close.
341
342    ------------------------------------------------------------------------ */
343 //@cindex schedule
344 static void
345 schedule( void )
346 {
347   StgTSO *t;
348   Capability *cap;
349   StgThreadReturnCode ret;
350 #if defined(GRAN)
351   rtsEvent *event;
352 #elif defined(PAR)
353   StgSparkPool *pool;
354   rtsSpark spark;
355   StgTSO *tso;
356   GlobalTaskId pe;
357   rtsBool receivedFinish = rtsFalse;
358 # if defined(DEBUG)
359   nat tp_size, sp_size; // stats only
360 # endif
361 #endif
362   rtsBool was_interrupted = rtsFalse;
363   
364   ACQUIRE_LOCK(&sched_mutex);
365  
366 #if defined(RTS_SUPPORTS_THREADS)
367   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
368 #else
369   /* simply initialise it in the non-threaded case */
370   grabCapability(&cap);
371 #endif
372
373 #if defined(GRAN)
374   /* set up first event to get things going */
375   /* ToDo: assign costs for system setup and init MainTSO ! */
376   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
377             ContinueThread, 
378             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
379
380   IF_DEBUG(gran,
381            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
382            G_TSO(CurrentTSO, 5));
383
384   if (RtsFlags.GranFlags.Light) {
385     /* Save current time; GranSim Light only */
386     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
387   }      
388
389   event = get_next_event();
390
391   while (event!=(rtsEvent*)NULL) {
392     /* Choose the processor with the next event */
393     CurrentProc = event->proc;
394     CurrentTSO = event->tso;
395
396 #elif defined(PAR)
397
398   while (!receivedFinish) {    /* set by processMessages */
399                                /* when receiving PP_FINISH message         */ 
400 #else
401
402   while (1) {
403
404 #endif
405
406     IF_DEBUG(scheduler, printAllThreads());
407
408 #if defined(RTS_SUPPORTS_THREADS)
409     /* Check to see whether there are any worker threads
410        waiting to deposit external call results. If so,
411        yield our capability */
412     yieldToReturningWorker(&sched_mutex, &cap);
413 #endif
414
415     /* If we're interrupted (the user pressed ^C, or some other
416      * termination condition occurred), kill all the currently running
417      * threads.
418      */
419     if (interrupted) {
420       IF_DEBUG(scheduler, sched_belch("interrupted"));
421       deleteAllThreads();
422       interrupted = rtsFalse;
423       was_interrupted = rtsTrue;
424     }
425
426     /* Go through the list of main threads and wake up any
427      * clients whose computations have finished.  ToDo: this
428      * should be done more efficiently without a linear scan
429      * of the main threads list, somehow...
430      */
431 #if defined(RTS_SUPPORTS_THREADS)
432     { 
433       StgMainThread *m, **prev;
434       prev = &main_threads;
435       for (m = main_threads; m != NULL; m = m->link) {
436         switch (m->tso->what_next) {
437         case ThreadComplete:
438           if (m->ret) {
439             *(m->ret) = (StgClosure *)m->tso->sp[0];
440           }
441           *prev = m->link;
442           m->stat = Success;
443           broadcastCondition(&m->wakeup);
444 #ifdef DEBUG
445           free(m->tso->label);
446 #endif
447           break;
448         case ThreadKilled:
449           if (m->ret) *(m->ret) = NULL;
450           *prev = m->link;
451           if (was_interrupted) {
452             m->stat = Interrupted;
453           } else {
454             m->stat = Killed;
455           }
456           broadcastCondition(&m->wakeup);
457 #ifdef DEBUG
458           free(m->tso->label);
459 #endif
460           break;
461         default:
462           break;
463         }
464       }
465     }
466
467 #else /* not threaded */
468
469 # if defined(PAR)
470     /* in GUM do this only on the Main PE */
471     if (IAmMainThread)
472 # endif
473     /* If our main thread has finished or been killed, return.
474      */
475     {
476       StgMainThread *m = main_threads;
477       if (m->tso->what_next == ThreadComplete
478           || m->tso->what_next == ThreadKilled) {
479 #ifdef DEBUG
480         free(m->tso->label);
481 #endif
482         main_threads = main_threads->link;
483         if (m->tso->what_next == ThreadComplete) {
484           /* we finished successfully, fill in the return value */
485           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
486           m->stat = Success;
487           return;
488         } else {
489           if (m->ret) { *(m->ret) = NULL; };
490           if (was_interrupted) {
491             m->stat = Interrupted;
492           } else {
493             m->stat = Killed;
494           }
495           return;
496         }
497       }
498     }
499 #endif
500
501     /* Top up the run queue from our spark pool.  We try to make the
502      * number of threads in the run queue equal to the number of
503      * free capabilities.
504      *
505      * Disable spark support in SMP for now, non-essential & requires
506      * a little bit of work to make it compile cleanly. -- sof 1/02.
507      */
508 #if 0 /* defined(SMP) */
509     {
510       nat n = getFreeCapabilities();
511       StgTSO *tso = run_queue_hd;
512
513       /* Count the run queue */
514       while (n > 0 && tso != END_TSO_QUEUE) {
515         tso = tso->link;
516         n--;
517       }
518
519       for (; n > 0; n--) {
520         StgClosure *spark;
521         spark = findSpark(rtsFalse);
522         if (spark == NULL) {
523           break; /* no more sparks in the pool */
524         } else {
525           /* I'd prefer this to be done in activateSpark -- HWL */
526           /* tricky - it needs to hold the scheduler lock and
527            * not try to re-acquire it -- SDM */
528           createSparkThread(spark);       
529           IF_DEBUG(scheduler,
530                    sched_belch("==^^ turning spark of closure %p into a thread",
531                                (StgClosure *)spark));
532         }
533       }
534       /* We need to wake up the other tasks if we just created some
535        * work for them.
536        */
537       if (getFreeCapabilities() - n > 1) {
538           signalCondition( &thread_ready_cond );
539       }
540     }
541 #endif // SMP
542
543     /* check for signals each time around the scheduler */
544 #ifndef mingw32_TARGET_OS
545     if (signals_pending()) {
546       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
547       startSignalHandlers();
548       ACQUIRE_LOCK(&sched_mutex);
549     }
550 #endif
551
552     /* Check whether any waiting threads need to be woken up.  If the
553      * run queue is empty, and there are no other tasks running, we
554      * can wait indefinitely for something to happen.
555      * ToDo: what if another client comes along & requests another
556      * main thread?
557      */
558     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
559       awaitEvent( EMPTY_RUN_QUEUE()
560 #if defined(SMP)
561         && allFreeCapabilities()
562 #endif
563         );
564     }
565     /* we can be interrupted while waiting for I/O... */
566     if (interrupted) continue;
567
568     /* 
569      * Detect deadlock: when we have no threads to run, there are no
570      * threads waiting on I/O or sleeping, and all the other tasks are
571      * waiting for work, we must have a deadlock of some description.
572      *
573      * We first try to find threads blocked on themselves (ie. black
574      * holes), and generate NonTermination exceptions where necessary.
575      *
576      * If no threads are black holed, we have a deadlock situation, so
577      * inform all the main threads.
578      */
579 #ifndef PAR
580     if (   EMPTY_THREAD_QUEUES()
581 #if defined(RTS_SUPPORTS_THREADS)
582         && EMPTY_QUEUE(suspended_ccalling_threads)
583 #endif
584 #ifdef SMP
585         && allFreeCapabilities()
586 #endif
587         )
588     {
589         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
590 #if defined(THREADED_RTS)
591         /* and SMP mode ..? */
592         releaseCapability(cap);
593 #endif
594         // Garbage collection can release some new threads due to
595         // either (a) finalizers or (b) threads resurrected because
596         // they are about to be send BlockedOnDeadMVar.  Any threads
597         // thus released will be immediately runnable.
598         GarbageCollect(GetRoots,rtsTrue);
599
600         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
601
602         IF_DEBUG(scheduler, 
603                  sched_belch("still deadlocked, checking for black holes..."));
604         detectBlackHoles();
605
606         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
607
608 #ifndef mingw32_TARGET_OS
609         /* If we have user-installed signal handlers, then wait
610          * for signals to arrive rather then bombing out with a
611          * deadlock.
612          */
613 #if defined(RTS_SUPPORTS_THREADS)
614         if ( 0 ) { /* hmm..what to do? Simply stop waiting for
615                       a signal with no runnable threads (or I/O
616                       suspended ones) leads nowhere quick.
617                       For now, simply shut down when we reach this
618                       condition.
619                       
620                       ToDo: define precisely under what conditions
621                       the Scheduler should shut down in an MT setting.
622                    */
623 #else
624         if ( anyUserHandlers() ) {
625 #endif
626             IF_DEBUG(scheduler, 
627                      sched_belch("still deadlocked, waiting for signals..."));
628
629             awaitUserSignals();
630
631             // we might be interrupted...
632             if (interrupted) { continue; }
633
634             if (signals_pending()) {
635                 RELEASE_LOCK(&sched_mutex);
636                 startSignalHandlers();
637                 ACQUIRE_LOCK(&sched_mutex);
638             }
639             ASSERT(!EMPTY_RUN_QUEUE());
640             goto not_deadlocked;
641         }
642 #endif
643
644         /* Probably a real deadlock.  Send the current main thread the
645          * Deadlock exception (or in the SMP build, send *all* main
646          * threads the deadlock exception, since none of them can make
647          * progress).
648          */
649         {
650             StgMainThread *m;
651 #if defined(RTS_SUPPORTS_THREADS)
652             for (m = main_threads; m != NULL; m = m->link) {
653                 switch (m->tso->why_blocked) {
654                 case BlockedOnBlackHole:
655                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
656                     break;
657                 case BlockedOnException:
658                 case BlockedOnMVar:
659                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
660                     break;
661                 default:
662                     barf("deadlock: main thread blocked in a strange way");
663                 }
664             }
665 #else
666             m = main_threads;
667             switch (m->tso->why_blocked) {
668             case BlockedOnBlackHole:
669                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
670                 break;
671             case BlockedOnException:
672             case BlockedOnMVar:
673                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
674                 break;
675             default:
676                 barf("deadlock: main thread blocked in a strange way");
677             }
678 #endif
679         }
680
681 #if defined(RTS_SUPPORTS_THREADS)
682         /* ToDo: revisit conditions (and mechanism) for shutting
683            down a multi-threaded world  */
684         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
685         shutdownHaskellAndExit(0);
686 #endif
687     }
688   not_deadlocked:
689
690 #elif defined(PAR)
691     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
692 #endif
693
694 #if defined(SMP)
695     /* If there's a GC pending, don't do anything until it has
696      * completed.
697      */
698     if (ready_to_gc) {
699       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
700       waitCondition( &gc_pending_cond, &sched_mutex );
701     }
702 #endif    
703
704 #if defined(RTS_SUPPORTS_THREADS)
705     /* block until we've got a thread on the run queue and a free
706      * capability.
707      *
708      */
709     if ( EMPTY_RUN_QUEUE() ) {
710       /* Give up our capability */
711       releaseCapability(cap);
712       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
713       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
714       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
715 #if 0
716       while ( EMPTY_RUN_QUEUE() ) {
717         waitForWorkCapability(&sched_mutex, &cap);
718         IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
719       }
720 #endif
721     }
722 #endif
723
724 #if defined(GRAN)
725     if (RtsFlags.GranFlags.Light)
726       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
727
728     /* adjust time based on time-stamp */
729     if (event->time > CurrentTime[CurrentProc] &&
730         event->evttype != ContinueThread)
731       CurrentTime[CurrentProc] = event->time;
732     
733     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
734     if (!RtsFlags.GranFlags.Light)
735       handleIdlePEs();
736
737     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
738
739     /* main event dispatcher in GranSim */
740     switch (event->evttype) {
741       /* Should just be continuing execution */
742     case ContinueThread:
743       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
744       /* ToDo: check assertion
745       ASSERT(run_queue_hd != (StgTSO*)NULL &&
746              run_queue_hd != END_TSO_QUEUE);
747       */
748       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
749       if (!RtsFlags.GranFlags.DoAsyncFetch &&
750           procStatus[CurrentProc]==Fetching) {
751         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
752               CurrentTSO->id, CurrentTSO, CurrentProc);
753         goto next_thread;
754       } 
755       /* Ignore ContinueThreads for completed threads */
756       if (CurrentTSO->what_next == ThreadComplete) {
757         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
758               CurrentTSO->id, CurrentTSO, CurrentProc);
759         goto next_thread;
760       } 
761       /* Ignore ContinueThreads for threads that are being migrated */
762       if (PROCS(CurrentTSO)==Nowhere) { 
763         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
764               CurrentTSO->id, CurrentTSO, CurrentProc);
765         goto next_thread;
766       }
767       /* The thread should be at the beginning of the run queue */
768       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
769         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
770               CurrentTSO->id, CurrentTSO, CurrentProc);
771         break; // run the thread anyway
772       }
773       /*
774       new_event(proc, proc, CurrentTime[proc],
775                 FindWork,
776                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
777       goto next_thread; 
778       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
779       break; // now actually run the thread; DaH Qu'vam yImuHbej 
780
781     case FetchNode:
782       do_the_fetchnode(event);
783       goto next_thread;             /* handle next event in event queue  */
784       
785     case GlobalBlock:
786       do_the_globalblock(event);
787       goto next_thread;             /* handle next event in event queue  */
788       
789     case FetchReply:
790       do_the_fetchreply(event);
791       goto next_thread;             /* handle next event in event queue  */
792       
793     case UnblockThread:   /* Move from the blocked queue to the tail of */
794       do_the_unblock(event);
795       goto next_thread;             /* handle next event in event queue  */
796       
797     case ResumeThread:  /* Move from the blocked queue to the tail of */
798       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
799       event->tso->gran.blocktime += 
800         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
801       do_the_startthread(event);
802       goto next_thread;             /* handle next event in event queue  */
803       
804     case StartThread:
805       do_the_startthread(event);
806       goto next_thread;             /* handle next event in event queue  */
807       
808     case MoveThread:
809       do_the_movethread(event);
810       goto next_thread;             /* handle next event in event queue  */
811       
812     case MoveSpark:
813       do_the_movespark(event);
814       goto next_thread;             /* handle next event in event queue  */
815       
816     case FindWork:
817       do_the_findwork(event);
818       goto next_thread;             /* handle next event in event queue  */
819       
820     default:
821       barf("Illegal event type %u\n", event->evttype);
822     }  /* switch */
823     
824     /* This point was scheduler_loop in the old RTS */
825
826     IF_DEBUG(gran, belch("GRAN: after main switch"));
827
828     TimeOfLastEvent = CurrentTime[CurrentProc];
829     TimeOfNextEvent = get_time_of_next_event();
830     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
831     // CurrentTSO = ThreadQueueHd;
832
833     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
834                          TimeOfNextEvent));
835
836     if (RtsFlags.GranFlags.Light) 
837       GranSimLight_leave_system(event, &ActiveTSO); 
838
839     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
840
841     IF_DEBUG(gran, 
842              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
843
844     /* in a GranSim setup the TSO stays on the run queue */
845     t = CurrentTSO;
846     /* Take a thread from the run queue. */
847     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
848
849     IF_DEBUG(gran, 
850              fprintf(stderr, "GRAN: About to run current thread, which is\n");
851              G_TSO(t,5));
852
853     context_switch = 0; // turned on via GranYield, checking events and time slice
854
855     IF_DEBUG(gran, 
856              DumpGranEvent(GR_SCHEDULE, t));
857
858     procStatus[CurrentProc] = Busy;
859
860 #elif defined(PAR)
861     if (PendingFetches != END_BF_QUEUE) {
862         processFetches();
863     }
864
865     /* ToDo: phps merge with spark activation above */
866     /* check whether we have local work and send requests if we have none */
867     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
868       /* :-[  no local threads => look out for local sparks */
869       /* the spark pool for the current PE */
870       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
871       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
872           pool->hd < pool->tl) {
873         /* 
874          * ToDo: add GC code check that we really have enough heap afterwards!!
875          * Old comment:
876          * If we're here (no runnable threads) and we have pending
877          * sparks, we must have a space problem.  Get enough space
878          * to turn one of those pending sparks into a
879          * thread... 
880          */
881
882         spark = findSpark(rtsFalse);                /* get a spark */
883         if (spark != (rtsSpark) NULL) {
884           tso = activateSpark(spark);       /* turn the spark into a thread */
885           IF_PAR_DEBUG(schedule,
886                        belch("==== schedule: Created TSO %d (%p); %d threads active",
887                              tso->id, tso, advisory_thread_count));
888
889           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
890             belch("==^^ failed to activate spark");
891             goto next_thread;
892           }               /* otherwise fall through & pick-up new tso */
893         } else {
894           IF_PAR_DEBUG(verbose,
895                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
896                              spark_queue_len(pool)));
897           goto next_thread;
898         }
899       }
900
901       /* If we still have no work we need to send a FISH to get a spark
902          from another PE 
903       */
904       if (EMPTY_RUN_QUEUE()) {
905       /* =8-[  no local sparks => look for work on other PEs */
906         /*
907          * We really have absolutely no work.  Send out a fish
908          * (there may be some out there already), and wait for
909          * something to arrive.  We clearly can't run any threads
910          * until a SCHEDULE or RESUME arrives, and so that's what
911          * we're hoping to see.  (Of course, we still have to
912          * respond to other types of messages.)
913          */
914         TIME now = msTime() /*CURRENT_TIME*/;
915         IF_PAR_DEBUG(verbose, 
916                      belch("--  now=%ld", now));
917         IF_PAR_DEBUG(verbose,
918                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
919                          (last_fish_arrived_at!=0 &&
920                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
921                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
922                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
923                              last_fish_arrived_at,
924                              RtsFlags.ParFlags.fishDelay, now);
925                      });
926         
927         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
928             (last_fish_arrived_at==0 ||
929              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
930           /* outstandingFishes is set in sendFish, processFish;
931              avoid flooding system with fishes via delay */
932           pe = choosePE();
933           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
934                    NEW_FISH_HUNGER);
935
936           // Global statistics: count no. of fishes
937           if (RtsFlags.ParFlags.ParStats.Global &&
938               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
939             globalParStats.tot_fish_mess++;
940           }
941         }
942       
943         receivedFinish = processMessages();
944         goto next_thread;
945       }
946     } else if (PacketsWaiting()) {  /* Look for incoming messages */
947       receivedFinish = processMessages();
948     }
949
950     /* Now we are sure that we have some work available */
951     ASSERT(run_queue_hd != END_TSO_QUEUE);
952
953     /* Take a thread from the run queue, if we have work */
954     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
955     IF_DEBUG(sanity,checkTSO(t));
956
957     /* ToDo: write something to the log-file
958     if (RTSflags.ParFlags.granSimStats && !sameThread)
959         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
960
961     CurrentTSO = t;
962     */
963     /* the spark pool for the current PE */
964     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
965
966     IF_DEBUG(scheduler, 
967              belch("--=^ %d threads, %d sparks on [%#x]", 
968                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
969
970 # if 1
971     if (0 && RtsFlags.ParFlags.ParStats.Full && 
972         t && LastTSO && t->id != LastTSO->id && 
973         LastTSO->why_blocked == NotBlocked && 
974         LastTSO->what_next != ThreadComplete) {
975       // if previously scheduled TSO not blocked we have to record the context switch
976       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
977                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
978     }
979
980     if (RtsFlags.ParFlags.ParStats.Full && 
981         (emitSchedule /* forced emit */ ||
982         (t && LastTSO && t->id != LastTSO->id))) {
983       /* 
984          we are running a different TSO, so write a schedule event to log file
985          NB: If we use fair scheduling we also have to write  a deschedule 
986              event for LastTSO; with unfair scheduling we know that the
987              previous tso has blocked whenever we switch to another tso, so
988              we don't need it in GUM for now
989       */
990       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
991                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
992       emitSchedule = rtsFalse;
993     }
994      
995 # endif
996 #else /* !GRAN && !PAR */
997   
998     /* grab a thread from the run queue */
999     ASSERT(run_queue_hd != END_TSO_QUEUE);
1000     t = POP_RUN_QUEUE();
1001     // Sanity check the thread we're about to run.  This can be
1002     // expensive if there is lots of thread switching going on...
1003     IF_DEBUG(sanity,checkTSO(t));
1004 #endif
1005     
1006     cap->r.rCurrentTSO = t;
1007     
1008     /* context switches are now initiated by the timer signal, unless
1009      * the user specified "context switch as often as possible", with
1010      * +RTS -C0
1011      */
1012     if (
1013 #ifdef PROFILING
1014         RtsFlags.ProfFlags.profileInterval == 0 ||
1015 #endif
1016         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1017          && (run_queue_hd != END_TSO_QUEUE
1018              || blocked_queue_hd != END_TSO_QUEUE
1019              || sleeping_queue != END_TSO_QUEUE)))
1020         context_switch = 1;
1021     else
1022         context_switch = 0;
1023
1024     RELEASE_LOCK(&sched_mutex);
1025
1026     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
1027                               t->id, t, whatNext_strs[t->what_next]));
1028
1029 #ifdef PROFILING
1030     startHeapProfTimer();
1031 #endif
1032
1033     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1034     /* Run the current thread 
1035      */
1036     switch (cap->r.rCurrentTSO->what_next) {
1037     case ThreadKilled:
1038     case ThreadComplete:
1039         /* Thread already finished, return to scheduler. */
1040         ret = ThreadFinished;
1041         break;
1042     case ThreadEnterGHC:
1043         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1044         break;
1045     case ThreadRunGHC:
1046         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1047         break;
1048     case ThreadEnterInterp:
1049         ret = interpretBCO(cap);
1050         break;
1051     default:
1052       barf("schedule: invalid what_next field");
1053     }
1054     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1055     
1056     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1057 #ifdef PROFILING
1058     stopHeapProfTimer();
1059     CCCS = CCS_SYSTEM;
1060 #endif
1061     
1062     ACQUIRE_LOCK(&sched_mutex);
1063
1064 #ifdef SMP
1065     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1066 #elif !defined(GRAN) && !defined(PAR)
1067     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1068 #endif
1069     t = cap->r.rCurrentTSO;
1070     
1071 #if defined(PAR)
1072     /* HACK 675: if the last thread didn't yield, make sure to print a 
1073        SCHEDULE event to the log file when StgRunning the next thread, even
1074        if it is the same one as before */
1075     LastTSO = t; 
1076     TimeOfLastYield = CURRENT_TIME;
1077 #endif
1078
1079     switch (ret) {
1080     case HeapOverflow:
1081 #if defined(GRAN)
1082       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1083       globalGranStats.tot_heapover++;
1084 #elif defined(PAR)
1085       globalParStats.tot_heapover++;
1086 #endif
1087
1088       // did the task ask for a large block?
1089       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1090           // if so, get one and push it on the front of the nursery.
1091           bdescr *bd;
1092           nat blocks;
1093           
1094           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1095
1096           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1097                                    t->id, t,
1098                                    whatNext_strs[t->what_next], blocks));
1099
1100           // don't do this if it would push us over the
1101           // alloc_blocks_lim limit; we'll GC first.
1102           if (alloc_blocks + blocks < alloc_blocks_lim) {
1103
1104               alloc_blocks += blocks;
1105               bd = allocGroup( blocks );
1106
1107               // link the new group into the list
1108               bd->link = cap->r.rCurrentNursery;
1109               bd->u.back = cap->r.rCurrentNursery->u.back;
1110               if (cap->r.rCurrentNursery->u.back != NULL) {
1111                   cap->r.rCurrentNursery->u.back->link = bd;
1112               } else {
1113                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1114                          g0s0->blocks == cap->r.rNursery);
1115                   cap->r.rNursery = g0s0->blocks = bd;
1116               }           
1117               cap->r.rCurrentNursery->u.back = bd;
1118
1119               // initialise it as a nursery block
1120               bd->step = g0s0;
1121               bd->gen_no = 0;
1122               bd->flags = 0;
1123               bd->free = bd->start;
1124
1125               // don't forget to update the block count in g0s0.
1126               g0s0->n_blocks += blocks;
1127               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1128
1129               // now update the nursery to point to the new block
1130               cap->r.rCurrentNursery = bd;
1131
1132               // we might be unlucky and have another thread get on the
1133               // run queue before us and steal the large block, but in that
1134               // case the thread will just end up requesting another large
1135               // block.
1136               PUSH_ON_RUN_QUEUE(t);
1137               break;
1138           }
1139       }
1140
1141       /* make all the running tasks block on a condition variable,
1142        * maybe set context_switch and wait till they all pile in,
1143        * then have them wait on a GC condition variable.
1144        */
1145       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1146                                t->id, t, whatNext_strs[t->what_next]));
1147       threadPaused(t);
1148 #if defined(GRAN)
1149       ASSERT(!is_on_queue(t,CurrentProc));
1150 #elif defined(PAR)
1151       /* Currently we emit a DESCHEDULE event before GC in GUM.
1152          ToDo: either add separate event to distinguish SYSTEM time from rest
1153                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1154       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1155         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1156                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1157         emitSchedule = rtsTrue;
1158       }
1159 #endif
1160       
1161       ready_to_gc = rtsTrue;
1162       context_switch = 1;               /* stop other threads ASAP */
1163       PUSH_ON_RUN_QUEUE(t);
1164       /* actual GC is done at the end of the while loop */
1165       break;
1166       
1167     case StackOverflow:
1168 #if defined(GRAN)
1169       IF_DEBUG(gran, 
1170                DumpGranEvent(GR_DESCHEDULE, t));
1171       globalGranStats.tot_stackover++;
1172 #elif defined(PAR)
1173       // IF_DEBUG(par, 
1174       // DumpGranEvent(GR_DESCHEDULE, t);
1175       globalParStats.tot_stackover++;
1176 #endif
1177       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1178                                t->id, t, whatNext_strs[t->what_next]));
1179       /* just adjust the stack for this thread, then pop it back
1180        * on the run queue.
1181        */
1182       threadPaused(t);
1183       { 
1184         StgMainThread *m;
1185         /* enlarge the stack */
1186         StgTSO *new_t = threadStackOverflow(t);
1187         
1188         /* This TSO has moved, so update any pointers to it from the
1189          * main thread stack.  It better not be on any other queues...
1190          * (it shouldn't be).
1191          */
1192         for (m = main_threads; m != NULL; m = m->link) {
1193           if (m->tso == t) {
1194             m->tso = new_t;
1195           }
1196         }
1197         threadPaused(new_t);
1198         PUSH_ON_RUN_QUEUE(new_t);
1199       }
1200       break;
1201
1202     case ThreadYielding:
1203 #if defined(GRAN)
1204       IF_DEBUG(gran, 
1205                DumpGranEvent(GR_DESCHEDULE, t));
1206       globalGranStats.tot_yields++;
1207 #elif defined(PAR)
1208       // IF_DEBUG(par, 
1209       // DumpGranEvent(GR_DESCHEDULE, t);
1210       globalParStats.tot_yields++;
1211 #endif
1212       /* put the thread back on the run queue.  Then, if we're ready to
1213        * GC, check whether this is the last task to stop.  If so, wake
1214        * up the GC thread.  getThread will block during a GC until the
1215        * GC is finished.
1216        */
1217       IF_DEBUG(scheduler,
1218                if (t->what_next == ThreadEnterInterp) {
1219                    /* ToDo: or maybe a timer expired when we were in Hugs?
1220                     * or maybe someone hit ctrl-C
1221                     */
1222                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1223                          t->id, t, whatNext_strs[t->what_next]);
1224                } else {
1225                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1226                          t->id, t, whatNext_strs[t->what_next]);
1227                }
1228                );
1229
1230       threadPaused(t);
1231
1232       IF_DEBUG(sanity,
1233                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1234                checkTSO(t));
1235       ASSERT(t->link == END_TSO_QUEUE);
1236 #if defined(GRAN)
1237       ASSERT(!is_on_queue(t,CurrentProc));
1238
1239       IF_DEBUG(sanity,
1240                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1241                checkThreadQsSanity(rtsTrue));
1242 #endif
1243 #if defined(PAR)
1244       if (RtsFlags.ParFlags.doFairScheduling) { 
1245         /* this does round-robin scheduling; good for concurrency */
1246         APPEND_TO_RUN_QUEUE(t);
1247       } else {
1248         /* this does unfair scheduling; good for parallelism */
1249         PUSH_ON_RUN_QUEUE(t);
1250       }
1251 #else
1252       /* this does round-robin scheduling; good for concurrency */
1253       APPEND_TO_RUN_QUEUE(t);
1254 #endif
1255 #if defined(GRAN)
1256       /* add a ContinueThread event to actually process the thread */
1257       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1258                 ContinueThread,
1259                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1260       IF_GRAN_DEBUG(bq, 
1261                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1262                G_EVENTQ(0);
1263                G_CURR_THREADQ(0));
1264 #endif /* GRAN */
1265       break;
1266       
1267     case ThreadBlocked:
1268 #if defined(GRAN)
1269       IF_DEBUG(scheduler,
1270                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1271                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1272                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1273
1274       // ??? needed; should emit block before
1275       IF_DEBUG(gran, 
1276                DumpGranEvent(GR_DESCHEDULE, t)); 
1277       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1278       /*
1279         ngoq Dogh!
1280       ASSERT(procStatus[CurrentProc]==Busy || 
1281               ((procStatus[CurrentProc]==Fetching) && 
1282               (t->block_info.closure!=(StgClosure*)NULL)));
1283       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1284           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1285             procStatus[CurrentProc]==Fetching)) 
1286         procStatus[CurrentProc] = Idle;
1287       */
1288 #elif defined(PAR)
1289       IF_DEBUG(scheduler,
1290                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1291                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1292       IF_PAR_DEBUG(bq,
1293
1294                    if (t->block_info.closure!=(StgClosure*)NULL) 
1295                      print_bq(t->block_info.closure));
1296
1297       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1298       blockThread(t);
1299
1300       /* whatever we schedule next, we must log that schedule */
1301       emitSchedule = rtsTrue;
1302
1303 #else /* !GRAN */
1304       /* don't need to do anything.  Either the thread is blocked on
1305        * I/O, in which case we'll have called addToBlockedQueue
1306        * previously, or it's blocked on an MVar or Blackhole, in which
1307        * case it'll be on the relevant queue already.
1308        */
1309       IF_DEBUG(scheduler,
1310                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1311                printThreadBlockage(t);
1312                fprintf(stderr, "\n"));
1313
1314       /* Only for dumping event to log file 
1315          ToDo: do I need this in GranSim, too?
1316       blockThread(t);
1317       */
1318 #endif
1319       threadPaused(t);
1320       break;
1321       
1322     case ThreadFinished:
1323       /* Need to check whether this was a main thread, and if so, signal
1324        * the task that started it with the return value.  If we have no
1325        * more main threads, we probably need to stop all the tasks until
1326        * we get a new one.
1327        */
1328       /* We also end up here if the thread kills itself with an
1329        * uncaught exception, see Exception.hc.
1330        */
1331       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1332 #if defined(GRAN)
1333       endThread(t, CurrentProc); // clean-up the thread
1334 #elif defined(PAR)
1335       /* For now all are advisory -- HWL */
1336       //if(t->priority==AdvisoryPriority) ??
1337       advisory_thread_count--;
1338       
1339 # ifdef DIST
1340       if(t->dist.priority==RevalPriority)
1341         FinishReval(t);
1342 # endif
1343       
1344       if (RtsFlags.ParFlags.ParStats.Full &&
1345           !RtsFlags.ParFlags.ParStats.Suppressed) 
1346         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1347 #endif
1348       break;
1349       
1350     default:
1351       barf("schedule: invalid thread return code %d", (int)ret);
1352     }
1353
1354 #ifdef PROFILING
1355     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1356         GarbageCollect(GetRoots, rtsTrue);
1357         heapCensus();
1358         performHeapProfile = rtsFalse;
1359         ready_to_gc = rtsFalse; // we already GC'd
1360     }
1361 #endif
1362
1363     if (ready_to_gc 
1364 #ifdef SMP
1365         && allFreeCapabilities() 
1366 #endif
1367         ) {
1368       /* everybody back, start the GC.
1369        * Could do it in this thread, or signal a condition var
1370        * to do it in another thread.  Either way, we need to
1371        * broadcast on gc_pending_cond afterward.
1372        */
1373 #if defined(RTS_SUPPORTS_THREADS)
1374       IF_DEBUG(scheduler,sched_belch("doing GC"));
1375 #endif
1376       GarbageCollect(GetRoots,rtsFalse);
1377       ready_to_gc = rtsFalse;
1378 #ifdef SMP
1379       broadcastCondition(&gc_pending_cond);
1380 #endif
1381 #if defined(GRAN)
1382       /* add a ContinueThread event to continue execution of current thread */
1383       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1384                 ContinueThread,
1385                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1386       IF_GRAN_DEBUG(bq, 
1387                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1388                G_EVENTQ(0);
1389                G_CURR_THREADQ(0));
1390 #endif /* GRAN */
1391     }
1392
1393 #if defined(GRAN)
1394   next_thread:
1395     IF_GRAN_DEBUG(unused,
1396                   print_eventq(EventHd));
1397
1398     event = get_next_event();
1399 #elif defined(PAR)
1400   next_thread:
1401     /* ToDo: wait for next message to arrive rather than busy wait */
1402 #endif /* GRAN */
1403
1404   } /* end of while(1) */
1405
1406   IF_PAR_DEBUG(verbose,
1407                belch("== Leaving schedule() after having received Finish"));
1408 }
1409
1410 /* ---------------------------------------------------------------------------
1411  * Singleton fork(). Do not copy any running threads.
1412  * ------------------------------------------------------------------------- */
1413
1414 StgInt forkProcess(StgTSO* tso) {
1415
1416 #ifndef mingw32_TARGET_OS
1417   pid_t pid;
1418   StgTSO* t,*next;
1419
1420   IF_DEBUG(scheduler,sched_belch("forking!"));
1421
1422   pid = fork();
1423   if (pid) { /* parent */
1424
1425   /* just return the pid */
1426     
1427   } else { /* child */
1428   /* wipe all other threads */
1429   run_queue_hd = tso;
1430   tso->link = END_TSO_QUEUE;
1431
1432   /* DO NOT TOUCH THE QUEUES directly because most of the code around
1433      us is picky about finding the threat still in its queue when
1434      handling the deleteThread() */
1435
1436   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1437     next = t->link;
1438     if (t->id != tso->id) {
1439       deleteThread(t);
1440     }
1441   }
1442   }
1443   return pid;
1444 #else /* mingw32 */
1445   barf("forkProcess#: primop not implemented for mingw32, sorry!");
1446   return -1;
1447 #endif /* mingw32 */
1448 }
1449
1450 /* ---------------------------------------------------------------------------
1451  * deleteAllThreads():  kill all the live threads.
1452  *
1453  * This is used when we catch a user interrupt (^C), before performing
1454  * any necessary cleanups and running finalizers.
1455  *
1456  * Locks: sched_mutex held.
1457  * ------------------------------------------------------------------------- */
1458    
1459 void deleteAllThreads ( void )
1460 {
1461   StgTSO* t, *next;
1462   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1463   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1464       next = t->global_link;
1465       deleteThread(t);
1466   }      
1467   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1468   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1469   sleeping_queue = END_TSO_QUEUE;
1470 }
1471
1472 /* startThread and  insertThread are now in GranSim.c -- HWL */
1473
1474
1475 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1476 //@subsection Suspend and Resume
1477
1478 /* ---------------------------------------------------------------------------
1479  * Suspending & resuming Haskell threads.
1480  * 
1481  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1482  * its capability before calling the C function.  This allows another
1483  * task to pick up the capability and carry on running Haskell
1484  * threads.  It also means that if the C call blocks, it won't lock
1485  * the whole system.
1486  *
1487  * The Haskell thread making the C call is put to sleep for the
1488  * duration of the call, on the susepended_ccalling_threads queue.  We
1489  * give out a token to the task, which it can use to resume the thread
1490  * on return from the C function.
1491  * ------------------------------------------------------------------------- */
1492    
1493 StgInt
1494 suspendThread( StgRegTable *reg, 
1495                rtsBool concCall
1496 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1497                STG_UNUSED
1498 #endif
1499                )
1500 {
1501   nat tok;
1502   Capability *cap;
1503
1504   /* assume that *reg is a pointer to the StgRegTable part
1505    * of a Capability.
1506    */
1507   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1508
1509   ACQUIRE_LOCK(&sched_mutex);
1510
1511   IF_DEBUG(scheduler,
1512            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1513
1514   threadPaused(cap->r.rCurrentTSO);
1515   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1516   suspended_ccalling_threads = cap->r.rCurrentTSO;
1517
1518 #if defined(RTS_SUPPORTS_THREADS)
1519   cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
1520 #endif
1521
1522   /* Use the thread ID as the token; it should be unique */
1523   tok = cap->r.rCurrentTSO->id;
1524
1525   /* Hand back capability */
1526   releaseCapability(cap);
1527   
1528 #if defined(RTS_SUPPORTS_THREADS)
1529   /* Preparing to leave the RTS, so ensure there's a native thread/task
1530      waiting to take over.
1531      
1532      ToDo: optimise this and only create a new task if there's a need
1533      for one (i.e., if there's only one Concurrent Haskell thread alive,
1534      there's no need to create a new task).
1535   */
1536   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1537   if (concCall) {
1538     startTask(taskStart);
1539   }
1540 #endif
1541
1542   /* Other threads _might_ be available for execution; signal this */
1543   THREAD_RUNNABLE();
1544   RELEASE_LOCK(&sched_mutex);
1545   return tok; 
1546 }
1547
1548 StgRegTable *
1549 resumeThread( StgInt tok,
1550               rtsBool concCall
1551 #if !defined(RTS_SUPPORTS_THREADS)
1552                STG_UNUSED
1553 #endif
1554               )
1555 {
1556   StgTSO *tso, **prev;
1557   Capability *cap;
1558
1559 #if defined(RTS_SUPPORTS_THREADS)
1560   /* Wait for permission to re-enter the RTS with the result. */
1561   if ( concCall ) {
1562     ACQUIRE_LOCK(&sched_mutex);
1563     grabReturnCapability(&sched_mutex, &cap);
1564   } else {
1565     grabCapability(&cap);
1566   }
1567 #else
1568   grabCapability(&cap);
1569 #endif
1570
1571   /* Remove the thread off of the suspended list */
1572   prev = &suspended_ccalling_threads;
1573   for (tso = suspended_ccalling_threads; 
1574        tso != END_TSO_QUEUE; 
1575        prev = &tso->link, tso = tso->link) {
1576     if (tso->id == (StgThreadID)tok) {
1577       *prev = tso->link;
1578       break;
1579     }
1580   }
1581   if (tso == END_TSO_QUEUE) {
1582     barf("resumeThread: thread not found");
1583   }
1584   tso->link = END_TSO_QUEUE;
1585   /* Reset blocking status */
1586   tso->why_blocked  = NotBlocked;
1587
1588   cap->r.rCurrentTSO = tso;
1589   RELEASE_LOCK(&sched_mutex);
1590   return &cap->r;
1591 }
1592
1593
1594 /* ---------------------------------------------------------------------------
1595  * Static functions
1596  * ------------------------------------------------------------------------ */
1597 static void unblockThread(StgTSO *tso);
1598
1599 /* ---------------------------------------------------------------------------
1600  * Comparing Thread ids.
1601  *
1602  * This is used from STG land in the implementation of the
1603  * instances of Eq/Ord for ThreadIds.
1604  * ------------------------------------------------------------------------ */
1605
1606 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1607
1608   StgThreadID id1 = tso1->id; 
1609   StgThreadID id2 = tso2->id;
1610  
1611   if (id1 < id2) return (-1);
1612   if (id1 > id2) return 1;
1613   return 0;
1614 }
1615
1616 /* ---------------------------------------------------------------------------
1617  * Fetching the ThreadID from an StgTSO.
1618  *
1619  * This is used in the implementation of Show for ThreadIds.
1620  * ------------------------------------------------------------------------ */
1621 int rts_getThreadId(const StgTSO *tso) 
1622 {
1623   return tso->id;
1624 }
1625
1626 #ifdef DEBUG
1627 void labelThread(StgTSO *tso, char *label)
1628 {
1629   int len;
1630   void *buf;
1631
1632   /* Caveat: Once set, you can only set the thread name to "" */
1633   len = strlen(label)+1;
1634   buf = realloc(tso->label,len);
1635   if (buf == NULL) {
1636     fprintf(stderr,"insufficient memory for labelThread!\n");
1637     free(tso->label);
1638   } else
1639     strncpy(buf,label,len);
1640   tso->label = buf;
1641 }
1642 #endif /* DEBUG */
1643
1644 /* ---------------------------------------------------------------------------
1645    Create a new thread.
1646
1647    The new thread starts with the given stack size.  Before the
1648    scheduler can run, however, this thread needs to have a closure
1649    (and possibly some arguments) pushed on its stack.  See
1650    pushClosure() in Schedule.h.
1651
1652    createGenThread() and createIOThread() (in SchedAPI.h) are
1653    convenient packaged versions of this function.
1654
1655    currently pri (priority) is only used in a GRAN setup -- HWL
1656    ------------------------------------------------------------------------ */
1657 //@cindex createThread
1658 #if defined(GRAN)
1659 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1660 StgTSO *
1661 createThread(nat stack_size, StgInt pri)
1662 {
1663   return createThread_(stack_size, rtsFalse, pri);
1664 }
1665
1666 static StgTSO *
1667 createThread_(nat size, rtsBool have_lock, StgInt pri)
1668 {
1669 #else
1670 StgTSO *
1671 createThread(nat stack_size)
1672 {
1673   return createThread_(stack_size, rtsFalse);
1674 }
1675
1676 static StgTSO *
1677 createThread_(nat size, rtsBool have_lock)
1678 {
1679 #endif
1680
1681     StgTSO *tso;
1682     nat stack_size;
1683
1684     /* First check whether we should create a thread at all */
1685 #if defined(PAR)
1686   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1687   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1688     threadsIgnored++;
1689     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1690           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1691     return END_TSO_QUEUE;
1692   }
1693   threadsCreated++;
1694 #endif
1695
1696 #if defined(GRAN)
1697   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1698 #endif
1699
1700   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1701
1702   /* catch ridiculously small stack sizes */
1703   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1704     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1705   }
1706
1707   stack_size = size - TSO_STRUCT_SIZEW;
1708
1709   tso = (StgTSO *)allocate(size);
1710   TICK_ALLOC_TSO(stack_size, 0);
1711
1712   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1713 #if defined(GRAN)
1714   SET_GRAN_HDR(tso, ThisPE);
1715 #endif
1716   tso->what_next     = ThreadEnterGHC;
1717
1718 #ifdef DEBUG
1719   tso->label = NULL;
1720 #endif
1721
1722   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1723    * protect the increment operation on next_thread_id.
1724    * In future, we could use an atomic increment instead.
1725    */
1726 #ifdef SMP
1727   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1728 #endif
1729   tso->id = next_thread_id++; 
1730 #ifdef SMP
1731   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1732 #endif
1733
1734   tso->why_blocked  = NotBlocked;
1735   tso->blocked_exceptions = NULL;
1736
1737   tso->stack_size   = stack_size;
1738   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1739                               - TSO_STRUCT_SIZEW;
1740   tso->sp           = (P_)&(tso->stack) + stack_size;
1741
1742 #ifdef PROFILING
1743   tso->prof.CCCS = CCS_MAIN;
1744 #endif
1745
1746   /* put a stop frame on the stack */
1747   tso->sp -= sizeofW(StgStopFrame);
1748   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1749   tso->su = (StgUpdateFrame*)tso->sp;
1750
1751   // ToDo: check this
1752 #if defined(GRAN)
1753   tso->link = END_TSO_QUEUE;
1754   /* uses more flexible routine in GranSim */
1755   insertThread(tso, CurrentProc);
1756 #else
1757   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1758    * from its creation
1759    */
1760 #endif
1761
1762 #if defined(GRAN) 
1763   if (RtsFlags.GranFlags.GranSimStats.Full) 
1764     DumpGranEvent(GR_START,tso);
1765 #elif defined(PAR)
1766   if (RtsFlags.ParFlags.ParStats.Full) 
1767     DumpGranEvent(GR_STARTQ,tso);
1768   /* HACk to avoid SCHEDULE 
1769      LastTSO = tso; */
1770 #endif
1771
1772   /* Link the new thread on the global thread list.
1773    */
1774   tso->global_link = all_threads;
1775   all_threads = tso;
1776
1777 #if defined(DIST)
1778   tso->dist.priority = MandatoryPriority; //by default that is...
1779 #endif
1780
1781 #if defined(GRAN)
1782   tso->gran.pri = pri;
1783 # if defined(DEBUG)
1784   tso->gran.magic = TSO_MAGIC; // debugging only
1785 # endif
1786   tso->gran.sparkname   = 0;
1787   tso->gran.startedat   = CURRENT_TIME; 
1788   tso->gran.exported    = 0;
1789   tso->gran.basicblocks = 0;
1790   tso->gran.allocs      = 0;
1791   tso->gran.exectime    = 0;
1792   tso->gran.fetchtime   = 0;
1793   tso->gran.fetchcount  = 0;
1794   tso->gran.blocktime   = 0;
1795   tso->gran.blockcount  = 0;
1796   tso->gran.blockedat   = 0;
1797   tso->gran.globalsparks = 0;
1798   tso->gran.localsparks  = 0;
1799   if (RtsFlags.GranFlags.Light)
1800     tso->gran.clock  = Now; /* local clock */
1801   else
1802     tso->gran.clock  = 0;
1803
1804   IF_DEBUG(gran,printTSO(tso));
1805 #elif defined(PAR)
1806 # if defined(DEBUG)
1807   tso->par.magic = TSO_MAGIC; // debugging only
1808 # endif
1809   tso->par.sparkname   = 0;
1810   tso->par.startedat   = CURRENT_TIME; 
1811   tso->par.exported    = 0;
1812   tso->par.basicblocks = 0;
1813   tso->par.allocs      = 0;
1814   tso->par.exectime    = 0;
1815   tso->par.fetchtime   = 0;
1816   tso->par.fetchcount  = 0;
1817   tso->par.blocktime   = 0;
1818   tso->par.blockcount  = 0;
1819   tso->par.blockedat   = 0;
1820   tso->par.globalsparks = 0;
1821   tso->par.localsparks  = 0;
1822 #endif
1823
1824 #if defined(GRAN)
1825   globalGranStats.tot_threads_created++;
1826   globalGranStats.threads_created_on_PE[CurrentProc]++;
1827   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1828   globalGranStats.tot_sq_probes++;
1829 #elif defined(PAR)
1830   // collect parallel global statistics (currently done together with GC stats)
1831   if (RtsFlags.ParFlags.ParStats.Global &&
1832       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1833     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1834     globalParStats.tot_threads_created++;
1835   }
1836 #endif 
1837
1838 #if defined(GRAN)
1839   IF_GRAN_DEBUG(pri,
1840                 belch("==__ schedule: Created TSO %d (%p);",
1841                       CurrentProc, tso, tso->id));
1842 #elif defined(PAR)
1843     IF_PAR_DEBUG(verbose,
1844                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1845                        tso->id, tso, advisory_thread_count));
1846 #else
1847   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1848                                  tso->id, tso->stack_size));
1849 #endif    
1850   return tso;
1851 }
1852
1853 #if defined(PAR)
1854 /* RFP:
1855    all parallel thread creation calls should fall through the following routine.
1856 */
1857 StgTSO *
1858 createSparkThread(rtsSpark spark) 
1859 { StgTSO *tso;
1860   ASSERT(spark != (rtsSpark)NULL);
1861   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1862   { threadsIgnored++;
1863     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1864           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1865     return END_TSO_QUEUE;
1866   }
1867   else
1868   { threadsCreated++;
1869     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1870     if (tso==END_TSO_QUEUE)     
1871       barf("createSparkThread: Cannot create TSO");
1872 #if defined(DIST)
1873     tso->priority = AdvisoryPriority;
1874 #endif
1875     pushClosure(tso,spark);
1876     PUSH_ON_RUN_QUEUE(tso);
1877     advisory_thread_count++;    
1878   }
1879   return tso;
1880 }
1881 #endif
1882
1883 /*
1884   Turn a spark into a thread.
1885   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1886 */
1887 #if defined(PAR)
1888 //@cindex activateSpark
1889 StgTSO *
1890 activateSpark (rtsSpark spark) 
1891 {
1892   StgTSO *tso;
1893
1894   tso = createSparkThread(spark);
1895   if (RtsFlags.ParFlags.ParStats.Full) {   
1896     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1897     IF_PAR_DEBUG(verbose,
1898                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1899                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1900   }
1901   // ToDo: fwd info on local/global spark to thread -- HWL
1902   // tso->gran.exported =  spark->exported;
1903   // tso->gran.locked =   !spark->global;
1904   // tso->gran.sparkname = spark->name;
1905
1906   return tso;
1907 }
1908 #endif
1909
1910 /* ---------------------------------------------------------------------------
1911  * scheduleThread()
1912  *
1913  * scheduleThread puts a thread on the head of the runnable queue.
1914  * This will usually be done immediately after a thread is created.
1915  * The caller of scheduleThread must create the thread using e.g.
1916  * createThread and push an appropriate closure
1917  * on this thread's stack before the scheduler is invoked.
1918  * ------------------------------------------------------------------------ */
1919
1920 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
1921
1922 void
1923 scheduleThread_(StgTSO *tso
1924                , rtsBool createTask
1925 #if !defined(THREADED_RTS)
1926                  STG_UNUSED
1927 #endif
1928               )
1929 {
1930   ACQUIRE_LOCK(&sched_mutex);
1931
1932   /* Put the new thread on the head of the runnable queue.  The caller
1933    * better push an appropriate closure on this thread's stack
1934    * beforehand.  In the SMP case, the thread may start running as
1935    * soon as we release the scheduler lock below.
1936    */
1937   PUSH_ON_RUN_QUEUE(tso);
1938 #if defined(THREADED_RTS)
1939   /* If main() is scheduling a thread, don't bother creating a 
1940    * new task.
1941    */
1942   if ( createTask ) {
1943     startTask(taskStart);
1944   }
1945 #endif
1946   THREAD_RUNNABLE();
1947
1948 #if 0
1949   IF_DEBUG(scheduler,printTSO(tso));
1950 #endif
1951   RELEASE_LOCK(&sched_mutex);
1952 }
1953
1954 void scheduleThread(StgTSO* tso)
1955 {
1956   return scheduleThread_(tso, rtsFalse);
1957 }
1958
1959 void scheduleExtThread(StgTSO* tso)
1960 {
1961   return scheduleThread_(tso, rtsTrue);
1962 }
1963
1964 /* ---------------------------------------------------------------------------
1965  * initScheduler()
1966  *
1967  * Initialise the scheduler.  This resets all the queues - if the
1968  * queues contained any threads, they'll be garbage collected at the
1969  * next pass.
1970  *
1971  * ------------------------------------------------------------------------ */
1972
1973 #ifdef SMP
1974 static void
1975 term_handler(int sig STG_UNUSED)
1976 {
1977   stat_workerStop();
1978   ACQUIRE_LOCK(&term_mutex);
1979   await_death--;
1980   RELEASE_LOCK(&term_mutex);
1981   shutdownThread();
1982 }
1983 #endif
1984
1985 void 
1986 initScheduler(void)
1987 {
1988 #if defined(GRAN)
1989   nat i;
1990
1991   for (i=0; i<=MAX_PROC; i++) {
1992     run_queue_hds[i]      = END_TSO_QUEUE;
1993     run_queue_tls[i]      = END_TSO_QUEUE;
1994     blocked_queue_hds[i]  = END_TSO_QUEUE;
1995     blocked_queue_tls[i]  = END_TSO_QUEUE;
1996     ccalling_threadss[i]  = END_TSO_QUEUE;
1997     sleeping_queue        = END_TSO_QUEUE;
1998   }
1999 #else
2000   run_queue_hd      = END_TSO_QUEUE;
2001   run_queue_tl      = END_TSO_QUEUE;
2002   blocked_queue_hd  = END_TSO_QUEUE;
2003   blocked_queue_tl  = END_TSO_QUEUE;
2004   sleeping_queue    = END_TSO_QUEUE;
2005 #endif 
2006
2007   suspended_ccalling_threads  = END_TSO_QUEUE;
2008
2009   main_threads = NULL;
2010   all_threads  = END_TSO_QUEUE;
2011
2012   context_switch = 0;
2013   interrupted    = 0;
2014
2015   RtsFlags.ConcFlags.ctxtSwitchTicks =
2016       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2017       
2018 #if defined(RTS_SUPPORTS_THREADS)
2019   /* Initialise the mutex and condition variables used by
2020    * the scheduler. */
2021   initMutex(&sched_mutex);
2022   initMutex(&term_mutex);
2023
2024   initCondition(&thread_ready_cond);
2025 #endif
2026   
2027 #if defined(SMP)
2028   initCondition(&gc_pending_cond);
2029 #endif
2030
2031 #if defined(RTS_SUPPORTS_THREADS)
2032   ACQUIRE_LOCK(&sched_mutex);
2033 #endif
2034
2035   /* Install the SIGHUP handler */
2036 #if defined(SMP)
2037   {
2038     struct sigaction action,oact;
2039
2040     action.sa_handler = term_handler;
2041     sigemptyset(&action.sa_mask);
2042     action.sa_flags = 0;
2043     if (sigaction(SIGTERM, &action, &oact) != 0) {
2044       barf("can't install TERM handler");
2045     }
2046   }
2047 #endif
2048
2049   /* A capability holds the state a native thread needs in
2050    * order to execute STG code. At least one capability is
2051    * floating around (only SMP builds have more than one).
2052    */
2053   initCapabilities();
2054   
2055 #if defined(RTS_SUPPORTS_THREADS)
2056     /* start our haskell execution tasks */
2057 # if defined(SMP)
2058     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2059 # else
2060     startTaskManager(0,taskStart);
2061 # endif
2062 #endif
2063
2064 #if /* defined(SMP) ||*/ defined(PAR)
2065   initSparkPools();
2066 #endif
2067
2068 #if defined(RTS_SUPPORTS_THREADS)
2069   RELEASE_LOCK(&sched_mutex);
2070 #endif
2071
2072 }
2073
2074 void
2075 exitScheduler( void )
2076 {
2077 #if defined(RTS_SUPPORTS_THREADS)
2078   stopTaskManager();
2079 #endif
2080 }
2081
2082 /* -----------------------------------------------------------------------------
2083    Managing the per-task allocation areas.
2084    
2085    Each capability comes with an allocation area.  These are
2086    fixed-length block lists into which allocation can be done.
2087
2088    ToDo: no support for two-space collection at the moment???
2089    -------------------------------------------------------------------------- */
2090
2091 /* -----------------------------------------------------------------------------
2092  * waitThread is the external interface for running a new computation
2093  * and waiting for the result.
2094  *
2095  * In the non-SMP case, we create a new main thread, push it on the 
2096  * main-thread stack, and invoke the scheduler to run it.  The
2097  * scheduler will return when the top main thread on the stack has
2098  * completed or died, and fill in the necessary fields of the
2099  * main_thread structure.
2100  *
2101  * In the SMP case, we create a main thread as before, but we then
2102  * create a new condition variable and sleep on it.  When our new
2103  * main thread has completed, we'll be woken up and the status/result
2104  * will be in the main_thread struct.
2105  * -------------------------------------------------------------------------- */
2106
2107 int 
2108 howManyThreadsAvail ( void )
2109 {
2110    int i = 0;
2111    StgTSO* q;
2112    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2113       i++;
2114    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2115       i++;
2116    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2117       i++;
2118    return i;
2119 }
2120
2121 void
2122 finishAllThreads ( void )
2123 {
2124    do {
2125       while (run_queue_hd != END_TSO_QUEUE) {
2126          waitThread ( run_queue_hd, NULL);
2127       }
2128       while (blocked_queue_hd != END_TSO_QUEUE) {
2129          waitThread ( blocked_queue_hd, NULL);
2130       }
2131       while (sleeping_queue != END_TSO_QUEUE) {
2132          waitThread ( blocked_queue_hd, NULL);
2133       }
2134    } while 
2135       (blocked_queue_hd != END_TSO_QUEUE || 
2136        run_queue_hd     != END_TSO_QUEUE ||
2137        sleeping_queue   != END_TSO_QUEUE);
2138 }
2139
2140 SchedulerStatus
2141 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2142
2143   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2144 #if defined(THREADED_RTS)
2145   return waitThread_(tso,ret, rtsFalse);
2146 #else
2147   return waitThread_(tso,ret);
2148 #endif
2149 }
2150
2151 SchedulerStatus
2152 waitThread_(StgTSO *tso,
2153             /*out*/StgClosure **ret
2154 #if defined(THREADED_RTS)
2155             , rtsBool blockWaiting
2156 #endif
2157            )
2158 {
2159   StgMainThread *m;
2160   SchedulerStatus stat;
2161
2162   ACQUIRE_LOCK(&sched_mutex);
2163   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2164   
2165   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2166
2167   m->tso = tso;
2168   m->ret = ret;
2169   m->stat = NoStatus;
2170 #if defined(RTS_SUPPORTS_THREADS)
2171   initCondition(&m->wakeup);
2172 #endif
2173
2174   m->link = main_threads;
2175   main_threads = m;
2176
2177   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2178
2179 #if defined(RTS_SUPPORTS_THREADS)
2180
2181 # if defined(THREADED_RTS)
2182   if (!blockWaiting) {
2183     /* In the threaded case, the OS thread that called main()
2184      * gets to enter the RTS directly without going via another
2185      * task/thread.
2186      */
2187     RELEASE_LOCK(&sched_mutex);
2188     schedule();
2189     ASSERT(m->stat != NoStatus);
2190   } else 
2191 # endif
2192   {
2193     IF_DEBUG(scheduler, sched_belch("sfoo"));
2194     do {
2195       waitCondition(&m->wakeup, &sched_mutex);
2196     } while (m->stat == NoStatus);
2197   }
2198 #elif defined(GRAN)
2199   /* GranSim specific init */
2200   CurrentTSO = m->tso;                // the TSO to run
2201   procStatus[MainProc] = Busy;        // status of main PE
2202   CurrentProc = MainProc;             // PE to run it on
2203
2204   schedule();
2205 #else
2206   RELEASE_LOCK(&sched_mutex);
2207   schedule();
2208   ASSERT(m->stat != NoStatus);
2209 #endif
2210
2211   stat = m->stat;
2212
2213 #if defined(RTS_SUPPORTS_THREADS)
2214   closeCondition(&m->wakeup);
2215 #endif
2216
2217   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2218                               m->tso->id));
2219   free(m);
2220
2221 #if defined(THREADED_RTS)
2222   if (blockWaiting) 
2223 #endif
2224     RELEASE_LOCK(&sched_mutex);
2225
2226   return stat;
2227 }
2228
2229 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2230 //@subsection Run queue code 
2231
2232 #if 0
2233 /* 
2234    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2235        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2236        implicit global variable that has to be correct when calling these
2237        fcts -- HWL 
2238 */
2239
2240 /* Put the new thread on the head of the runnable queue.
2241  * The caller of createThread better push an appropriate closure
2242  * on this thread's stack before the scheduler is invoked.
2243  */
2244 static /* inline */ void
2245 add_to_run_queue(tso)
2246 StgTSO* tso; 
2247 {
2248   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2249   tso->link = run_queue_hd;
2250   run_queue_hd = tso;
2251   if (run_queue_tl == END_TSO_QUEUE) {
2252     run_queue_tl = tso;
2253   }
2254 }
2255
2256 /* Put the new thread at the end of the runnable queue. */
2257 static /* inline */ void
2258 push_on_run_queue(tso)
2259 StgTSO* tso; 
2260 {
2261   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2262   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2263   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2264   if (run_queue_hd == END_TSO_QUEUE) {
2265     run_queue_hd = tso;
2266   } else {
2267     run_queue_tl->link = tso;
2268   }
2269   run_queue_tl = tso;
2270 }
2271
2272 /* 
2273    Should be inlined because it's used very often in schedule.  The tso
2274    argument is actually only needed in GranSim, where we want to have the
2275    possibility to schedule *any* TSO on the run queue, irrespective of the
2276    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2277    the run queue and dequeue the tso, adjusting the links in the queue. 
2278 */
2279 //@cindex take_off_run_queue
2280 static /* inline */ StgTSO*
2281 take_off_run_queue(StgTSO *tso) {
2282   StgTSO *t, *prev;
2283
2284   /* 
2285      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2286
2287      if tso is specified, unlink that tso from the run_queue (doesn't have
2288      to be at the beginning of the queue); GranSim only 
2289   */
2290   if (tso!=END_TSO_QUEUE) {
2291     /* find tso in queue */
2292     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2293          t!=END_TSO_QUEUE && t!=tso;
2294          prev=t, t=t->link) 
2295       /* nothing */ ;
2296     ASSERT(t==tso);
2297     /* now actually dequeue the tso */
2298     if (prev!=END_TSO_QUEUE) {
2299       ASSERT(run_queue_hd!=t);
2300       prev->link = t->link;
2301     } else {
2302       /* t is at beginning of thread queue */
2303       ASSERT(run_queue_hd==t);
2304       run_queue_hd = t->link;
2305     }
2306     /* t is at end of thread queue */
2307     if (t->link==END_TSO_QUEUE) {
2308       ASSERT(t==run_queue_tl);
2309       run_queue_tl = prev;
2310     } else {
2311       ASSERT(run_queue_tl!=t);
2312     }
2313     t->link = END_TSO_QUEUE;
2314   } else {
2315     /* take tso from the beginning of the queue; std concurrent code */
2316     t = run_queue_hd;
2317     if (t != END_TSO_QUEUE) {
2318       run_queue_hd = t->link;
2319       t->link = END_TSO_QUEUE;
2320       if (run_queue_hd == END_TSO_QUEUE) {
2321         run_queue_tl = END_TSO_QUEUE;
2322       }
2323     }
2324   }
2325   return t;
2326 }
2327
2328 #endif /* 0 */
2329
2330 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2331 //@subsection Garbage Collextion Routines
2332
2333 /* ---------------------------------------------------------------------------
2334    Where are the roots that we know about?
2335
2336         - all the threads on the runnable queue
2337         - all the threads on the blocked queue
2338         - all the threads on the sleeping queue
2339         - all the thread currently executing a _ccall_GC
2340         - all the "main threads"
2341      
2342    ------------------------------------------------------------------------ */
2343
2344 /* This has to be protected either by the scheduler monitor, or by the
2345         garbage collection monitor (probably the latter).
2346         KH @ 25/10/99
2347 */
2348
2349 void
2350 GetRoots(evac_fn evac)
2351 {
2352 #if defined(GRAN)
2353   {
2354     nat i;
2355     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2356       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2357           evac((StgClosure **)&run_queue_hds[i]);
2358       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2359           evac((StgClosure **)&run_queue_tls[i]);
2360       
2361       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2362           evac((StgClosure **)&blocked_queue_hds[i]);
2363       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2364           evac((StgClosure **)&blocked_queue_tls[i]);
2365       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2366           evac((StgClosure **)&ccalling_threads[i]);
2367     }
2368   }
2369
2370   markEventQueue();
2371
2372 #else /* !GRAN */
2373   if (run_queue_hd != END_TSO_QUEUE) {
2374       ASSERT(run_queue_tl != END_TSO_QUEUE);
2375       evac((StgClosure **)&run_queue_hd);
2376       evac((StgClosure **)&run_queue_tl);
2377   }
2378   
2379   if (blocked_queue_hd != END_TSO_QUEUE) {
2380       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2381       evac((StgClosure **)&blocked_queue_hd);
2382       evac((StgClosure **)&blocked_queue_tl);
2383   }
2384   
2385   if (sleeping_queue != END_TSO_QUEUE) {
2386       evac((StgClosure **)&sleeping_queue);
2387   }
2388 #endif 
2389
2390   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2391       evac((StgClosure **)&suspended_ccalling_threads);
2392   }
2393
2394 #if defined(PAR) || defined(GRAN)
2395   markSparkQueue(evac);
2396 #endif
2397 }
2398
2399 /* -----------------------------------------------------------------------------
2400    performGC
2401
2402    This is the interface to the garbage collector from Haskell land.
2403    We provide this so that external C code can allocate and garbage
2404    collect when called from Haskell via _ccall_GC.
2405
2406    It might be useful to provide an interface whereby the programmer
2407    can specify more roots (ToDo).
2408    
2409    This needs to be protected by the GC condition variable above.  KH.
2410    -------------------------------------------------------------------------- */
2411
2412 void (*extra_roots)(evac_fn);
2413
2414 void
2415 performGC(void)
2416 {
2417   /* Obligated to hold this lock upon entry */
2418   ACQUIRE_LOCK(&sched_mutex);
2419   GarbageCollect(GetRoots,rtsFalse);
2420   RELEASE_LOCK(&sched_mutex);
2421 }
2422
2423 void
2424 performMajorGC(void)
2425 {
2426   ACQUIRE_LOCK(&sched_mutex);
2427   GarbageCollect(GetRoots,rtsTrue);
2428   RELEASE_LOCK(&sched_mutex);
2429 }
2430
2431 static void
2432 AllRoots(evac_fn evac)
2433 {
2434     GetRoots(evac);             // the scheduler's roots
2435     extra_roots(evac);          // the user's roots
2436 }
2437
2438 void
2439 performGCWithRoots(void (*get_roots)(evac_fn))
2440 {
2441   ACQUIRE_LOCK(&sched_mutex);
2442   extra_roots = get_roots;
2443   GarbageCollect(AllRoots,rtsFalse);
2444   RELEASE_LOCK(&sched_mutex);
2445 }
2446
2447 /* -----------------------------------------------------------------------------
2448    Stack overflow
2449
2450    If the thread has reached its maximum stack size, then raise the
2451    StackOverflow exception in the offending thread.  Otherwise
2452    relocate the TSO into a larger chunk of memory and adjust its stack
2453    size appropriately.
2454    -------------------------------------------------------------------------- */
2455
2456 static StgTSO *
2457 threadStackOverflow(StgTSO *tso)
2458 {
2459   nat new_stack_size, new_tso_size, diff, stack_words;
2460   StgPtr new_sp;
2461   StgTSO *dest;
2462
2463   IF_DEBUG(sanity,checkTSO(tso));
2464   if (tso->stack_size >= tso->max_stack_size) {
2465
2466     IF_DEBUG(gc,
2467              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2468                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2469              /* If we're debugging, just print out the top of the stack */
2470              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2471                                               tso->sp+64)));
2472
2473     /* Send this thread the StackOverflow exception */
2474     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2475     return tso;
2476   }
2477
2478   /* Try to double the current stack size.  If that takes us over the
2479    * maximum stack size for this thread, then use the maximum instead.
2480    * Finally round up so the TSO ends up as a whole number of blocks.
2481    */
2482   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2483   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2484                                        TSO_STRUCT_SIZE)/sizeof(W_);
2485   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2486   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2487
2488   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2489
2490   dest = (StgTSO *)allocate(new_tso_size);
2491   TICK_ALLOC_TSO(new_stack_size,0);
2492
2493   /* copy the TSO block and the old stack into the new area */
2494   memcpy(dest,tso,TSO_STRUCT_SIZE);
2495   stack_words = tso->stack + tso->stack_size - tso->sp;
2496   new_sp = (P_)dest + new_tso_size - stack_words;
2497   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2498
2499   /* relocate the stack pointers... */
2500   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2501   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2502   dest->sp    = new_sp;
2503   dest->stack_size = new_stack_size;
2504         
2505   /* and relocate the update frame list */
2506   relocate_stack(dest, diff);
2507
2508   /* Mark the old TSO as relocated.  We have to check for relocated
2509    * TSOs in the garbage collector and any primops that deal with TSOs.
2510    *
2511    * It's important to set the sp and su values to just beyond the end
2512    * of the stack, so we don't attempt to scavenge any part of the
2513    * dead TSO's stack.
2514    */
2515   tso->what_next = ThreadRelocated;
2516   tso->link = dest;
2517   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2518   tso->su = (StgUpdateFrame *)tso->sp;
2519   tso->why_blocked = NotBlocked;
2520   dest->mut_link = NULL;
2521
2522   IF_PAR_DEBUG(verbose,
2523                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2524                      tso->id, tso, tso->stack_size);
2525                /* If we're debugging, just print out the top of the stack */
2526                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2527                                                 tso->sp+64)));
2528   
2529   IF_DEBUG(sanity,checkTSO(tso));
2530 #if 0
2531   IF_DEBUG(scheduler,printTSO(dest));
2532 #endif
2533
2534   return dest;
2535 }
2536
2537 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2538 //@subsection Blocking Queue Routines
2539
2540 /* ---------------------------------------------------------------------------
2541    Wake up a queue that was blocked on some resource.
2542    ------------------------------------------------------------------------ */
2543
2544 #if defined(GRAN)
2545 static inline void
2546 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2547 {
2548 }
2549 #elif defined(PAR)
2550 static inline void
2551 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2552 {
2553   /* write RESUME events to log file and
2554      update blocked and fetch time (depending on type of the orig closure) */
2555   if (RtsFlags.ParFlags.ParStats.Full) {
2556     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2557                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2558                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2559     if (EMPTY_RUN_QUEUE())
2560       emitSchedule = rtsTrue;
2561
2562     switch (get_itbl(node)->type) {
2563         case FETCH_ME_BQ:
2564           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2565           break;
2566         case RBH:
2567         case FETCH_ME:
2568         case BLACKHOLE_BQ:
2569           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2570           break;
2571 #ifdef DIST
2572         case MVAR:
2573           break;
2574 #endif    
2575         default:
2576           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2577         }
2578       }
2579 }
2580 #endif
2581
2582 #if defined(GRAN)
2583 static StgBlockingQueueElement *
2584 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2585 {
2586     StgTSO *tso;
2587     PEs node_loc, tso_loc;
2588
2589     node_loc = where_is(node); // should be lifted out of loop
2590     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2591     tso_loc = where_is((StgClosure *)tso);
2592     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2593       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2594       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2595       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2596       // insertThread(tso, node_loc);
2597       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2598                 ResumeThread,
2599                 tso, node, (rtsSpark*)NULL);
2600       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2601       // len_local++;
2602       // len++;
2603     } else { // TSO is remote (actually should be FMBQ)
2604       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2605                                   RtsFlags.GranFlags.Costs.gunblocktime +
2606                                   RtsFlags.GranFlags.Costs.latency;
2607       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2608                 UnblockThread,
2609                 tso, node, (rtsSpark*)NULL);
2610       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2611       // len++;
2612     }
2613     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2614     IF_GRAN_DEBUG(bq,
2615                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2616                           (node_loc==tso_loc ? "Local" : "Global"), 
2617                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2618     tso->block_info.closure = NULL;
2619     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2620                              tso->id, tso));
2621 }
2622 #elif defined(PAR)
2623 static StgBlockingQueueElement *
2624 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2625 {
2626     StgBlockingQueueElement *next;
2627
2628     switch (get_itbl(bqe)->type) {
2629     case TSO:
2630       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2631       /* if it's a TSO just push it onto the run_queue */
2632       next = bqe->link;
2633       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2634       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2635       THREAD_RUNNABLE();
2636       unblockCount(bqe, node);
2637       /* reset blocking status after dumping event */
2638       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2639       break;
2640
2641     case BLOCKED_FETCH:
2642       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2643       next = bqe->link;
2644       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2645       PendingFetches = (StgBlockedFetch *)bqe;
2646       break;
2647
2648 # if defined(DEBUG)
2649       /* can ignore this case in a non-debugging setup; 
2650          see comments on RBHSave closures above */
2651     case CONSTR:
2652       /* check that the closure is an RBHSave closure */
2653       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2654              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2655              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2656       break;
2657
2658     default:
2659       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2660            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2661            (StgClosure *)bqe);
2662 # endif
2663     }
2664   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2665   return next;
2666 }
2667
2668 #else /* !GRAN && !PAR */
2669 static StgTSO *
2670 unblockOneLocked(StgTSO *tso)
2671 {
2672   StgTSO *next;
2673
2674   ASSERT(get_itbl(tso)->type == TSO);
2675   ASSERT(tso->why_blocked != NotBlocked);
2676   tso->why_blocked = NotBlocked;
2677   next = tso->link;
2678   PUSH_ON_RUN_QUEUE(tso);
2679   THREAD_RUNNABLE();
2680   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2681   return next;
2682 }
2683 #endif
2684
2685 #if defined(GRAN) || defined(PAR)
2686 inline StgBlockingQueueElement *
2687 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2688 {
2689   ACQUIRE_LOCK(&sched_mutex);
2690   bqe = unblockOneLocked(bqe, node);
2691   RELEASE_LOCK(&sched_mutex);
2692   return bqe;
2693 }
2694 #else
2695 inline StgTSO *
2696 unblockOne(StgTSO *tso)
2697 {
2698   ACQUIRE_LOCK(&sched_mutex);
2699   tso = unblockOneLocked(tso);
2700   RELEASE_LOCK(&sched_mutex);
2701   return tso;
2702 }
2703 #endif
2704
2705 #if defined(GRAN)
2706 void 
2707 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2708 {
2709   StgBlockingQueueElement *bqe;
2710   PEs node_loc;
2711   nat len = 0; 
2712
2713   IF_GRAN_DEBUG(bq, 
2714                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2715                       node, CurrentProc, CurrentTime[CurrentProc], 
2716                       CurrentTSO->id, CurrentTSO));
2717
2718   node_loc = where_is(node);
2719
2720   ASSERT(q == END_BQ_QUEUE ||
2721          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2722          get_itbl(q)->type == CONSTR); // closure (type constructor)
2723   ASSERT(is_unique(node));
2724
2725   /* FAKE FETCH: magically copy the node to the tso's proc;
2726      no Fetch necessary because in reality the node should not have been 
2727      moved to the other PE in the first place
2728   */
2729   if (CurrentProc!=node_loc) {
2730     IF_GRAN_DEBUG(bq, 
2731                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2732                         node, node_loc, CurrentProc, CurrentTSO->id, 
2733                         // CurrentTSO, where_is(CurrentTSO),
2734                         node->header.gran.procs));
2735     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2736     IF_GRAN_DEBUG(bq, 
2737                   belch("## new bitmask of node %p is %#x",
2738                         node, node->header.gran.procs));
2739     if (RtsFlags.GranFlags.GranSimStats.Global) {
2740       globalGranStats.tot_fake_fetches++;
2741     }
2742   }
2743
2744   bqe = q;
2745   // ToDo: check: ASSERT(CurrentProc==node_loc);
2746   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2747     //next = bqe->link;
2748     /* 
2749        bqe points to the current element in the queue
2750        next points to the next element in the queue
2751     */
2752     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2753     //tso_loc = where_is(tso);
2754     len++;
2755     bqe = unblockOneLocked(bqe, node);
2756   }
2757
2758   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2759      the closure to make room for the anchor of the BQ */
2760   if (bqe!=END_BQ_QUEUE) {
2761     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2762     /*
2763     ASSERT((info_ptr==&RBH_Save_0_info) ||
2764            (info_ptr==&RBH_Save_1_info) ||
2765            (info_ptr==&RBH_Save_2_info));
2766     */
2767     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2768     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2769     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2770
2771     IF_GRAN_DEBUG(bq,
2772                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2773                         node, info_type(node)));
2774   }
2775
2776   /* statistics gathering */
2777   if (RtsFlags.GranFlags.GranSimStats.Global) {
2778     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2779     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2780     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2781     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2782   }
2783   IF_GRAN_DEBUG(bq,
2784                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2785                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2786 }
2787 #elif defined(PAR)
2788 void 
2789 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2790 {
2791   StgBlockingQueueElement *bqe;
2792
2793   ACQUIRE_LOCK(&sched_mutex);
2794
2795   IF_PAR_DEBUG(verbose, 
2796                belch("##-_ AwBQ for node %p on [%x]: ",
2797                      node, mytid));
2798 #ifdef DIST  
2799   //RFP
2800   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2801     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2802     return;
2803   }
2804 #endif
2805   
2806   ASSERT(q == END_BQ_QUEUE ||
2807          get_itbl(q)->type == TSO ||           
2808          get_itbl(q)->type == BLOCKED_FETCH || 
2809          get_itbl(q)->type == CONSTR); 
2810
2811   bqe = q;
2812   while (get_itbl(bqe)->type==TSO || 
2813          get_itbl(bqe)->type==BLOCKED_FETCH) {
2814     bqe = unblockOneLocked(bqe, node);
2815   }
2816   RELEASE_LOCK(&sched_mutex);
2817 }
2818
2819 #else   /* !GRAN && !PAR */
2820 void
2821 awakenBlockedQueue(StgTSO *tso)
2822 {
2823   ACQUIRE_LOCK(&sched_mutex);
2824   while (tso != END_TSO_QUEUE) {
2825     tso = unblockOneLocked(tso);
2826   }
2827   RELEASE_LOCK(&sched_mutex);
2828 }
2829 #endif
2830
2831 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2832 //@subsection Exception Handling Routines
2833
2834 /* ---------------------------------------------------------------------------
2835    Interrupt execution
2836    - usually called inside a signal handler so it mustn't do anything fancy.   
2837    ------------------------------------------------------------------------ */
2838
2839 void
2840 interruptStgRts(void)
2841 {
2842     interrupted    = 1;
2843     context_switch = 1;
2844 }
2845
2846 /* -----------------------------------------------------------------------------
2847    Unblock a thread
2848
2849    This is for use when we raise an exception in another thread, which
2850    may be blocked.
2851    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2852    -------------------------------------------------------------------------- */
2853
2854 #if defined(GRAN) || defined(PAR)
2855 /*
2856   NB: only the type of the blocking queue is different in GranSim and GUM
2857       the operations on the queue-elements are the same
2858       long live polymorphism!
2859
2860   Locks: sched_mutex is held upon entry and exit.
2861
2862 */
2863 static void
2864 unblockThread(StgTSO *tso)
2865 {
2866   StgBlockingQueueElement *t, **last;
2867
2868   switch (tso->why_blocked) {
2869
2870   case NotBlocked:
2871     return;  /* not blocked */
2872
2873   case BlockedOnMVar:
2874     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2875     {
2876       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2877       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2878
2879       last = (StgBlockingQueueElement **)&mvar->head;
2880       for (t = (StgBlockingQueueElement *)mvar->head; 
2881            t != END_BQ_QUEUE; 
2882            last = &t->link, last_tso = t, t = t->link) {
2883         if (t == (StgBlockingQueueElement *)tso) {
2884           *last = (StgBlockingQueueElement *)tso->link;
2885           if (mvar->tail == tso) {
2886             mvar->tail = (StgTSO *)last_tso;
2887           }
2888           goto done;
2889         }
2890       }
2891       barf("unblockThread (MVAR): TSO not found");
2892     }
2893
2894   case BlockedOnBlackHole:
2895     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2896     {
2897       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2898
2899       last = &bq->blocking_queue;
2900       for (t = bq->blocking_queue; 
2901            t != END_BQ_QUEUE; 
2902            last = &t->link, t = t->link) {
2903         if (t == (StgBlockingQueueElement *)tso) {
2904           *last = (StgBlockingQueueElement *)tso->link;
2905           goto done;
2906         }
2907       }
2908       barf("unblockThread (BLACKHOLE): TSO not found");
2909     }
2910
2911   case BlockedOnException:
2912     {
2913       StgTSO *target  = tso->block_info.tso;
2914
2915       ASSERT(get_itbl(target)->type == TSO);
2916
2917       if (target->what_next == ThreadRelocated) {
2918           target = target->link;
2919           ASSERT(get_itbl(target)->type == TSO);
2920       }
2921
2922       ASSERT(target->blocked_exceptions != NULL);
2923
2924       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2925       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2926            t != END_BQ_QUEUE; 
2927            last = &t->link, t = t->link) {
2928         ASSERT(get_itbl(t)->type == TSO);
2929         if (t == (StgBlockingQueueElement *)tso) {
2930           *last = (StgBlockingQueueElement *)tso->link;
2931           goto done;
2932         }
2933       }
2934       barf("unblockThread (Exception): TSO not found");
2935     }
2936
2937   case BlockedOnRead:
2938   case BlockedOnWrite:
2939     {
2940       /* take TSO off blocked_queue */
2941       StgBlockingQueueElement *prev = NULL;
2942       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2943            prev = t, t = t->link) {
2944         if (t == (StgBlockingQueueElement *)tso) {
2945           if (prev == NULL) {
2946             blocked_queue_hd = (StgTSO *)t->link;
2947             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2948               blocked_queue_tl = END_TSO_QUEUE;
2949             }
2950           } else {
2951             prev->link = t->link;
2952             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2953               blocked_queue_tl = (StgTSO *)prev;
2954             }
2955           }
2956           goto done;
2957         }
2958       }
2959       barf("unblockThread (I/O): TSO not found");
2960     }
2961
2962   case BlockedOnDelay:
2963     {
2964       /* take TSO off sleeping_queue */
2965       StgBlockingQueueElement *prev = NULL;
2966       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2967            prev = t, t = t->link) {
2968         if (t == (StgBlockingQueueElement *)tso) {
2969           if (prev == NULL) {
2970             sleeping_queue = (StgTSO *)t->link;
2971           } else {
2972             prev->link = t->link;
2973           }
2974           goto done;
2975         }
2976       }
2977       barf("unblockThread (I/O): TSO not found");
2978     }
2979
2980   default:
2981     barf("unblockThread");
2982   }
2983
2984  done:
2985   tso->link = END_TSO_QUEUE;
2986   tso->why_blocked = NotBlocked;
2987   tso->block_info.closure = NULL;
2988   PUSH_ON_RUN_QUEUE(tso);
2989 }
2990 #else
2991 static void
2992 unblockThread(StgTSO *tso)
2993 {
2994   StgTSO *t, **last;
2995   
2996   /* To avoid locking unnecessarily. */
2997   if (tso->why_blocked == NotBlocked) {
2998     return;
2999   }
3000
3001   switch (tso->why_blocked) {
3002
3003   case BlockedOnMVar:
3004     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3005     {
3006       StgTSO *last_tso = END_TSO_QUEUE;
3007       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3008
3009       last = &mvar->head;
3010       for (t = mvar->head; t != END_TSO_QUEUE; 
3011            last = &t->link, last_tso = t, t = t->link) {
3012         if (t == tso) {
3013           *last = tso->link;
3014           if (mvar->tail == tso) {
3015             mvar->tail = last_tso;
3016           }
3017           goto done;
3018         }
3019       }
3020       barf("unblockThread (MVAR): TSO not found");
3021     }
3022
3023   case BlockedOnBlackHole:
3024     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3025     {
3026       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3027
3028       last = &bq->blocking_queue;
3029       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3030            last = &t->link, t = t->link) {
3031         if (t == tso) {
3032           *last = tso->link;
3033           goto done;
3034         }
3035       }
3036       barf("unblockThread (BLACKHOLE): TSO not found");
3037     }
3038
3039   case BlockedOnException:
3040     {
3041       StgTSO *target  = tso->block_info.tso;
3042
3043       ASSERT(get_itbl(target)->type == TSO);
3044
3045       while (target->what_next == ThreadRelocated) {
3046           target = target->link;
3047           ASSERT(get_itbl(target)->type == TSO);
3048       }
3049       
3050       ASSERT(target->blocked_exceptions != NULL);
3051
3052       last = &target->blocked_exceptions;
3053       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3054            last = &t->link, t = t->link) {
3055         ASSERT(get_itbl(t)->type == TSO);
3056         if (t == tso) {
3057           *last = tso->link;
3058           goto done;
3059         }
3060       }
3061       barf("unblockThread (Exception): TSO not found");
3062     }
3063
3064   case BlockedOnRead:
3065   case BlockedOnWrite:
3066     {
3067       StgTSO *prev = NULL;
3068       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3069            prev = t, t = t->link) {
3070         if (t == tso) {
3071           if (prev == NULL) {
3072             blocked_queue_hd = t->link;
3073             if (blocked_queue_tl == t) {
3074               blocked_queue_tl = END_TSO_QUEUE;
3075             }
3076           } else {
3077             prev->link = t->link;
3078             if (blocked_queue_tl == t) {
3079               blocked_queue_tl = prev;
3080             }
3081           }
3082           goto done;
3083         }
3084       }
3085       barf("unblockThread (I/O): TSO not found");
3086     }
3087
3088   case BlockedOnDelay:
3089     {
3090       StgTSO *prev = NULL;
3091       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3092            prev = t, t = t->link) {
3093         if (t == tso) {
3094           if (prev == NULL) {
3095             sleeping_queue = t->link;
3096           } else {
3097             prev->link = t->link;
3098           }
3099           goto done;
3100         }
3101       }
3102       barf("unblockThread (I/O): TSO not found");
3103     }
3104
3105   default:
3106     barf("unblockThread");
3107   }
3108
3109  done:
3110   tso->link = END_TSO_QUEUE;
3111   tso->why_blocked = NotBlocked;
3112   tso->block_info.closure = NULL;
3113   PUSH_ON_RUN_QUEUE(tso);
3114 }
3115 #endif
3116
3117 /* -----------------------------------------------------------------------------
3118  * raiseAsync()
3119  *
3120  * The following function implements the magic for raising an
3121  * asynchronous exception in an existing thread.
3122  *
3123  * We first remove the thread from any queue on which it might be
3124  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3125  *
3126  * We strip the stack down to the innermost CATCH_FRAME, building
3127  * thunks in the heap for all the active computations, so they can 
3128  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3129  * an application of the handler to the exception, and push it on
3130  * the top of the stack.
3131  * 
3132  * How exactly do we save all the active computations?  We create an
3133  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3134  * AP_UPDs pushes everything from the corresponding update frame
3135  * upwards onto the stack.  (Actually, it pushes everything up to the
3136  * next update frame plus a pointer to the next AP_UPD object.
3137  * Entering the next AP_UPD object pushes more onto the stack until we
3138  * reach the last AP_UPD object - at which point the stack should look
3139  * exactly as it did when we killed the TSO and we can continue
3140  * execution by entering the closure on top of the stack.
3141  *
3142  * We can also kill a thread entirely - this happens if either (a) the 
3143  * exception passed to raiseAsync is NULL, or (b) there's no
3144  * CATCH_FRAME on the stack.  In either case, we strip the entire
3145  * stack and replace the thread with a zombie.
3146  *
3147  * Locks: sched_mutex held upon entry nor exit.
3148  *
3149  * -------------------------------------------------------------------------- */
3150  
3151 void 
3152 deleteThread(StgTSO *tso)
3153 {
3154   raiseAsync(tso,NULL);
3155 }
3156
3157 void
3158 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3159 {
3160   /* When raising async exs from contexts where sched_mutex isn't held;
3161      use raiseAsyncWithLock(). */
3162   ACQUIRE_LOCK(&sched_mutex);
3163   raiseAsync(tso,exception);
3164   RELEASE_LOCK(&sched_mutex);
3165 }
3166
3167 void
3168 raiseAsync(StgTSO *tso, StgClosure *exception)
3169 {
3170   StgUpdateFrame* su = tso->su;
3171   StgPtr          sp = tso->sp;
3172   
3173   /* Thread already dead? */
3174   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3175     return;
3176   }
3177
3178   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3179
3180   /* Remove it from any blocking queues */
3181   unblockThread(tso);
3182
3183   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3184   /* The stack freezing code assumes there's a closure pointer on
3185    * the top of the stack.  This isn't always the case with compiled
3186    * code, so we have to push a dummy closure on the top which just
3187    * returns to the next return address on the stack.
3188    */
3189   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3190     *(--sp) = (W_)&stg_dummy_ret_closure;
3191   }
3192
3193   while (1) {
3194     nat words = ((P_)su - (P_)sp) - 1;
3195     nat i;
3196     StgAP_UPD * ap;
3197
3198     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3199      * then build the THUNK raise(exception), and leave it on
3200      * top of the CATCH_FRAME ready to enter.
3201      */
3202     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3203 #ifdef PROFILING
3204       StgCatchFrame *cf = (StgCatchFrame *)su;
3205 #endif
3206       StgClosure *raise;
3207
3208       /* we've got an exception to raise, so let's pass it to the
3209        * handler in this frame.
3210        */
3211       raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3212       TICK_ALLOC_SE_THK(1,0);
3213       SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3214       raise->payload[0] = exception;
3215
3216       /* throw away the stack from Sp up to the CATCH_FRAME.
3217        */
3218       sp = (P_)su - 1;
3219
3220       /* Ensure that async excpetions are blocked now, so we don't get
3221        * a surprise exception before we get around to executing the
3222        * handler.
3223        */
3224       if (tso->blocked_exceptions == NULL) {
3225           tso->blocked_exceptions = END_TSO_QUEUE;
3226       }
3227
3228       /* Put the newly-built THUNK on top of the stack, ready to execute
3229        * when the thread restarts.
3230        */
3231       sp[0] = (W_)raise;
3232       tso->sp = sp;
3233       tso->su = su;
3234       tso->what_next = ThreadEnterGHC;
3235       IF_DEBUG(sanity, checkTSO(tso));
3236       return;
3237     }
3238
3239     /* First build an AP_UPD consisting of the stack chunk above the
3240      * current update frame, with the top word on the stack as the
3241      * fun field.
3242      */
3243     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3244     
3245     ASSERT(words >= 0);
3246     
3247     ap->n_args = words;
3248     ap->fun    = (StgClosure *)sp[0];
3249     sp++;
3250     for(i=0; i < (nat)words; ++i) {
3251       ap->payload[i] = (StgClosure *)*sp++;
3252     }
3253     
3254     switch (get_itbl(su)->type) {
3255       
3256     case UPDATE_FRAME:
3257       {
3258         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3259         TICK_ALLOC_UP_THK(words+1,0);
3260         
3261         IF_DEBUG(scheduler,
3262                  fprintf(stderr,  "scheduler: Updating ");
3263                  printPtr((P_)su->updatee); 
3264                  fprintf(stderr,  " with ");
3265                  printObj((StgClosure *)ap);
3266                  );
3267         
3268         /* Replace the updatee with an indirection - happily
3269          * this will also wake up any threads currently
3270          * waiting on the result.
3271          *
3272          * Warning: if we're in a loop, more than one update frame on
3273          * the stack may point to the same object.  Be careful not to
3274          * overwrite an IND_OLDGEN in this case, because we'll screw
3275          * up the mutable lists.  To be on the safe side, don't
3276          * overwrite any kind of indirection at all.  See also
3277          * threadSqueezeStack in GC.c, where we have to make a similar
3278          * check.
3279          */
3280         if (!closure_IND(su->updatee)) {
3281             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3282         }
3283         su = su->link;
3284         sp += sizeofW(StgUpdateFrame) -1;
3285         sp[0] = (W_)ap; /* push onto stack */
3286         break;
3287       }
3288
3289     case CATCH_FRAME:
3290       {
3291         StgCatchFrame *cf = (StgCatchFrame *)su;
3292         StgClosure* o;
3293         
3294         /* We want a PAP, not an AP_UPD.  Fortunately, the
3295          * layout's the same.
3296          */
3297         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3298         TICK_ALLOC_UPD_PAP(words+1,0);
3299         
3300         /* now build o = FUN(catch,ap,handler) */
3301         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3302         TICK_ALLOC_FUN(2,0);
3303         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3304         o->payload[0] = (StgClosure *)ap;
3305         o->payload[1] = cf->handler;
3306         
3307         IF_DEBUG(scheduler,
3308                  fprintf(stderr,  "scheduler: Built ");
3309                  printObj((StgClosure *)o);
3310                  );
3311         
3312         /* pop the old handler and put o on the stack */
3313         su = cf->link;
3314         sp += sizeofW(StgCatchFrame) - 1;
3315         sp[0] = (W_)o;
3316         break;
3317       }
3318       
3319     case SEQ_FRAME:
3320       {
3321         StgSeqFrame *sf = (StgSeqFrame *)su;
3322         StgClosure* o;
3323         
3324         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3325         TICK_ALLOC_UPD_PAP(words+1,0);
3326         
3327         /* now build o = FUN(seq,ap) */
3328         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3329         TICK_ALLOC_SE_THK(1,0);
3330         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3331         o->payload[0] = (StgClosure *)ap;
3332         
3333         IF_DEBUG(scheduler,
3334                  fprintf(stderr,  "scheduler: Built ");
3335                  printObj((StgClosure *)o);
3336                  );
3337         
3338         /* pop the old handler and put o on the stack */
3339         su = sf->link;
3340         sp += sizeofW(StgSeqFrame) - 1;
3341         sp[0] = (W_)o;
3342         break;
3343       }
3344       
3345     case STOP_FRAME:
3346       /* We've stripped the entire stack, the thread is now dead. */
3347       sp += sizeofW(StgStopFrame) - 1;
3348       sp[0] = (W_)exception;    /* save the exception */
3349       tso->what_next = ThreadKilled;
3350       tso->su = (StgUpdateFrame *)(sp+1);
3351       tso->sp = sp;
3352       return;
3353
3354     default:
3355       barf("raiseAsync");
3356     }
3357   }
3358   barf("raiseAsync");
3359 }
3360
3361 /* -----------------------------------------------------------------------------
3362    resurrectThreads is called after garbage collection on the list of
3363    threads found to be garbage.  Each of these threads will be woken
3364    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3365    on an MVar, or NonTermination if the thread was blocked on a Black
3366    Hole.
3367
3368    Locks: sched_mutex isn't held upon entry nor exit.
3369    -------------------------------------------------------------------------- */
3370
3371 void
3372 resurrectThreads( StgTSO *threads )
3373 {
3374   StgTSO *tso, *next;
3375
3376   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3377     next = tso->global_link;
3378     tso->global_link = all_threads;
3379     all_threads = tso;
3380     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3381
3382     switch (tso->why_blocked) {
3383     case BlockedOnMVar:
3384     case BlockedOnException:
3385       /* Called by GC - sched_mutex lock is currently held. */
3386       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3387       break;
3388     case BlockedOnBlackHole:
3389       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3390       break;
3391     case NotBlocked:
3392       /* This might happen if the thread was blocked on a black hole
3393        * belonging to a thread that we've just woken up (raiseAsync
3394        * can wake up threads, remember...).
3395        */
3396       continue;
3397     default:
3398       barf("resurrectThreads: thread blocked in a strange way");
3399     }
3400   }
3401 }
3402
3403 /* -----------------------------------------------------------------------------
3404  * Blackhole detection: if we reach a deadlock, test whether any
3405  * threads are blocked on themselves.  Any threads which are found to
3406  * be self-blocked get sent a NonTermination exception.
3407  *
3408  * This is only done in a deadlock situation in order to avoid
3409  * performance overhead in the normal case.
3410  *
3411  * Locks: sched_mutex is held upon entry and exit.
3412  * -------------------------------------------------------------------------- */
3413
3414 static void
3415 detectBlackHoles( void )
3416 {
3417     StgTSO *t = all_threads;
3418     StgUpdateFrame *frame;
3419     StgClosure *blocked_on;
3420
3421     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3422
3423         while (t->what_next == ThreadRelocated) {
3424             t = t->link;
3425             ASSERT(get_itbl(t)->type == TSO);
3426         }
3427       
3428         if (t->why_blocked != BlockedOnBlackHole) {
3429             continue;
3430         }
3431
3432         blocked_on = t->block_info.closure;
3433
3434         for (frame = t->su; ; frame = frame->link) {
3435             switch (get_itbl(frame)->type) {
3436
3437             case UPDATE_FRAME:
3438                 if (frame->updatee == blocked_on) {
3439                     /* We are blocking on one of our own computations, so
3440                      * send this thread the NonTermination exception.  
3441                      */
3442                     IF_DEBUG(scheduler, 
3443                              sched_belch("thread %d is blocked on itself", t->id));
3444                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3445                     goto done;
3446                 }
3447                 else {
3448                     continue;
3449                 }
3450
3451             case CATCH_FRAME:
3452             case SEQ_FRAME:
3453                 continue;
3454                 
3455             case STOP_FRAME:
3456                 break;
3457             }
3458             break;
3459         }
3460
3461     done: ;
3462     }   
3463 }
3464
3465 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3466 //@subsection Debugging Routines
3467
3468 /* -----------------------------------------------------------------------------
3469    Debugging: why is a thread blocked
3470    -------------------------------------------------------------------------- */
3471
3472 #ifdef DEBUG
3473
3474 void
3475 printThreadBlockage(StgTSO *tso)
3476 {
3477   switch (tso->why_blocked) {
3478   case BlockedOnRead:
3479     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3480     break;
3481   case BlockedOnWrite:
3482     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3483     break;
3484   case BlockedOnDelay:
3485     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3486     break;
3487   case BlockedOnMVar:
3488     fprintf(stderr,"is blocked on an MVar");
3489     break;
3490   case BlockedOnException:
3491     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3492             tso->block_info.tso->id);
3493     break;
3494   case BlockedOnBlackHole:
3495     fprintf(stderr,"is blocked on a black hole");
3496     break;
3497   case NotBlocked:
3498     fprintf(stderr,"is not blocked");
3499     break;
3500 #if defined(PAR)
3501   case BlockedOnGA:
3502     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3503             tso->block_info.closure, info_type(tso->block_info.closure));
3504     break;
3505   case BlockedOnGA_NoSend:
3506     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3507             tso->block_info.closure, info_type(tso->block_info.closure));
3508     break;
3509 #endif
3510 #if defined(RTS_SUPPORTS_THREADS)
3511   case BlockedOnCCall:
3512     fprintf(stderr,"is blocked on an external call");
3513     break;
3514 #endif
3515   default:
3516     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3517          tso->why_blocked, tso->id, tso);
3518   }
3519 }
3520
3521 void
3522 printThreadStatus(StgTSO *tso)
3523 {
3524   switch (tso->what_next) {
3525   case ThreadKilled:
3526     fprintf(stderr,"has been killed");
3527     break;
3528   case ThreadComplete:
3529     fprintf(stderr,"has completed");
3530     break;
3531   default:
3532     printThreadBlockage(tso);
3533   }
3534 }
3535
3536 void
3537 printAllThreads(void)
3538 {
3539   StgTSO *t;
3540
3541 # if defined(GRAN)
3542   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3543   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3544                        time_string, rtsFalse/*no commas!*/);
3545
3546   sched_belch("all threads at [%s]:", time_string);
3547 # elif defined(PAR)
3548   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3549   ullong_format_string(CURRENT_TIME,
3550                        time_string, rtsFalse/*no commas!*/);
3551
3552   sched_belch("all threads at [%s]:", time_string);
3553 # else
3554   sched_belch("all threads:");
3555 # endif
3556
3557   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3558     fprintf(stderr, "\tthread %d ", t->id);
3559     if (t->label) fprintf(stderr,"[\"%s\"] ",t->label);
3560     printThreadStatus(t);
3561     fprintf(stderr,"\n");
3562   }
3563 }
3564     
3565 /* 
3566    Print a whole blocking queue attached to node (debugging only).
3567 */
3568 //@cindex print_bq
3569 # if defined(PAR)
3570 void 
3571 print_bq (StgClosure *node)
3572 {
3573   StgBlockingQueueElement *bqe;
3574   StgTSO *tso;
3575   rtsBool end;
3576
3577   fprintf(stderr,"## BQ of closure %p (%s): ",
3578           node, info_type(node));
3579
3580   /* should cover all closures that may have a blocking queue */
3581   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3582          get_itbl(node)->type == FETCH_ME_BQ ||
3583          get_itbl(node)->type == RBH ||
3584          get_itbl(node)->type == MVAR);
3585     
3586   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3587
3588   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3589 }
3590
3591 /* 
3592    Print a whole blocking queue starting with the element bqe.
3593 */
3594 void 
3595 print_bqe (StgBlockingQueueElement *bqe)
3596 {
3597   rtsBool end;
3598
3599   /* 
3600      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3601   */
3602   for (end = (bqe==END_BQ_QUEUE);
3603        !end; // iterate until bqe points to a CONSTR
3604        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3605        bqe = end ? END_BQ_QUEUE : bqe->link) {
3606     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3607     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3608     /* types of closures that may appear in a blocking queue */
3609     ASSERT(get_itbl(bqe)->type == TSO ||           
3610            get_itbl(bqe)->type == BLOCKED_FETCH || 
3611            get_itbl(bqe)->type == CONSTR); 
3612     /* only BQs of an RBH end with an RBH_Save closure */
3613     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3614
3615     switch (get_itbl(bqe)->type) {
3616     case TSO:
3617       fprintf(stderr," TSO %u (%x),",
3618               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3619       break;
3620     case BLOCKED_FETCH:
3621       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3622               ((StgBlockedFetch *)bqe)->node, 
3623               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3624               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3625               ((StgBlockedFetch *)bqe)->ga.weight);
3626       break;
3627     case CONSTR:
3628       fprintf(stderr," %s (IP %p),",
3629               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3630                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3631                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3632                "RBH_Save_?"), get_itbl(bqe));
3633       break;
3634     default:
3635       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3636            info_type((StgClosure *)bqe)); // , node, info_type(node));
3637       break;
3638     }
3639   } /* for */
3640   fputc('\n', stderr);
3641 }
3642 # elif defined(GRAN)
3643 void 
3644 print_bq (StgClosure *node)
3645 {
3646   StgBlockingQueueElement *bqe;
3647   PEs node_loc, tso_loc;
3648   rtsBool end;
3649
3650   /* should cover all closures that may have a blocking queue */
3651   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3652          get_itbl(node)->type == FETCH_ME_BQ ||
3653          get_itbl(node)->type == RBH);
3654     
3655   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3656   node_loc = where_is(node);
3657
3658   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3659           node, info_type(node), node_loc);
3660
3661   /* 
3662      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3663   */
3664   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3665        !end; // iterate until bqe points to a CONSTR
3666        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3667     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3668     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3669     /* types of closures that may appear in a blocking queue */
3670     ASSERT(get_itbl(bqe)->type == TSO ||           
3671            get_itbl(bqe)->type == CONSTR); 
3672     /* only BQs of an RBH end with an RBH_Save closure */
3673     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3674
3675     tso_loc = where_is((StgClosure *)bqe);
3676     switch (get_itbl(bqe)->type) {
3677     case TSO:
3678       fprintf(stderr," TSO %d (%p) on [PE %d],",
3679               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3680       break;
3681     case CONSTR:
3682       fprintf(stderr," %s (IP %p),",
3683               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3684                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3685                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3686                "RBH_Save_?"), get_itbl(bqe));
3687       break;
3688     default:
3689       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3690            info_type((StgClosure *)bqe), node, info_type(node));
3691       break;
3692     }
3693   } /* for */
3694   fputc('\n', stderr);
3695 }
3696 #else
3697 /* 
3698    Nice and easy: only TSOs on the blocking queue
3699 */
3700 void 
3701 print_bq (StgClosure *node)
3702 {
3703   StgTSO *tso;
3704
3705   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3706   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3707        tso != END_TSO_QUEUE; 
3708        tso=tso->link) {
3709     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3710     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3711     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3712   }
3713   fputc('\n', stderr);
3714 }
3715 # endif
3716
3717 #if defined(PAR)
3718 static nat
3719 run_queue_len(void)
3720 {
3721   nat i;
3722   StgTSO *tso;
3723
3724   for (i=0, tso=run_queue_hd; 
3725        tso != END_TSO_QUEUE;
3726        i++, tso=tso->link)
3727     /* nothing */
3728
3729   return i;
3730 }
3731 #endif
3732
3733 static void
3734 sched_belch(char *s, ...)
3735 {
3736   va_list ap;
3737   va_start(ap,s);
3738 #ifdef SMP
3739   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3740 #elif defined(PAR)
3741   fprintf(stderr, "== ");
3742 #else
3743   fprintf(stderr, "scheduler: ");
3744 #endif
3745   vfprintf(stderr, s, ap);
3746   fprintf(stderr, "\n");
3747 }
3748
3749 #endif /* DEBUG */
3750
3751
3752 //@node Index,  , Debugging Routines, Main scheduling code
3753 //@subsection Index
3754
3755 //@index
3756 //* StgMainThread::  @cindex\s-+StgMainThread
3757 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3758 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3759 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3760 //* context_switch::  @cindex\s-+context_switch
3761 //* createThread::  @cindex\s-+createThread
3762 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3763 //* initScheduler::  @cindex\s-+initScheduler
3764 //* interrupted::  @cindex\s-+interrupted
3765 //* next_thread_id::  @cindex\s-+next_thread_id
3766 //* print_bq::  @cindex\s-+print_bq
3767 //* run_queue_hd::  @cindex\s-+run_queue_hd
3768 //* run_queue_tl::  @cindex\s-+run_queue_tl
3769 //* sched_mutex::  @cindex\s-+sched_mutex
3770 //* schedule::  @cindex\s-+schedule
3771 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3772 //* term_mutex::  @cindex\s-+term_mutex
3773 //@end index