[project @ 2002-05-11 00:16:11 by sof]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.142 2002/05/11 00:16:11 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #ifdef HAVE_SYS_TYPES_H
118 #include <sys/types.h>
119 #endif
120 #ifdef HAVE_UNISTD_H
121 #include <unistd.h>
122 #endif
123
124 #include <stdarg.h>
125
126 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
127 //@subsection Variables and Data structures
128
129 /* Main thread queue.
130  * Locks required: sched_mutex.
131  */
132 StgMainThread *main_threads;
133
134 /* Thread queues.
135  * Locks required: sched_mutex.
136  */
137 #if defined(GRAN)
138
139 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
140 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
141
142 /* 
143    In GranSim we have a runnable and a blocked queue for each processor.
144    In order to minimise code changes new arrays run_queue_hds/tls
145    are created. run_queue_hd is then a short cut (macro) for
146    run_queue_hds[CurrentProc] (see GranSim.h).
147    -- HWL
148 */
149 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
150 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
151 StgTSO *ccalling_threadss[MAX_PROC];
152 /* We use the same global list of threads (all_threads) in GranSim as in
153    the std RTS (i.e. we are cheating). However, we don't use this list in
154    the GranSim specific code at the moment (so we are only potentially
155    cheating).  */
156
157 #else /* !GRAN */
158
159 StgTSO *run_queue_hd, *run_queue_tl;
160 StgTSO *blocked_queue_hd, *blocked_queue_tl;
161 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
162
163 #endif
164
165 /* Linked list of all threads.
166  * Used for detecting garbage collected threads.
167  */
168 StgTSO *all_threads;
169
170 /* When a thread performs a safe C call (_ccall_GC, using old
171  * terminology), it gets put on the suspended_ccalling_threads
172  * list. Used by the garbage collector.
173  */
174 static StgTSO *suspended_ccalling_threads;
175
176 static StgTSO *threadStackOverflow(StgTSO *tso);
177
178 /* KH: The following two flags are shared memory locations.  There is no need
179        to lock them, since they are only unset at the end of a scheduler
180        operation.
181 */
182
183 /* flag set by signal handler to precipitate a context switch */
184 //@cindex context_switch
185 nat context_switch;
186
187 /* if this flag is set as well, give up execution */
188 //@cindex interrupted
189 rtsBool interrupted;
190
191 /* Next thread ID to allocate.
192  * Locks required: sched_mutex
193  */
194 //@cindex next_thread_id
195 StgThreadID next_thread_id = 1;
196
197 /*
198  * Pointers to the state of the current thread.
199  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
200  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
201  */
202  
203 /* The smallest stack size that makes any sense is:
204  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
205  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
206  *  + 1                       (the realworld token for an IO thread)
207  *  + 1                       (the closure to enter)
208  *
209  * A thread with this stack will bomb immediately with a stack
210  * overflow, which will increase its stack size.  
211  */
212
213 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
214
215
216 #if defined(GRAN)
217 StgTSO *CurrentTSO;
218 #endif
219
220 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
221  *  exists - earlier gccs apparently didn't.
222  *  -= chak
223  */
224 StgTSO dummy_tso;
225
226 rtsBool ready_to_gc;
227 rtsBool shutting_down_scheduler = rtsFalse;
228
229 void            addToBlockedQueue ( StgTSO *tso );
230
231 static void     schedule          ( void );
232        void     interruptStgRts   ( void );
233 #if defined(GRAN)
234 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
235 #else
236 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
237 #endif
238
239 static void     detectBlackHoles  ( void );
240
241 #ifdef DEBUG
242 static void sched_belch(char *s, ...);
243 #endif
244
245 #if defined(RTS_SUPPORTS_THREADS)
246 /* ToDo: carefully document the invariants that go together
247  *       with these synchronisation objects.
248  */
249 Mutex     sched_mutex       = INIT_MUTEX_VAR;
250 Mutex     term_mutex        = INIT_MUTEX_VAR;
251
252 # if defined(SMP)
253 static Condition gc_pending_cond = INIT_COND_VAR;
254 nat await_death;
255 # endif
256
257 #endif /* RTS_SUPPORTS_THREADS */
258
259 #if defined(PAR)
260 StgTSO *LastTSO;
261 rtsTime TimeOfLastYield;
262 rtsBool emitSchedule = rtsTrue;
263 #endif
264
265 #if DEBUG
266 char *whatNext_strs[] = {
267   "ThreadEnterGHC",
268   "ThreadRunGHC",
269   "ThreadEnterInterp",
270   "ThreadKilled",
271   "ThreadComplete"
272 };
273
274 char *threadReturnCode_strs[] = {
275   "HeapOverflow",                       /* might also be StackOverflow */
276   "StackOverflow",
277   "ThreadYielding",
278   "ThreadBlocked",
279   "ThreadFinished"
280 };
281 #endif
282
283 #if defined(PAR)
284 StgTSO * createSparkThread(rtsSpark spark);
285 StgTSO * activateSpark (rtsSpark spark);  
286 #endif
287
288 /*
289  * The thread state for the main thread.
290 // ToDo: check whether not needed any more
291 StgTSO   *MainTSO;
292  */
293
294 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
295 static void taskStart(void);
296 static void
297 taskStart(void)
298 {
299   schedule();
300 }
301 #endif
302
303
304
305
306 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
307 //@subsection Main scheduling loop
308
309 /* ---------------------------------------------------------------------------
310    Main scheduling loop.
311
312    We use round-robin scheduling, each thread returning to the
313    scheduler loop when one of these conditions is detected:
314
315       * out of heap space
316       * timer expires (thread yields)
317       * thread blocks
318       * thread ends
319       * stack overflow
320
321    Locking notes:  we acquire the scheduler lock once at the beginning
322    of the scheduler loop, and release it when
323     
324       * running a thread, or
325       * waiting for work, or
326       * waiting for a GC to complete.
327
328    GRAN version:
329      In a GranSim setup this loop iterates over the global event queue.
330      This revolves around the global event queue, which determines what 
331      to do next. Therefore, it's more complicated than either the 
332      concurrent or the parallel (GUM) setup.
333
334    GUM version:
335      GUM iterates over incoming messages.
336      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
337      and sends out a fish whenever it has nothing to do; in-between
338      doing the actual reductions (shared code below) it processes the
339      incoming messages and deals with delayed operations 
340      (see PendingFetches).
341      This is not the ugliest code you could imagine, but it's bloody close.
342
343    ------------------------------------------------------------------------ */
344 //@cindex schedule
345 static void
346 schedule( void )
347 {
348   StgTSO *t;
349   Capability *cap;
350   StgThreadReturnCode ret;
351 #if defined(GRAN)
352   rtsEvent *event;
353 #elif defined(PAR)
354   StgSparkPool *pool;
355   rtsSpark spark;
356   StgTSO *tso;
357   GlobalTaskId pe;
358   rtsBool receivedFinish = rtsFalse;
359 # if defined(DEBUG)
360   nat tp_size, sp_size; // stats only
361 # endif
362 #endif
363   rtsBool was_interrupted = rtsFalse;
364   
365   ACQUIRE_LOCK(&sched_mutex);
366  
367 #if defined(RTS_SUPPORTS_THREADS)
368   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
369 #else
370   /* simply initialise it in the non-threaded case */
371   grabCapability(&cap);
372 #endif
373
374 #if defined(GRAN)
375   /* set up first event to get things going */
376   /* ToDo: assign costs for system setup and init MainTSO ! */
377   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
378             ContinueThread, 
379             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
380
381   IF_DEBUG(gran,
382            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
383            G_TSO(CurrentTSO, 5));
384
385   if (RtsFlags.GranFlags.Light) {
386     /* Save current time; GranSim Light only */
387     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
388   }      
389
390   event = get_next_event();
391
392   while (event!=(rtsEvent*)NULL) {
393     /* Choose the processor with the next event */
394     CurrentProc = event->proc;
395     CurrentTSO = event->tso;
396
397 #elif defined(PAR)
398
399   while (!receivedFinish) {    /* set by processMessages */
400                                /* when receiving PP_FINISH message         */ 
401 #else
402
403   while (1) {
404
405 #endif
406
407     IF_DEBUG(scheduler, printAllThreads());
408
409 #if defined(RTS_SUPPORTS_THREADS)
410     /* Check to see whether there are any worker threads
411        waiting to deposit external call results. If so,
412        yield our capability */
413     yieldToReturningWorker(&sched_mutex, &cap);
414 #endif
415
416     /* If we're interrupted (the user pressed ^C, or some other
417      * termination condition occurred), kill all the currently running
418      * threads.
419      */
420     if (interrupted) {
421       IF_DEBUG(scheduler, sched_belch("interrupted"));
422       deleteAllThreads();
423       interrupted = rtsFalse;
424       was_interrupted = rtsTrue;
425     }
426
427     /* Go through the list of main threads and wake up any
428      * clients whose computations have finished.  ToDo: this
429      * should be done more efficiently without a linear scan
430      * of the main threads list, somehow...
431      */
432 #if defined(RTS_SUPPORTS_THREADS)
433     { 
434       StgMainThread *m, **prev;
435       prev = &main_threads;
436       for (m = main_threads; m != NULL; m = m->link) {
437         switch (m->tso->what_next) {
438         case ThreadComplete:
439           if (m->ret) {
440             *(m->ret) = (StgClosure *)m->tso->sp[0];
441           }
442           *prev = m->link;
443           m->stat = Success;
444           broadcastCondition(&m->wakeup);
445 #ifdef DEBUG
446           free(m->tso->label);
447           m->tso->label = NULL;
448 #endif
449           break;
450         case ThreadKilled:
451           if (m->ret) *(m->ret) = NULL;
452           *prev = m->link;
453           if (was_interrupted) {
454             m->stat = Interrupted;
455           } else {
456             m->stat = Killed;
457           }
458           broadcastCondition(&m->wakeup);
459 #ifdef DEBUG
460           free(m->tso->label);
461           m->tso->label = NULL;
462 #endif
463           break;
464         default:
465           break;
466         }
467       }
468     }
469
470 #else /* not threaded */
471
472 # if defined(PAR)
473     /* in GUM do this only on the Main PE */
474     if (IAmMainThread)
475 # endif
476     /* If our main thread has finished or been killed, return.
477      */
478     {
479       StgMainThread *m = main_threads;
480       if (m->tso->what_next == ThreadComplete
481           || m->tso->what_next == ThreadKilled) {
482 #ifdef DEBUG
483         free(m->tso->label);
484         m->tso->label = NULL;
485 #endif
486         main_threads = main_threads->link;
487         if (m->tso->what_next == ThreadComplete) {
488           /* we finished successfully, fill in the return value */
489           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
490           m->stat = Success;
491           return;
492         } else {
493           if (m->ret) { *(m->ret) = NULL; };
494           if (was_interrupted) {
495             m->stat = Interrupted;
496           } else {
497             m->stat = Killed;
498           }
499           return;
500         }
501       }
502     }
503 #endif
504
505     /* Top up the run queue from our spark pool.  We try to make the
506      * number of threads in the run queue equal to the number of
507      * free capabilities.
508      *
509      * Disable spark support in SMP for now, non-essential & requires
510      * a little bit of work to make it compile cleanly. -- sof 1/02.
511      */
512 #if 0 /* defined(SMP) */
513     {
514       nat n = getFreeCapabilities();
515       StgTSO *tso = run_queue_hd;
516
517       /* Count the run queue */
518       while (n > 0 && tso != END_TSO_QUEUE) {
519         tso = tso->link;
520         n--;
521       }
522
523       for (; n > 0; n--) {
524         StgClosure *spark;
525         spark = findSpark(rtsFalse);
526         if (spark == NULL) {
527           break; /* no more sparks in the pool */
528         } else {
529           /* I'd prefer this to be done in activateSpark -- HWL */
530           /* tricky - it needs to hold the scheduler lock and
531            * not try to re-acquire it -- SDM */
532           createSparkThread(spark);       
533           IF_DEBUG(scheduler,
534                    sched_belch("==^^ turning spark of closure %p into a thread",
535                                (StgClosure *)spark));
536         }
537       }
538       /* We need to wake up the other tasks if we just created some
539        * work for them.
540        */
541       if (getFreeCapabilities() - n > 1) {
542           signalCondition( &thread_ready_cond );
543       }
544     }
545 #endif // SMP
546
547     /* check for signals each time around the scheduler */
548 #ifndef mingw32_TARGET_OS
549     if (signals_pending()) {
550       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
551       startSignalHandlers();
552       ACQUIRE_LOCK(&sched_mutex);
553     }
554 #endif
555
556     /* Check whether any waiting threads need to be woken up.  If the
557      * run queue is empty, and there are no other tasks running, we
558      * can wait indefinitely for something to happen.
559      * ToDo: what if another client comes along & requests another
560      * main thread?
561      */
562     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
563       awaitEvent( EMPTY_RUN_QUEUE()
564 #if defined(SMP)
565         && allFreeCapabilities()
566 #endif
567         );
568     }
569     /* we can be interrupted while waiting for I/O... */
570     if (interrupted) continue;
571
572     /* 
573      * Detect deadlock: when we have no threads to run, there are no
574      * threads waiting on I/O or sleeping, and all the other tasks are
575      * waiting for work, we must have a deadlock of some description.
576      *
577      * We first try to find threads blocked on themselves (ie. black
578      * holes), and generate NonTermination exceptions where necessary.
579      *
580      * If no threads are black holed, we have a deadlock situation, so
581      * inform all the main threads.
582      */
583 #ifndef PAR
584     if (   EMPTY_THREAD_QUEUES()
585 #if defined(RTS_SUPPORTS_THREADS)
586         && EMPTY_QUEUE(suspended_ccalling_threads)
587 #endif
588 #ifdef SMP
589         && allFreeCapabilities()
590 #endif
591         )
592     {
593         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
594 #if defined(THREADED_RTS)
595         /* and SMP mode ..? */
596         releaseCapability(cap);
597 #endif
598         // Garbage collection can release some new threads due to
599         // either (a) finalizers or (b) threads resurrected because
600         // they are about to be send BlockedOnDeadMVar.  Any threads
601         // thus released will be immediately runnable.
602         GarbageCollect(GetRoots,rtsTrue);
603
604         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
605
606         IF_DEBUG(scheduler, 
607                  sched_belch("still deadlocked, checking for black holes..."));
608         detectBlackHoles();
609
610         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
611
612 #ifndef mingw32_TARGET_OS
613         /* If we have user-installed signal handlers, then wait
614          * for signals to arrive rather then bombing out with a
615          * deadlock.
616          */
617 #if defined(RTS_SUPPORTS_THREADS)
618         if ( 0 ) { /* hmm..what to do? Simply stop waiting for
619                       a signal with no runnable threads (or I/O
620                       suspended ones) leads nowhere quick.
621                       For now, simply shut down when we reach this
622                       condition.
623                       
624                       ToDo: define precisely under what conditions
625                       the Scheduler should shut down in an MT setting.
626                    */
627 #else
628         if ( anyUserHandlers() ) {
629 #endif
630             IF_DEBUG(scheduler, 
631                      sched_belch("still deadlocked, waiting for signals..."));
632
633             awaitUserSignals();
634
635             // we might be interrupted...
636             if (interrupted) { continue; }
637
638             if (signals_pending()) {
639                 RELEASE_LOCK(&sched_mutex);
640                 startSignalHandlers();
641                 ACQUIRE_LOCK(&sched_mutex);
642             }
643             ASSERT(!EMPTY_RUN_QUEUE());
644             goto not_deadlocked;
645         }
646 #endif
647
648         /* Probably a real deadlock.  Send the current main thread the
649          * Deadlock exception (or in the SMP build, send *all* main
650          * threads the deadlock exception, since none of them can make
651          * progress).
652          */
653         {
654             StgMainThread *m;
655 #if defined(RTS_SUPPORTS_THREADS)
656             for (m = main_threads; m != NULL; m = m->link) {
657                 switch (m->tso->why_blocked) {
658                 case BlockedOnBlackHole:
659                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
660                     break;
661                 case BlockedOnException:
662                 case BlockedOnMVar:
663                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
664                     break;
665                 default:
666                     barf("deadlock: main thread blocked in a strange way");
667                 }
668             }
669 #else
670             m = main_threads;
671             switch (m->tso->why_blocked) {
672             case BlockedOnBlackHole:
673                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
674                 break;
675             case BlockedOnException:
676             case BlockedOnMVar:
677                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
678                 break;
679             default:
680                 barf("deadlock: main thread blocked in a strange way");
681             }
682 #endif
683         }
684
685 #if defined(RTS_SUPPORTS_THREADS)
686         /* ToDo: revisit conditions (and mechanism) for shutting
687            down a multi-threaded world  */
688         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
689         RELEASE_LOCK(&sched_mutex);
690         shutdownHaskell();
691         return;
692 #endif
693     }
694   not_deadlocked:
695
696 #elif defined(PAR)
697     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
698 #endif
699
700 #if defined(SMP)
701     /* If there's a GC pending, don't do anything until it has
702      * completed.
703      */
704     if (ready_to_gc) {
705       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
706       waitCondition( &gc_pending_cond, &sched_mutex );
707     }
708 #endif    
709
710 #if defined(RTS_SUPPORTS_THREADS)
711     /* block until we've got a thread on the run queue and a free
712      * capability.
713      *
714      */
715     if ( EMPTY_RUN_QUEUE() ) {
716       /* Give up our capability */
717       releaseCapability(cap);
718
719       /* If we're in the process of shutting down (& running the
720        * a batch of finalisers), don't wait around.
721        */
722       if ( shutting_down_scheduler ) {
723         RELEASE_LOCK(&sched_mutex);
724         return;
725       }
726       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
727       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
728       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
729     }
730 #endif
731
732 #if defined(GRAN)
733     if (RtsFlags.GranFlags.Light)
734       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
735
736     /* adjust time based on time-stamp */
737     if (event->time > CurrentTime[CurrentProc] &&
738         event->evttype != ContinueThread)
739       CurrentTime[CurrentProc] = event->time;
740     
741     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
742     if (!RtsFlags.GranFlags.Light)
743       handleIdlePEs();
744
745     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
746
747     /* main event dispatcher in GranSim */
748     switch (event->evttype) {
749       /* Should just be continuing execution */
750     case ContinueThread:
751       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
752       /* ToDo: check assertion
753       ASSERT(run_queue_hd != (StgTSO*)NULL &&
754              run_queue_hd != END_TSO_QUEUE);
755       */
756       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
757       if (!RtsFlags.GranFlags.DoAsyncFetch &&
758           procStatus[CurrentProc]==Fetching) {
759         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
760               CurrentTSO->id, CurrentTSO, CurrentProc);
761         goto next_thread;
762       } 
763       /* Ignore ContinueThreads for completed threads */
764       if (CurrentTSO->what_next == ThreadComplete) {
765         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
766               CurrentTSO->id, CurrentTSO, CurrentProc);
767         goto next_thread;
768       } 
769       /* Ignore ContinueThreads for threads that are being migrated */
770       if (PROCS(CurrentTSO)==Nowhere) { 
771         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
772               CurrentTSO->id, CurrentTSO, CurrentProc);
773         goto next_thread;
774       }
775       /* The thread should be at the beginning of the run queue */
776       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
777         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
778               CurrentTSO->id, CurrentTSO, CurrentProc);
779         break; // run the thread anyway
780       }
781       /*
782       new_event(proc, proc, CurrentTime[proc],
783                 FindWork,
784                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
785       goto next_thread; 
786       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
787       break; // now actually run the thread; DaH Qu'vam yImuHbej 
788
789     case FetchNode:
790       do_the_fetchnode(event);
791       goto next_thread;             /* handle next event in event queue  */
792       
793     case GlobalBlock:
794       do_the_globalblock(event);
795       goto next_thread;             /* handle next event in event queue  */
796       
797     case FetchReply:
798       do_the_fetchreply(event);
799       goto next_thread;             /* handle next event in event queue  */
800       
801     case UnblockThread:   /* Move from the blocked queue to the tail of */
802       do_the_unblock(event);
803       goto next_thread;             /* handle next event in event queue  */
804       
805     case ResumeThread:  /* Move from the blocked queue to the tail of */
806       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
807       event->tso->gran.blocktime += 
808         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
809       do_the_startthread(event);
810       goto next_thread;             /* handle next event in event queue  */
811       
812     case StartThread:
813       do_the_startthread(event);
814       goto next_thread;             /* handle next event in event queue  */
815       
816     case MoveThread:
817       do_the_movethread(event);
818       goto next_thread;             /* handle next event in event queue  */
819       
820     case MoveSpark:
821       do_the_movespark(event);
822       goto next_thread;             /* handle next event in event queue  */
823       
824     case FindWork:
825       do_the_findwork(event);
826       goto next_thread;             /* handle next event in event queue  */
827       
828     default:
829       barf("Illegal event type %u\n", event->evttype);
830     }  /* switch */
831     
832     /* This point was scheduler_loop in the old RTS */
833
834     IF_DEBUG(gran, belch("GRAN: after main switch"));
835
836     TimeOfLastEvent = CurrentTime[CurrentProc];
837     TimeOfNextEvent = get_time_of_next_event();
838     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
839     // CurrentTSO = ThreadQueueHd;
840
841     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
842                          TimeOfNextEvent));
843
844     if (RtsFlags.GranFlags.Light) 
845       GranSimLight_leave_system(event, &ActiveTSO); 
846
847     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
848
849     IF_DEBUG(gran, 
850              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
851
852     /* in a GranSim setup the TSO stays on the run queue */
853     t = CurrentTSO;
854     /* Take a thread from the run queue. */
855     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
856
857     IF_DEBUG(gran, 
858              fprintf(stderr, "GRAN: About to run current thread, which is\n");
859              G_TSO(t,5));
860
861     context_switch = 0; // turned on via GranYield, checking events and time slice
862
863     IF_DEBUG(gran, 
864              DumpGranEvent(GR_SCHEDULE, t));
865
866     procStatus[CurrentProc] = Busy;
867
868 #elif defined(PAR)
869     if (PendingFetches != END_BF_QUEUE) {
870         processFetches();
871     }
872
873     /* ToDo: phps merge with spark activation above */
874     /* check whether we have local work and send requests if we have none */
875     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
876       /* :-[  no local threads => look out for local sparks */
877       /* the spark pool for the current PE */
878       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
879       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
880           pool->hd < pool->tl) {
881         /* 
882          * ToDo: add GC code check that we really have enough heap afterwards!!
883          * Old comment:
884          * If we're here (no runnable threads) and we have pending
885          * sparks, we must have a space problem.  Get enough space
886          * to turn one of those pending sparks into a
887          * thread... 
888          */
889
890         spark = findSpark(rtsFalse);                /* get a spark */
891         if (spark != (rtsSpark) NULL) {
892           tso = activateSpark(spark);       /* turn the spark into a thread */
893           IF_PAR_DEBUG(schedule,
894                        belch("==== schedule: Created TSO %d (%p); %d threads active",
895                              tso->id, tso, advisory_thread_count));
896
897           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
898             belch("==^^ failed to activate spark");
899             goto next_thread;
900           }               /* otherwise fall through & pick-up new tso */
901         } else {
902           IF_PAR_DEBUG(verbose,
903                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
904                              spark_queue_len(pool)));
905           goto next_thread;
906         }
907       }
908
909       /* If we still have no work we need to send a FISH to get a spark
910          from another PE 
911       */
912       if (EMPTY_RUN_QUEUE()) {
913       /* =8-[  no local sparks => look for work on other PEs */
914         /*
915          * We really have absolutely no work.  Send out a fish
916          * (there may be some out there already), and wait for
917          * something to arrive.  We clearly can't run any threads
918          * until a SCHEDULE or RESUME arrives, and so that's what
919          * we're hoping to see.  (Of course, we still have to
920          * respond to other types of messages.)
921          */
922         TIME now = msTime() /*CURRENT_TIME*/;
923         IF_PAR_DEBUG(verbose, 
924                      belch("--  now=%ld", now));
925         IF_PAR_DEBUG(verbose,
926                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
927                          (last_fish_arrived_at!=0 &&
928                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
929                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
930                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
931                              last_fish_arrived_at,
932                              RtsFlags.ParFlags.fishDelay, now);
933                      });
934         
935         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
936             (last_fish_arrived_at==0 ||
937              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
938           /* outstandingFishes is set in sendFish, processFish;
939              avoid flooding system with fishes via delay */
940           pe = choosePE();
941           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
942                    NEW_FISH_HUNGER);
943
944           // Global statistics: count no. of fishes
945           if (RtsFlags.ParFlags.ParStats.Global &&
946               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
947             globalParStats.tot_fish_mess++;
948           }
949         }
950       
951         receivedFinish = processMessages();
952         goto next_thread;
953       }
954     } else if (PacketsWaiting()) {  /* Look for incoming messages */
955       receivedFinish = processMessages();
956     }
957
958     /* Now we are sure that we have some work available */
959     ASSERT(run_queue_hd != END_TSO_QUEUE);
960
961     /* Take a thread from the run queue, if we have work */
962     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
963     IF_DEBUG(sanity,checkTSO(t));
964
965     /* ToDo: write something to the log-file
966     if (RTSflags.ParFlags.granSimStats && !sameThread)
967         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
968
969     CurrentTSO = t;
970     */
971     /* the spark pool for the current PE */
972     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
973
974     IF_DEBUG(scheduler, 
975              belch("--=^ %d threads, %d sparks on [%#x]", 
976                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
977
978 # if 1
979     if (0 && RtsFlags.ParFlags.ParStats.Full && 
980         t && LastTSO && t->id != LastTSO->id && 
981         LastTSO->why_blocked == NotBlocked && 
982         LastTSO->what_next != ThreadComplete) {
983       // if previously scheduled TSO not blocked we have to record the context switch
984       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
985                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
986     }
987
988     if (RtsFlags.ParFlags.ParStats.Full && 
989         (emitSchedule /* forced emit */ ||
990         (t && LastTSO && t->id != LastTSO->id))) {
991       /* 
992          we are running a different TSO, so write a schedule event to log file
993          NB: If we use fair scheduling we also have to write  a deschedule 
994              event for LastTSO; with unfair scheduling we know that the
995              previous tso has blocked whenever we switch to another tso, so
996              we don't need it in GUM for now
997       */
998       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
999                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1000       emitSchedule = rtsFalse;
1001     }
1002      
1003 # endif
1004 #else /* !GRAN && !PAR */
1005   
1006     /* grab a thread from the run queue */
1007     ASSERT(run_queue_hd != END_TSO_QUEUE);
1008     t = POP_RUN_QUEUE();
1009     // Sanity check the thread we're about to run.  This can be
1010     // expensive if there is lots of thread switching going on...
1011     IF_DEBUG(sanity,checkTSO(t));
1012 #endif
1013     
1014     cap->r.rCurrentTSO = t;
1015     
1016     /* context switches are now initiated by the timer signal, unless
1017      * the user specified "context switch as often as possible", with
1018      * +RTS -C0
1019      */
1020     if (
1021 #ifdef PROFILING
1022         RtsFlags.ProfFlags.profileInterval == 0 ||
1023 #endif
1024         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1025          && (run_queue_hd != END_TSO_QUEUE
1026              || blocked_queue_hd != END_TSO_QUEUE
1027              || sleeping_queue != END_TSO_QUEUE)))
1028         context_switch = 1;
1029     else
1030         context_switch = 0;
1031
1032     RELEASE_LOCK(&sched_mutex);
1033
1034     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
1035                               t->id, t, whatNext_strs[t->what_next]));
1036
1037 #ifdef PROFILING
1038     startHeapProfTimer();
1039 #endif
1040
1041     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1042     /* Run the current thread 
1043      */
1044     switch (cap->r.rCurrentTSO->what_next) {
1045     case ThreadKilled:
1046     case ThreadComplete:
1047         /* Thread already finished, return to scheduler. */
1048         ret = ThreadFinished;
1049         break;
1050     case ThreadEnterGHC:
1051         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1052         break;
1053     case ThreadRunGHC:
1054         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1055         break;
1056     case ThreadEnterInterp:
1057         ret = interpretBCO(cap);
1058         break;
1059     default:
1060       barf("schedule: invalid what_next field");
1061     }
1062     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1063     
1064     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1065 #ifdef PROFILING
1066     stopHeapProfTimer();
1067     CCCS = CCS_SYSTEM;
1068 #endif
1069     
1070     ACQUIRE_LOCK(&sched_mutex);
1071
1072 #ifdef SMP
1073     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1074 #elif !defined(GRAN) && !defined(PAR)
1075     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1076 #endif
1077     t = cap->r.rCurrentTSO;
1078     
1079 #if defined(PAR)
1080     /* HACK 675: if the last thread didn't yield, make sure to print a 
1081        SCHEDULE event to the log file when StgRunning the next thread, even
1082        if it is the same one as before */
1083     LastTSO = t; 
1084     TimeOfLastYield = CURRENT_TIME;
1085 #endif
1086
1087     switch (ret) {
1088     case HeapOverflow:
1089 #if defined(GRAN)
1090       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1091       globalGranStats.tot_heapover++;
1092 #elif defined(PAR)
1093       globalParStats.tot_heapover++;
1094 #endif
1095
1096       // did the task ask for a large block?
1097       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1098           // if so, get one and push it on the front of the nursery.
1099           bdescr *bd;
1100           nat blocks;
1101           
1102           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1103
1104           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1105                                    t->id, t,
1106                                    whatNext_strs[t->what_next], blocks));
1107
1108           // don't do this if it would push us over the
1109           // alloc_blocks_lim limit; we'll GC first.
1110           if (alloc_blocks + blocks < alloc_blocks_lim) {
1111
1112               alloc_blocks += blocks;
1113               bd = allocGroup( blocks );
1114
1115               // link the new group into the list
1116               bd->link = cap->r.rCurrentNursery;
1117               bd->u.back = cap->r.rCurrentNursery->u.back;
1118               if (cap->r.rCurrentNursery->u.back != NULL) {
1119                   cap->r.rCurrentNursery->u.back->link = bd;
1120               } else {
1121                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1122                          g0s0->blocks == cap->r.rNursery);
1123                   cap->r.rNursery = g0s0->blocks = bd;
1124               }           
1125               cap->r.rCurrentNursery->u.back = bd;
1126
1127               // initialise it as a nursery block
1128               bd->step = g0s0;
1129               bd->gen_no = 0;
1130               bd->flags = 0;
1131               bd->free = bd->start;
1132
1133               // don't forget to update the block count in g0s0.
1134               g0s0->n_blocks += blocks;
1135               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1136
1137               // now update the nursery to point to the new block
1138               cap->r.rCurrentNursery = bd;
1139
1140               // we might be unlucky and have another thread get on the
1141               // run queue before us and steal the large block, but in that
1142               // case the thread will just end up requesting another large
1143               // block.
1144               PUSH_ON_RUN_QUEUE(t);
1145               break;
1146           }
1147       }
1148
1149       /* make all the running tasks block on a condition variable,
1150        * maybe set context_switch and wait till they all pile in,
1151        * then have them wait on a GC condition variable.
1152        */
1153       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1154                                t->id, t, whatNext_strs[t->what_next]));
1155       threadPaused(t);
1156 #if defined(GRAN)
1157       ASSERT(!is_on_queue(t,CurrentProc));
1158 #elif defined(PAR)
1159       /* Currently we emit a DESCHEDULE event before GC in GUM.
1160          ToDo: either add separate event to distinguish SYSTEM time from rest
1161                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1162       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1163         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1164                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1165         emitSchedule = rtsTrue;
1166       }
1167 #endif
1168       
1169       ready_to_gc = rtsTrue;
1170       context_switch = 1;               /* stop other threads ASAP */
1171       PUSH_ON_RUN_QUEUE(t);
1172       /* actual GC is done at the end of the while loop */
1173       break;
1174       
1175     case StackOverflow:
1176 #if defined(GRAN)
1177       IF_DEBUG(gran, 
1178                DumpGranEvent(GR_DESCHEDULE, t));
1179       globalGranStats.tot_stackover++;
1180 #elif defined(PAR)
1181       // IF_DEBUG(par, 
1182       // DumpGranEvent(GR_DESCHEDULE, t);
1183       globalParStats.tot_stackover++;
1184 #endif
1185       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1186                                t->id, t, whatNext_strs[t->what_next]));
1187       /* just adjust the stack for this thread, then pop it back
1188        * on the run queue.
1189        */
1190       threadPaused(t);
1191       { 
1192         StgMainThread *m;
1193         /* enlarge the stack */
1194         StgTSO *new_t = threadStackOverflow(t);
1195         
1196         /* This TSO has moved, so update any pointers to it from the
1197          * main thread stack.  It better not be on any other queues...
1198          * (it shouldn't be).
1199          */
1200         for (m = main_threads; m != NULL; m = m->link) {
1201           if (m->tso == t) {
1202             m->tso = new_t;
1203           }
1204         }
1205         threadPaused(new_t);
1206         PUSH_ON_RUN_QUEUE(new_t);
1207       }
1208       break;
1209
1210     case ThreadYielding:
1211 #if defined(GRAN)
1212       IF_DEBUG(gran, 
1213                DumpGranEvent(GR_DESCHEDULE, t));
1214       globalGranStats.tot_yields++;
1215 #elif defined(PAR)
1216       // IF_DEBUG(par, 
1217       // DumpGranEvent(GR_DESCHEDULE, t);
1218       globalParStats.tot_yields++;
1219 #endif
1220       /* put the thread back on the run queue.  Then, if we're ready to
1221        * GC, check whether this is the last task to stop.  If so, wake
1222        * up the GC thread.  getThread will block during a GC until the
1223        * GC is finished.
1224        */
1225       IF_DEBUG(scheduler,
1226                if (t->what_next == ThreadEnterInterp) {
1227                    /* ToDo: or maybe a timer expired when we were in Hugs?
1228                     * or maybe someone hit ctrl-C
1229                     */
1230                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1231                          t->id, t, whatNext_strs[t->what_next]);
1232                } else {
1233                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1234                          t->id, t, whatNext_strs[t->what_next]);
1235                }
1236                );
1237
1238       threadPaused(t);
1239
1240       IF_DEBUG(sanity,
1241                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1242                checkTSO(t));
1243       ASSERT(t->link == END_TSO_QUEUE);
1244 #if defined(GRAN)
1245       ASSERT(!is_on_queue(t,CurrentProc));
1246
1247       IF_DEBUG(sanity,
1248                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1249                checkThreadQsSanity(rtsTrue));
1250 #endif
1251 #if defined(PAR)
1252       if (RtsFlags.ParFlags.doFairScheduling) { 
1253         /* this does round-robin scheduling; good for concurrency */
1254         APPEND_TO_RUN_QUEUE(t);
1255       } else {
1256         /* this does unfair scheduling; good for parallelism */
1257         PUSH_ON_RUN_QUEUE(t);
1258       }
1259 #else
1260       /* this does round-robin scheduling; good for concurrency */
1261       APPEND_TO_RUN_QUEUE(t);
1262 #endif
1263 #if defined(GRAN)
1264       /* add a ContinueThread event to actually process the thread */
1265       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1266                 ContinueThread,
1267                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1268       IF_GRAN_DEBUG(bq, 
1269                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1270                G_EVENTQ(0);
1271                G_CURR_THREADQ(0));
1272 #endif /* GRAN */
1273       break;
1274       
1275     case ThreadBlocked:
1276 #if defined(GRAN)
1277       IF_DEBUG(scheduler,
1278                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1279                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1280                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1281
1282       // ??? needed; should emit block before
1283       IF_DEBUG(gran, 
1284                DumpGranEvent(GR_DESCHEDULE, t)); 
1285       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1286       /*
1287         ngoq Dogh!
1288       ASSERT(procStatus[CurrentProc]==Busy || 
1289               ((procStatus[CurrentProc]==Fetching) && 
1290               (t->block_info.closure!=(StgClosure*)NULL)));
1291       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1292           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1293             procStatus[CurrentProc]==Fetching)) 
1294         procStatus[CurrentProc] = Idle;
1295       */
1296 #elif defined(PAR)
1297       IF_DEBUG(scheduler,
1298                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1299                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1300       IF_PAR_DEBUG(bq,
1301
1302                    if (t->block_info.closure!=(StgClosure*)NULL) 
1303                      print_bq(t->block_info.closure));
1304
1305       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1306       blockThread(t);
1307
1308       /* whatever we schedule next, we must log that schedule */
1309       emitSchedule = rtsTrue;
1310
1311 #else /* !GRAN */
1312       /* don't need to do anything.  Either the thread is blocked on
1313        * I/O, in which case we'll have called addToBlockedQueue
1314        * previously, or it's blocked on an MVar or Blackhole, in which
1315        * case it'll be on the relevant queue already.
1316        */
1317       IF_DEBUG(scheduler,
1318                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1319                printThreadBlockage(t);
1320                fprintf(stderr, "\n"));
1321
1322       /* Only for dumping event to log file 
1323          ToDo: do I need this in GranSim, too?
1324       blockThread(t);
1325       */
1326 #endif
1327       threadPaused(t);
1328       break;
1329       
1330     case ThreadFinished:
1331       /* Need to check whether this was a main thread, and if so, signal
1332        * the task that started it with the return value.  If we have no
1333        * more main threads, we probably need to stop all the tasks until
1334        * we get a new one.
1335        */
1336       /* We also end up here if the thread kills itself with an
1337        * uncaught exception, see Exception.hc.
1338        */
1339       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1340 #if defined(GRAN)
1341       endThread(t, CurrentProc); // clean-up the thread
1342 #elif defined(PAR)
1343       /* For now all are advisory -- HWL */
1344       //if(t->priority==AdvisoryPriority) ??
1345       advisory_thread_count--;
1346       
1347 # ifdef DIST
1348       if(t->dist.priority==RevalPriority)
1349         FinishReval(t);
1350 # endif
1351       
1352       if (RtsFlags.ParFlags.ParStats.Full &&
1353           !RtsFlags.ParFlags.ParStats.Suppressed) 
1354         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1355 #endif
1356       break;
1357       
1358     default:
1359       barf("schedule: invalid thread return code %d", (int)ret);
1360     }
1361
1362 #ifdef PROFILING
1363     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1364         GarbageCollect(GetRoots, rtsTrue);
1365         heapCensus();
1366         performHeapProfile = rtsFalse;
1367         ready_to_gc = rtsFalse; // we already GC'd
1368     }
1369 #endif
1370
1371     if (ready_to_gc 
1372 #ifdef SMP
1373         && allFreeCapabilities() 
1374 #endif
1375         ) {
1376       /* everybody back, start the GC.
1377        * Could do it in this thread, or signal a condition var
1378        * to do it in another thread.  Either way, we need to
1379        * broadcast on gc_pending_cond afterward.
1380        */
1381 #if defined(RTS_SUPPORTS_THREADS)
1382       IF_DEBUG(scheduler,sched_belch("doing GC"));
1383 #endif
1384       GarbageCollect(GetRoots,rtsFalse);
1385       ready_to_gc = rtsFalse;
1386 #ifdef SMP
1387       broadcastCondition(&gc_pending_cond);
1388 #endif
1389 #if defined(GRAN)
1390       /* add a ContinueThread event to continue execution of current thread */
1391       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1392                 ContinueThread,
1393                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1394       IF_GRAN_DEBUG(bq, 
1395                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1396                G_EVENTQ(0);
1397                G_CURR_THREADQ(0));
1398 #endif /* GRAN */
1399     }
1400
1401 #if defined(GRAN)
1402   next_thread:
1403     IF_GRAN_DEBUG(unused,
1404                   print_eventq(EventHd));
1405
1406     event = get_next_event();
1407 #elif defined(PAR)
1408   next_thread:
1409     /* ToDo: wait for next message to arrive rather than busy wait */
1410 #endif /* GRAN */
1411
1412   } /* end of while(1) */
1413
1414   IF_PAR_DEBUG(verbose,
1415                belch("== Leaving schedule() after having received Finish"));
1416 }
1417
1418 /* ---------------------------------------------------------------------------
1419  * Singleton fork(). Do not copy any running threads.
1420  * ------------------------------------------------------------------------- */
1421
1422 StgInt forkProcess(StgTSO* tso) {
1423
1424 #ifndef mingw32_TARGET_OS
1425   pid_t pid;
1426   StgTSO* t,*next;
1427
1428   IF_DEBUG(scheduler,sched_belch("forking!"));
1429
1430   pid = fork();
1431   if (pid) { /* parent */
1432
1433   /* just return the pid */
1434     
1435   } else { /* child */
1436   /* wipe all other threads */
1437   run_queue_hd = tso;
1438   tso->link = END_TSO_QUEUE;
1439
1440   /* DO NOT TOUCH THE QUEUES directly because most of the code around
1441      us is picky about finding the threat still in its queue when
1442      handling the deleteThread() */
1443
1444   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1445     next = t->link;
1446     if (t->id != tso->id) {
1447       deleteThread(t);
1448     }
1449   }
1450   }
1451   return pid;
1452 #else /* mingw32 */
1453   barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
1454   /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */
1455   return -1;
1456 #endif /* mingw32 */
1457 }
1458
1459 /* ---------------------------------------------------------------------------
1460  * deleteAllThreads():  kill all the live threads.
1461  *
1462  * This is used when we catch a user interrupt (^C), before performing
1463  * any necessary cleanups and running finalizers.
1464  *
1465  * Locks: sched_mutex held.
1466  * ------------------------------------------------------------------------- */
1467    
1468 void deleteAllThreads ( void )
1469 {
1470   StgTSO* t, *next;
1471   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1472   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1473       next = t->global_link;
1474       deleteThread(t);
1475   }      
1476   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1477   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1478   sleeping_queue = END_TSO_QUEUE;
1479 }
1480
1481 /* startThread and  insertThread are now in GranSim.c -- HWL */
1482
1483
1484 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1485 //@subsection Suspend and Resume
1486
1487 /* ---------------------------------------------------------------------------
1488  * Suspending & resuming Haskell threads.
1489  * 
1490  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1491  * its capability before calling the C function.  This allows another
1492  * task to pick up the capability and carry on running Haskell
1493  * threads.  It also means that if the C call blocks, it won't lock
1494  * the whole system.
1495  *
1496  * The Haskell thread making the C call is put to sleep for the
1497  * duration of the call, on the susepended_ccalling_threads queue.  We
1498  * give out a token to the task, which it can use to resume the thread
1499  * on return from the C function.
1500  * ------------------------------------------------------------------------- */
1501    
1502 StgInt
1503 suspendThread( StgRegTable *reg, 
1504                rtsBool concCall
1505 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1506                STG_UNUSED
1507 #endif
1508                )
1509 {
1510   nat tok;
1511   Capability *cap;
1512
1513   /* assume that *reg is a pointer to the StgRegTable part
1514    * of a Capability.
1515    */
1516   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1517
1518   ACQUIRE_LOCK(&sched_mutex);
1519
1520   IF_DEBUG(scheduler,
1521            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1522
1523   threadPaused(cap->r.rCurrentTSO);
1524   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1525   suspended_ccalling_threads = cap->r.rCurrentTSO;
1526
1527 #if defined(RTS_SUPPORTS_THREADS)
1528   cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
1529 #endif
1530
1531   /* Use the thread ID as the token; it should be unique */
1532   tok = cap->r.rCurrentTSO->id;
1533
1534   /* Hand back capability */
1535   releaseCapability(cap);
1536   
1537 #if defined(RTS_SUPPORTS_THREADS)
1538   /* Preparing to leave the RTS, so ensure there's a native thread/task
1539      waiting to take over.
1540      
1541      ToDo: optimise this and only create a new task if there's a need
1542      for one (i.e., if there's only one Concurrent Haskell thread alive,
1543      there's no need to create a new task).
1544   */
1545   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1546   if (concCall) {
1547     startTask(taskStart);
1548   }
1549 #endif
1550
1551   /* Other threads _might_ be available for execution; signal this */
1552   THREAD_RUNNABLE();
1553   RELEASE_LOCK(&sched_mutex);
1554   return tok; 
1555 }
1556
1557 StgRegTable *
1558 resumeThread( StgInt tok,
1559               rtsBool concCall
1560 #if !defined(RTS_SUPPORTS_THREADS)
1561                STG_UNUSED
1562 #endif
1563               )
1564 {
1565   StgTSO *tso, **prev;
1566   Capability *cap;
1567
1568 #if defined(RTS_SUPPORTS_THREADS)
1569   /* Wait for permission to re-enter the RTS with the result. */
1570   if ( concCall ) {
1571     ACQUIRE_LOCK(&sched_mutex);
1572     grabReturnCapability(&sched_mutex, &cap);
1573   } else {
1574     grabCapability(&cap);
1575   }
1576 #else
1577   grabCapability(&cap);
1578 #endif
1579
1580   /* Remove the thread off of the suspended list */
1581   prev = &suspended_ccalling_threads;
1582   for (tso = suspended_ccalling_threads; 
1583        tso != END_TSO_QUEUE; 
1584        prev = &tso->link, tso = tso->link) {
1585     if (tso->id == (StgThreadID)tok) {
1586       *prev = tso->link;
1587       break;
1588     }
1589   }
1590   if (tso == END_TSO_QUEUE) {
1591     barf("resumeThread: thread not found");
1592   }
1593   tso->link = END_TSO_QUEUE;
1594   /* Reset blocking status */
1595   tso->why_blocked  = NotBlocked;
1596
1597   cap->r.rCurrentTSO = tso;
1598   RELEASE_LOCK(&sched_mutex);
1599   return &cap->r;
1600 }
1601
1602
1603 /* ---------------------------------------------------------------------------
1604  * Static functions
1605  * ------------------------------------------------------------------------ */
1606 static void unblockThread(StgTSO *tso);
1607
1608 /* ---------------------------------------------------------------------------
1609  * Comparing Thread ids.
1610  *
1611  * This is used from STG land in the implementation of the
1612  * instances of Eq/Ord for ThreadIds.
1613  * ------------------------------------------------------------------------ */
1614
1615 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1616
1617   StgThreadID id1 = tso1->id; 
1618   StgThreadID id2 = tso2->id;
1619  
1620   if (id1 < id2) return (-1);
1621   if (id1 > id2) return 1;
1622   return 0;
1623 }
1624
1625 /* ---------------------------------------------------------------------------
1626  * Fetching the ThreadID from an StgTSO.
1627  *
1628  * This is used in the implementation of Show for ThreadIds.
1629  * ------------------------------------------------------------------------ */
1630 int rts_getThreadId(const StgTSO *tso) 
1631 {
1632   return tso->id;
1633 }
1634
1635 #ifdef DEBUG
1636 void labelThread(StgTSO *tso, char *label)
1637 {
1638   int len;
1639   void *buf;
1640
1641   /* Caveat: Once set, you can only set the thread name to "" */
1642   len = strlen(label)+1;
1643   buf = realloc(tso->label,len);
1644   if (buf == NULL) {
1645     fprintf(stderr,"insufficient memory for labelThread!\n");
1646     free(tso->label);
1647     tso->label = NULL;
1648   } else
1649     strncpy(buf,label,len);
1650   tso->label = buf;
1651 }
1652 #endif /* DEBUG */
1653
1654 /* ---------------------------------------------------------------------------
1655    Create a new thread.
1656
1657    The new thread starts with the given stack size.  Before the
1658    scheduler can run, however, this thread needs to have a closure
1659    (and possibly some arguments) pushed on its stack.  See
1660    pushClosure() in Schedule.h.
1661
1662    createGenThread() and createIOThread() (in SchedAPI.h) are
1663    convenient packaged versions of this function.
1664
1665    currently pri (priority) is only used in a GRAN setup -- HWL
1666    ------------------------------------------------------------------------ */
1667 //@cindex createThread
1668 #if defined(GRAN)
1669 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1670 StgTSO *
1671 createThread(nat stack_size, StgInt pri)
1672 {
1673   return createThread_(stack_size, rtsFalse, pri);
1674 }
1675
1676 static StgTSO *
1677 createThread_(nat size, rtsBool have_lock, StgInt pri)
1678 {
1679 #else
1680 StgTSO *
1681 createThread(nat stack_size)
1682 {
1683   return createThread_(stack_size, rtsFalse);
1684 }
1685
1686 static StgTSO *
1687 createThread_(nat size, rtsBool have_lock)
1688 {
1689 #endif
1690
1691     StgTSO *tso;
1692     nat stack_size;
1693
1694     /* First check whether we should create a thread at all */
1695 #if defined(PAR)
1696   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1697   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1698     threadsIgnored++;
1699     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1700           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1701     return END_TSO_QUEUE;
1702   }
1703   threadsCreated++;
1704 #endif
1705
1706 #if defined(GRAN)
1707   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1708 #endif
1709
1710   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1711
1712   /* catch ridiculously small stack sizes */
1713   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1714     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1715   }
1716
1717   stack_size = size - TSO_STRUCT_SIZEW;
1718
1719   tso = (StgTSO *)allocate(size);
1720   TICK_ALLOC_TSO(stack_size, 0);
1721
1722   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1723 #if defined(GRAN)
1724   SET_GRAN_HDR(tso, ThisPE);
1725 #endif
1726   tso->what_next     = ThreadEnterGHC;
1727
1728 #ifdef DEBUG
1729   tso->label = NULL;
1730 #endif
1731
1732   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1733    * protect the increment operation on next_thread_id.
1734    * In future, we could use an atomic increment instead.
1735    */
1736 #ifdef SMP
1737   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1738 #endif
1739   tso->id = next_thread_id++; 
1740 #ifdef SMP
1741   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1742 #endif
1743
1744   tso->why_blocked  = NotBlocked;
1745   tso->blocked_exceptions = NULL;
1746
1747   tso->stack_size   = stack_size;
1748   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1749                               - TSO_STRUCT_SIZEW;
1750   tso->sp           = (P_)&(tso->stack) + stack_size;
1751
1752 #ifdef PROFILING
1753   tso->prof.CCCS = CCS_MAIN;
1754 #endif
1755
1756   /* put a stop frame on the stack */
1757   tso->sp -= sizeofW(StgStopFrame);
1758   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1759   tso->su = (StgUpdateFrame*)tso->sp;
1760
1761   // ToDo: check this
1762 #if defined(GRAN)
1763   tso->link = END_TSO_QUEUE;
1764   /* uses more flexible routine in GranSim */
1765   insertThread(tso, CurrentProc);
1766 #else
1767   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1768    * from its creation
1769    */
1770 #endif
1771
1772 #if defined(GRAN) 
1773   if (RtsFlags.GranFlags.GranSimStats.Full) 
1774     DumpGranEvent(GR_START,tso);
1775 #elif defined(PAR)
1776   if (RtsFlags.ParFlags.ParStats.Full) 
1777     DumpGranEvent(GR_STARTQ,tso);
1778   /* HACk to avoid SCHEDULE 
1779      LastTSO = tso; */
1780 #endif
1781
1782   /* Link the new thread on the global thread list.
1783    */
1784   tso->global_link = all_threads;
1785   all_threads = tso;
1786
1787 #if defined(DIST)
1788   tso->dist.priority = MandatoryPriority; //by default that is...
1789 #endif
1790
1791 #if defined(GRAN)
1792   tso->gran.pri = pri;
1793 # if defined(DEBUG)
1794   tso->gran.magic = TSO_MAGIC; // debugging only
1795 # endif
1796   tso->gran.sparkname   = 0;
1797   tso->gran.startedat   = CURRENT_TIME; 
1798   tso->gran.exported    = 0;
1799   tso->gran.basicblocks = 0;
1800   tso->gran.allocs      = 0;
1801   tso->gran.exectime    = 0;
1802   tso->gran.fetchtime   = 0;
1803   tso->gran.fetchcount  = 0;
1804   tso->gran.blocktime   = 0;
1805   tso->gran.blockcount  = 0;
1806   tso->gran.blockedat   = 0;
1807   tso->gran.globalsparks = 0;
1808   tso->gran.localsparks  = 0;
1809   if (RtsFlags.GranFlags.Light)
1810     tso->gran.clock  = Now; /* local clock */
1811   else
1812     tso->gran.clock  = 0;
1813
1814   IF_DEBUG(gran,printTSO(tso));
1815 #elif defined(PAR)
1816 # if defined(DEBUG)
1817   tso->par.magic = TSO_MAGIC; // debugging only
1818 # endif
1819   tso->par.sparkname   = 0;
1820   tso->par.startedat   = CURRENT_TIME; 
1821   tso->par.exported    = 0;
1822   tso->par.basicblocks = 0;
1823   tso->par.allocs      = 0;
1824   tso->par.exectime    = 0;
1825   tso->par.fetchtime   = 0;
1826   tso->par.fetchcount  = 0;
1827   tso->par.blocktime   = 0;
1828   tso->par.blockcount  = 0;
1829   tso->par.blockedat   = 0;
1830   tso->par.globalsparks = 0;
1831   tso->par.localsparks  = 0;
1832 #endif
1833
1834 #if defined(GRAN)
1835   globalGranStats.tot_threads_created++;
1836   globalGranStats.threads_created_on_PE[CurrentProc]++;
1837   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1838   globalGranStats.tot_sq_probes++;
1839 #elif defined(PAR)
1840   // collect parallel global statistics (currently done together with GC stats)
1841   if (RtsFlags.ParFlags.ParStats.Global &&
1842       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1843     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1844     globalParStats.tot_threads_created++;
1845   }
1846 #endif 
1847
1848 #if defined(GRAN)
1849   IF_GRAN_DEBUG(pri,
1850                 belch("==__ schedule: Created TSO %d (%p);",
1851                       CurrentProc, tso, tso->id));
1852 #elif defined(PAR)
1853     IF_PAR_DEBUG(verbose,
1854                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1855                        tso->id, tso, advisory_thread_count));
1856 #else
1857   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1858                                  tso->id, tso->stack_size));
1859 #endif    
1860   return tso;
1861 }
1862
1863 #if defined(PAR)
1864 /* RFP:
1865    all parallel thread creation calls should fall through the following routine.
1866 */
1867 StgTSO *
1868 createSparkThread(rtsSpark spark) 
1869 { StgTSO *tso;
1870   ASSERT(spark != (rtsSpark)NULL);
1871   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1872   { threadsIgnored++;
1873     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1874           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1875     return END_TSO_QUEUE;
1876   }
1877   else
1878   { threadsCreated++;
1879     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1880     if (tso==END_TSO_QUEUE)     
1881       barf("createSparkThread: Cannot create TSO");
1882 #if defined(DIST)
1883     tso->priority = AdvisoryPriority;
1884 #endif
1885     pushClosure(tso,spark);
1886     PUSH_ON_RUN_QUEUE(tso);
1887     advisory_thread_count++;    
1888   }
1889   return tso;
1890 }
1891 #endif
1892
1893 /*
1894   Turn a spark into a thread.
1895   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1896 */
1897 #if defined(PAR)
1898 //@cindex activateSpark
1899 StgTSO *
1900 activateSpark (rtsSpark spark) 
1901 {
1902   StgTSO *tso;
1903
1904   tso = createSparkThread(spark);
1905   if (RtsFlags.ParFlags.ParStats.Full) {   
1906     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1907     IF_PAR_DEBUG(verbose,
1908                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1909                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1910   }
1911   // ToDo: fwd info on local/global spark to thread -- HWL
1912   // tso->gran.exported =  spark->exported;
1913   // tso->gran.locked =   !spark->global;
1914   // tso->gran.sparkname = spark->name;
1915
1916   return tso;
1917 }
1918 #endif
1919
1920 /* ---------------------------------------------------------------------------
1921  * scheduleThread()
1922  *
1923  * scheduleThread puts a thread on the head of the runnable queue.
1924  * This will usually be done immediately after a thread is created.
1925  * The caller of scheduleThread must create the thread using e.g.
1926  * createThread and push an appropriate closure
1927  * on this thread's stack before the scheduler is invoked.
1928  * ------------------------------------------------------------------------ */
1929
1930 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
1931
1932 void
1933 scheduleThread_(StgTSO *tso
1934                , rtsBool createTask
1935 #if !defined(THREADED_RTS)
1936                  STG_UNUSED
1937 #endif
1938               )
1939 {
1940   ACQUIRE_LOCK(&sched_mutex);
1941
1942   /* Put the new thread on the head of the runnable queue.  The caller
1943    * better push an appropriate closure on this thread's stack
1944    * beforehand.  In the SMP case, the thread may start running as
1945    * soon as we release the scheduler lock below.
1946    */
1947   PUSH_ON_RUN_QUEUE(tso);
1948 #if defined(THREADED_RTS)
1949   /* If main() is scheduling a thread, don't bother creating a 
1950    * new task.
1951    */
1952   if ( createTask ) {
1953     startTask(taskStart);
1954   }
1955 #endif
1956   THREAD_RUNNABLE();
1957
1958 #if 0
1959   IF_DEBUG(scheduler,printTSO(tso));
1960 #endif
1961   RELEASE_LOCK(&sched_mutex);
1962 }
1963
1964 void scheduleThread(StgTSO* tso)
1965 {
1966   return scheduleThread_(tso, rtsFalse);
1967 }
1968
1969 void scheduleExtThread(StgTSO* tso)
1970 {
1971   return scheduleThread_(tso, rtsTrue);
1972 }
1973
1974 /* ---------------------------------------------------------------------------
1975  * initScheduler()
1976  *
1977  * Initialise the scheduler.  This resets all the queues - if the
1978  * queues contained any threads, they'll be garbage collected at the
1979  * next pass.
1980  *
1981  * ------------------------------------------------------------------------ */
1982
1983 #ifdef SMP
1984 static void
1985 term_handler(int sig STG_UNUSED)
1986 {
1987   stat_workerStop();
1988   ACQUIRE_LOCK(&term_mutex);
1989   await_death--;
1990   RELEASE_LOCK(&term_mutex);
1991   shutdownThread();
1992 }
1993 #endif
1994
1995 void 
1996 initScheduler(void)
1997 {
1998 #if defined(GRAN)
1999   nat i;
2000
2001   for (i=0; i<=MAX_PROC; i++) {
2002     run_queue_hds[i]      = END_TSO_QUEUE;
2003     run_queue_tls[i]      = END_TSO_QUEUE;
2004     blocked_queue_hds[i]  = END_TSO_QUEUE;
2005     blocked_queue_tls[i]  = END_TSO_QUEUE;
2006     ccalling_threadss[i]  = END_TSO_QUEUE;
2007     sleeping_queue        = END_TSO_QUEUE;
2008   }
2009 #else
2010   run_queue_hd      = END_TSO_QUEUE;
2011   run_queue_tl      = END_TSO_QUEUE;
2012   blocked_queue_hd  = END_TSO_QUEUE;
2013   blocked_queue_tl  = END_TSO_QUEUE;
2014   sleeping_queue    = END_TSO_QUEUE;
2015 #endif 
2016
2017   suspended_ccalling_threads  = END_TSO_QUEUE;
2018
2019   main_threads = NULL;
2020   all_threads  = END_TSO_QUEUE;
2021
2022   context_switch = 0;
2023   interrupted    = 0;
2024
2025   RtsFlags.ConcFlags.ctxtSwitchTicks =
2026       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2027       
2028 #if defined(RTS_SUPPORTS_THREADS)
2029   /* Initialise the mutex and condition variables used by
2030    * the scheduler. */
2031   initMutex(&sched_mutex);
2032   initMutex(&term_mutex);
2033
2034   initCondition(&thread_ready_cond);
2035 #endif
2036   
2037 #if defined(SMP)
2038   initCondition(&gc_pending_cond);
2039 #endif
2040
2041 #if defined(RTS_SUPPORTS_THREADS)
2042   ACQUIRE_LOCK(&sched_mutex);
2043 #endif
2044
2045   /* Install the SIGHUP handler */
2046 #if defined(SMP)
2047   {
2048     struct sigaction action,oact;
2049
2050     action.sa_handler = term_handler;
2051     sigemptyset(&action.sa_mask);
2052     action.sa_flags = 0;
2053     if (sigaction(SIGTERM, &action, &oact) != 0) {
2054       barf("can't install TERM handler");
2055     }
2056   }
2057 #endif
2058
2059   /* A capability holds the state a native thread needs in
2060    * order to execute STG code. At least one capability is
2061    * floating around (only SMP builds have more than one).
2062    */
2063   initCapabilities();
2064   
2065 #if defined(RTS_SUPPORTS_THREADS)
2066     /* start our haskell execution tasks */
2067 # if defined(SMP)
2068     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2069 # else
2070     startTaskManager(0,taskStart);
2071 # endif
2072 #endif
2073
2074 #if /* defined(SMP) ||*/ defined(PAR)
2075   initSparkPools();
2076 #endif
2077
2078 #if defined(RTS_SUPPORTS_THREADS)
2079   RELEASE_LOCK(&sched_mutex);
2080 #endif
2081
2082 }
2083
2084 void
2085 exitScheduler( void )
2086 {
2087 #if defined(RTS_SUPPORTS_THREADS)
2088   stopTaskManager();
2089 #endif
2090   shutting_down_scheduler = rtsTrue;
2091 }
2092
2093 /* -----------------------------------------------------------------------------
2094    Managing the per-task allocation areas.
2095    
2096    Each capability comes with an allocation area.  These are
2097    fixed-length block lists into which allocation can be done.
2098
2099    ToDo: no support for two-space collection at the moment???
2100    -------------------------------------------------------------------------- */
2101
2102 /* -----------------------------------------------------------------------------
2103  * waitThread is the external interface for running a new computation
2104  * and waiting for the result.
2105  *
2106  * In the non-SMP case, we create a new main thread, push it on the 
2107  * main-thread stack, and invoke the scheduler to run it.  The
2108  * scheduler will return when the top main thread on the stack has
2109  * completed or died, and fill in the necessary fields of the
2110  * main_thread structure.
2111  *
2112  * In the SMP case, we create a main thread as before, but we then
2113  * create a new condition variable and sleep on it.  When our new
2114  * main thread has completed, we'll be woken up and the status/result
2115  * will be in the main_thread struct.
2116  * -------------------------------------------------------------------------- */
2117
2118 int 
2119 howManyThreadsAvail ( void )
2120 {
2121    int i = 0;
2122    StgTSO* q;
2123    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2124       i++;
2125    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2126       i++;
2127    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2128       i++;
2129    return i;
2130 }
2131
2132 void
2133 finishAllThreads ( void )
2134 {
2135    do {
2136       while (run_queue_hd != END_TSO_QUEUE) {
2137          waitThread ( run_queue_hd, NULL);
2138       }
2139       while (blocked_queue_hd != END_TSO_QUEUE) {
2140          waitThread ( blocked_queue_hd, NULL);
2141       }
2142       while (sleeping_queue != END_TSO_QUEUE) {
2143          waitThread ( blocked_queue_hd, NULL);
2144       }
2145    } while 
2146       (blocked_queue_hd != END_TSO_QUEUE || 
2147        run_queue_hd     != END_TSO_QUEUE ||
2148        sleeping_queue   != END_TSO_QUEUE);
2149 }
2150
2151 SchedulerStatus
2152 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2153
2154   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2155 #if defined(THREADED_RTS)
2156   return waitThread_(tso,ret, rtsFalse);
2157 #else
2158   return waitThread_(tso,ret);
2159 #endif
2160 }
2161
2162 SchedulerStatus
2163 waitThread_(StgTSO *tso,
2164             /*out*/StgClosure **ret
2165 #if defined(THREADED_RTS)
2166             , rtsBool blockWaiting
2167 #endif
2168            )
2169 {
2170   StgMainThread *m;
2171   SchedulerStatus stat;
2172
2173   ACQUIRE_LOCK(&sched_mutex);
2174   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2175   
2176   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2177
2178   m->tso = tso;
2179   m->ret = ret;
2180   m->stat = NoStatus;
2181 #if defined(RTS_SUPPORTS_THREADS)
2182   initCondition(&m->wakeup);
2183 #endif
2184
2185   m->link = main_threads;
2186   main_threads = m;
2187
2188   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2189
2190 #if defined(RTS_SUPPORTS_THREADS)
2191
2192 # if defined(THREADED_RTS)
2193   if (!blockWaiting) {
2194     /* In the threaded case, the OS thread that called main()
2195      * gets to enter the RTS directly without going via another
2196      * task/thread.
2197      */
2198     RELEASE_LOCK(&sched_mutex);
2199     schedule();
2200     ASSERT(m->stat != NoStatus);
2201   } else 
2202 # endif
2203   {
2204     do {
2205       waitCondition(&m->wakeup, &sched_mutex);
2206     } while (m->stat == NoStatus);
2207   }
2208 #elif defined(GRAN)
2209   /* GranSim specific init */
2210   CurrentTSO = m->tso;                // the TSO to run
2211   procStatus[MainProc] = Busy;        // status of main PE
2212   CurrentProc = MainProc;             // PE to run it on
2213
2214   schedule();
2215 #else
2216   RELEASE_LOCK(&sched_mutex);
2217   schedule();
2218   ASSERT(m->stat != NoStatus);
2219 #endif
2220
2221   stat = m->stat;
2222
2223 #if defined(RTS_SUPPORTS_THREADS)
2224   closeCondition(&m->wakeup);
2225 #endif
2226
2227   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2228                               m->tso->id));
2229   free(m);
2230
2231 #if defined(THREADED_RTS)
2232   if (blockWaiting) 
2233 #endif
2234     RELEASE_LOCK(&sched_mutex);
2235
2236   return stat;
2237 }
2238
2239 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2240 //@subsection Run queue code 
2241
2242 #if 0
2243 /* 
2244    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2245        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2246        implicit global variable that has to be correct when calling these
2247        fcts -- HWL 
2248 */
2249
2250 /* Put the new thread on the head of the runnable queue.
2251  * The caller of createThread better push an appropriate closure
2252  * on this thread's stack before the scheduler is invoked.
2253  */
2254 static /* inline */ void
2255 add_to_run_queue(tso)
2256 StgTSO* tso; 
2257 {
2258   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2259   tso->link = run_queue_hd;
2260   run_queue_hd = tso;
2261   if (run_queue_tl == END_TSO_QUEUE) {
2262     run_queue_tl = tso;
2263   }
2264 }
2265
2266 /* Put the new thread at the end of the runnable queue. */
2267 static /* inline */ void
2268 push_on_run_queue(tso)
2269 StgTSO* tso; 
2270 {
2271   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2272   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2273   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2274   if (run_queue_hd == END_TSO_QUEUE) {
2275     run_queue_hd = tso;
2276   } else {
2277     run_queue_tl->link = tso;
2278   }
2279   run_queue_tl = tso;
2280 }
2281
2282 /* 
2283    Should be inlined because it's used very often in schedule.  The tso
2284    argument is actually only needed in GranSim, where we want to have the
2285    possibility to schedule *any* TSO on the run queue, irrespective of the
2286    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2287    the run queue and dequeue the tso, adjusting the links in the queue. 
2288 */
2289 //@cindex take_off_run_queue
2290 static /* inline */ StgTSO*
2291 take_off_run_queue(StgTSO *tso) {
2292   StgTSO *t, *prev;
2293
2294   /* 
2295      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2296
2297      if tso is specified, unlink that tso from the run_queue (doesn't have
2298      to be at the beginning of the queue); GranSim only 
2299   */
2300   if (tso!=END_TSO_QUEUE) {
2301     /* find tso in queue */
2302     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2303          t!=END_TSO_QUEUE && t!=tso;
2304          prev=t, t=t->link) 
2305       /* nothing */ ;
2306     ASSERT(t==tso);
2307     /* now actually dequeue the tso */
2308     if (prev!=END_TSO_QUEUE) {
2309       ASSERT(run_queue_hd!=t);
2310       prev->link = t->link;
2311     } else {
2312       /* t is at beginning of thread queue */
2313       ASSERT(run_queue_hd==t);
2314       run_queue_hd = t->link;
2315     }
2316     /* t is at end of thread queue */
2317     if (t->link==END_TSO_QUEUE) {
2318       ASSERT(t==run_queue_tl);
2319       run_queue_tl = prev;
2320     } else {
2321       ASSERT(run_queue_tl!=t);
2322     }
2323     t->link = END_TSO_QUEUE;
2324   } else {
2325     /* take tso from the beginning of the queue; std concurrent code */
2326     t = run_queue_hd;
2327     if (t != END_TSO_QUEUE) {
2328       run_queue_hd = t->link;
2329       t->link = END_TSO_QUEUE;
2330       if (run_queue_hd == END_TSO_QUEUE) {
2331         run_queue_tl = END_TSO_QUEUE;
2332       }
2333     }
2334   }
2335   return t;
2336 }
2337
2338 #endif /* 0 */
2339
2340 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2341 //@subsection Garbage Collextion Routines
2342
2343 /* ---------------------------------------------------------------------------
2344    Where are the roots that we know about?
2345
2346         - all the threads on the runnable queue
2347         - all the threads on the blocked queue
2348         - all the threads on the sleeping queue
2349         - all the thread currently executing a _ccall_GC
2350         - all the "main threads"
2351      
2352    ------------------------------------------------------------------------ */
2353
2354 /* This has to be protected either by the scheduler monitor, or by the
2355         garbage collection monitor (probably the latter).
2356         KH @ 25/10/99
2357 */
2358
2359 void
2360 GetRoots(evac_fn evac)
2361 {
2362 #if defined(GRAN)
2363   {
2364     nat i;
2365     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2366       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2367           evac((StgClosure **)&run_queue_hds[i]);
2368       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2369           evac((StgClosure **)&run_queue_tls[i]);
2370       
2371       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2372           evac((StgClosure **)&blocked_queue_hds[i]);
2373       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2374           evac((StgClosure **)&blocked_queue_tls[i]);
2375       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2376           evac((StgClosure **)&ccalling_threads[i]);
2377     }
2378   }
2379
2380   markEventQueue();
2381
2382 #else /* !GRAN */
2383   if (run_queue_hd != END_TSO_QUEUE) {
2384       ASSERT(run_queue_tl != END_TSO_QUEUE);
2385       evac((StgClosure **)&run_queue_hd);
2386       evac((StgClosure **)&run_queue_tl);
2387   }
2388   
2389   if (blocked_queue_hd != END_TSO_QUEUE) {
2390       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2391       evac((StgClosure **)&blocked_queue_hd);
2392       evac((StgClosure **)&blocked_queue_tl);
2393   }
2394   
2395   if (sleeping_queue != END_TSO_QUEUE) {
2396       evac((StgClosure **)&sleeping_queue);
2397   }
2398 #endif 
2399
2400   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2401       evac((StgClosure **)&suspended_ccalling_threads);
2402   }
2403
2404 #if defined(PAR) || defined(GRAN)
2405   markSparkQueue(evac);
2406 #endif
2407 }
2408
2409 /* -----------------------------------------------------------------------------
2410    performGC
2411
2412    This is the interface to the garbage collector from Haskell land.
2413    We provide this so that external C code can allocate and garbage
2414    collect when called from Haskell via _ccall_GC.
2415
2416    It might be useful to provide an interface whereby the programmer
2417    can specify more roots (ToDo).
2418    
2419    This needs to be protected by the GC condition variable above.  KH.
2420    -------------------------------------------------------------------------- */
2421
2422 void (*extra_roots)(evac_fn);
2423
2424 void
2425 performGC(void)
2426 {
2427   /* Obligated to hold this lock upon entry */
2428   ACQUIRE_LOCK(&sched_mutex);
2429   GarbageCollect(GetRoots,rtsFalse);
2430   RELEASE_LOCK(&sched_mutex);
2431 }
2432
2433 void
2434 performMajorGC(void)
2435 {
2436   ACQUIRE_LOCK(&sched_mutex);
2437   GarbageCollect(GetRoots,rtsTrue);
2438   RELEASE_LOCK(&sched_mutex);
2439 }
2440
2441 static void
2442 AllRoots(evac_fn evac)
2443 {
2444     GetRoots(evac);             // the scheduler's roots
2445     extra_roots(evac);          // the user's roots
2446 }
2447
2448 void
2449 performGCWithRoots(void (*get_roots)(evac_fn))
2450 {
2451   ACQUIRE_LOCK(&sched_mutex);
2452   extra_roots = get_roots;
2453   GarbageCollect(AllRoots,rtsFalse);
2454   RELEASE_LOCK(&sched_mutex);
2455 }
2456
2457 /* -----------------------------------------------------------------------------
2458    Stack overflow
2459
2460    If the thread has reached its maximum stack size, then raise the
2461    StackOverflow exception in the offending thread.  Otherwise
2462    relocate the TSO into a larger chunk of memory and adjust its stack
2463    size appropriately.
2464    -------------------------------------------------------------------------- */
2465
2466 static StgTSO *
2467 threadStackOverflow(StgTSO *tso)
2468 {
2469   nat new_stack_size, new_tso_size, diff, stack_words;
2470   StgPtr new_sp;
2471   StgTSO *dest;
2472
2473   IF_DEBUG(sanity,checkTSO(tso));
2474   if (tso->stack_size >= tso->max_stack_size) {
2475
2476     IF_DEBUG(gc,
2477              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2478                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2479              /* If we're debugging, just print out the top of the stack */
2480              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2481                                               tso->sp+64)));
2482
2483     /* Send this thread the StackOverflow exception */
2484     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2485     return tso;
2486   }
2487
2488   /* Try to double the current stack size.  If that takes us over the
2489    * maximum stack size for this thread, then use the maximum instead.
2490    * Finally round up so the TSO ends up as a whole number of blocks.
2491    */
2492   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2493   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2494                                        TSO_STRUCT_SIZE)/sizeof(W_);
2495   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2496   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2497
2498   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2499
2500   dest = (StgTSO *)allocate(new_tso_size);
2501   TICK_ALLOC_TSO(new_stack_size,0);
2502
2503   /* copy the TSO block and the old stack into the new area */
2504   memcpy(dest,tso,TSO_STRUCT_SIZE);
2505   stack_words = tso->stack + tso->stack_size - tso->sp;
2506   new_sp = (P_)dest + new_tso_size - stack_words;
2507   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2508
2509   /* relocate the stack pointers... */
2510   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2511   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2512   dest->sp    = new_sp;
2513   dest->stack_size = new_stack_size;
2514         
2515   /* and relocate the update frame list */
2516   relocate_stack(dest, diff);
2517
2518   /* Mark the old TSO as relocated.  We have to check for relocated
2519    * TSOs in the garbage collector and any primops that deal with TSOs.
2520    *
2521    * It's important to set the sp and su values to just beyond the end
2522    * of the stack, so we don't attempt to scavenge any part of the
2523    * dead TSO's stack.
2524    */
2525   tso->what_next = ThreadRelocated;
2526   tso->link = dest;
2527   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2528   tso->su = (StgUpdateFrame *)tso->sp;
2529   tso->why_blocked = NotBlocked;
2530   dest->mut_link = NULL;
2531
2532   IF_PAR_DEBUG(verbose,
2533                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2534                      tso->id, tso, tso->stack_size);
2535                /* If we're debugging, just print out the top of the stack */
2536                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2537                                                 tso->sp+64)));
2538   
2539   IF_DEBUG(sanity,checkTSO(tso));
2540 #if 0
2541   IF_DEBUG(scheduler,printTSO(dest));
2542 #endif
2543
2544   return dest;
2545 }
2546
2547 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2548 //@subsection Blocking Queue Routines
2549
2550 /* ---------------------------------------------------------------------------
2551    Wake up a queue that was blocked on some resource.
2552    ------------------------------------------------------------------------ */
2553
2554 #if defined(GRAN)
2555 static inline void
2556 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2557 {
2558 }
2559 #elif defined(PAR)
2560 static inline void
2561 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2562 {
2563   /* write RESUME events to log file and
2564      update blocked and fetch time (depending on type of the orig closure) */
2565   if (RtsFlags.ParFlags.ParStats.Full) {
2566     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2567                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2568                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2569     if (EMPTY_RUN_QUEUE())
2570       emitSchedule = rtsTrue;
2571
2572     switch (get_itbl(node)->type) {
2573         case FETCH_ME_BQ:
2574           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2575           break;
2576         case RBH:
2577         case FETCH_ME:
2578         case BLACKHOLE_BQ:
2579           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2580           break;
2581 #ifdef DIST
2582         case MVAR:
2583           break;
2584 #endif    
2585         default:
2586           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2587         }
2588       }
2589 }
2590 #endif
2591
2592 #if defined(GRAN)
2593 static StgBlockingQueueElement *
2594 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2595 {
2596     StgTSO *tso;
2597     PEs node_loc, tso_loc;
2598
2599     node_loc = where_is(node); // should be lifted out of loop
2600     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2601     tso_loc = where_is((StgClosure *)tso);
2602     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2603       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2604       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2605       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2606       // insertThread(tso, node_loc);
2607       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2608                 ResumeThread,
2609                 tso, node, (rtsSpark*)NULL);
2610       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2611       // len_local++;
2612       // len++;
2613     } else { // TSO is remote (actually should be FMBQ)
2614       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2615                                   RtsFlags.GranFlags.Costs.gunblocktime +
2616                                   RtsFlags.GranFlags.Costs.latency;
2617       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2618                 UnblockThread,
2619                 tso, node, (rtsSpark*)NULL);
2620       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2621       // len++;
2622     }
2623     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2624     IF_GRAN_DEBUG(bq,
2625                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2626                           (node_loc==tso_loc ? "Local" : "Global"), 
2627                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2628     tso->block_info.closure = NULL;
2629     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2630                              tso->id, tso));
2631 }
2632 #elif defined(PAR)
2633 static StgBlockingQueueElement *
2634 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2635 {
2636     StgBlockingQueueElement *next;
2637
2638     switch (get_itbl(bqe)->type) {
2639     case TSO:
2640       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2641       /* if it's a TSO just push it onto the run_queue */
2642       next = bqe->link;
2643       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2644       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2645       THREAD_RUNNABLE();
2646       unblockCount(bqe, node);
2647       /* reset blocking status after dumping event */
2648       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2649       break;
2650
2651     case BLOCKED_FETCH:
2652       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2653       next = bqe->link;
2654       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2655       PendingFetches = (StgBlockedFetch *)bqe;
2656       break;
2657
2658 # if defined(DEBUG)
2659       /* can ignore this case in a non-debugging setup; 
2660          see comments on RBHSave closures above */
2661     case CONSTR:
2662       /* check that the closure is an RBHSave closure */
2663       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2664              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2665              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2666       break;
2667
2668     default:
2669       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2670            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2671            (StgClosure *)bqe);
2672 # endif
2673     }
2674   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2675   return next;
2676 }
2677
2678 #else /* !GRAN && !PAR */
2679 static StgTSO *
2680 unblockOneLocked(StgTSO *tso)
2681 {
2682   StgTSO *next;
2683
2684   ASSERT(get_itbl(tso)->type == TSO);
2685   ASSERT(tso->why_blocked != NotBlocked);
2686   tso->why_blocked = NotBlocked;
2687   next = tso->link;
2688   PUSH_ON_RUN_QUEUE(tso);
2689   THREAD_RUNNABLE();
2690   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2691   return next;
2692 }
2693 #endif
2694
2695 #if defined(GRAN) || defined(PAR)
2696 inline StgBlockingQueueElement *
2697 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2698 {
2699   ACQUIRE_LOCK(&sched_mutex);
2700   bqe = unblockOneLocked(bqe, node);
2701   RELEASE_LOCK(&sched_mutex);
2702   return bqe;
2703 }
2704 #else
2705 inline StgTSO *
2706 unblockOne(StgTSO *tso)
2707 {
2708   ACQUIRE_LOCK(&sched_mutex);
2709   tso = unblockOneLocked(tso);
2710   RELEASE_LOCK(&sched_mutex);
2711   return tso;
2712 }
2713 #endif
2714
2715 #if defined(GRAN)
2716 void 
2717 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2718 {
2719   StgBlockingQueueElement *bqe;
2720   PEs node_loc;
2721   nat len = 0; 
2722
2723   IF_GRAN_DEBUG(bq, 
2724                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2725                       node, CurrentProc, CurrentTime[CurrentProc], 
2726                       CurrentTSO->id, CurrentTSO));
2727
2728   node_loc = where_is(node);
2729
2730   ASSERT(q == END_BQ_QUEUE ||
2731          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2732          get_itbl(q)->type == CONSTR); // closure (type constructor)
2733   ASSERT(is_unique(node));
2734
2735   /* FAKE FETCH: magically copy the node to the tso's proc;
2736      no Fetch necessary because in reality the node should not have been 
2737      moved to the other PE in the first place
2738   */
2739   if (CurrentProc!=node_loc) {
2740     IF_GRAN_DEBUG(bq, 
2741                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2742                         node, node_loc, CurrentProc, CurrentTSO->id, 
2743                         // CurrentTSO, where_is(CurrentTSO),
2744                         node->header.gran.procs));
2745     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2746     IF_GRAN_DEBUG(bq, 
2747                   belch("## new bitmask of node %p is %#x",
2748                         node, node->header.gran.procs));
2749     if (RtsFlags.GranFlags.GranSimStats.Global) {
2750       globalGranStats.tot_fake_fetches++;
2751     }
2752   }
2753
2754   bqe = q;
2755   // ToDo: check: ASSERT(CurrentProc==node_loc);
2756   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2757     //next = bqe->link;
2758     /* 
2759        bqe points to the current element in the queue
2760        next points to the next element in the queue
2761     */
2762     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2763     //tso_loc = where_is(tso);
2764     len++;
2765     bqe = unblockOneLocked(bqe, node);
2766   }
2767
2768   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2769      the closure to make room for the anchor of the BQ */
2770   if (bqe!=END_BQ_QUEUE) {
2771     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2772     /*
2773     ASSERT((info_ptr==&RBH_Save_0_info) ||
2774            (info_ptr==&RBH_Save_1_info) ||
2775            (info_ptr==&RBH_Save_2_info));
2776     */
2777     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2778     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2779     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2780
2781     IF_GRAN_DEBUG(bq,
2782                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2783                         node, info_type(node)));
2784   }
2785
2786   /* statistics gathering */
2787   if (RtsFlags.GranFlags.GranSimStats.Global) {
2788     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2789     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2790     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2791     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2792   }
2793   IF_GRAN_DEBUG(bq,
2794                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2795                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2796 }
2797 #elif defined(PAR)
2798 void 
2799 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2800 {
2801   StgBlockingQueueElement *bqe;
2802
2803   ACQUIRE_LOCK(&sched_mutex);
2804
2805   IF_PAR_DEBUG(verbose, 
2806                belch("##-_ AwBQ for node %p on [%x]: ",
2807                      node, mytid));
2808 #ifdef DIST  
2809   //RFP
2810   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2811     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2812     return;
2813   }
2814 #endif
2815   
2816   ASSERT(q == END_BQ_QUEUE ||
2817          get_itbl(q)->type == TSO ||           
2818          get_itbl(q)->type == BLOCKED_FETCH || 
2819          get_itbl(q)->type == CONSTR); 
2820
2821   bqe = q;
2822   while (get_itbl(bqe)->type==TSO || 
2823          get_itbl(bqe)->type==BLOCKED_FETCH) {
2824     bqe = unblockOneLocked(bqe, node);
2825   }
2826   RELEASE_LOCK(&sched_mutex);
2827 }
2828
2829 #else   /* !GRAN && !PAR */
2830 void
2831 awakenBlockedQueue(StgTSO *tso)
2832 {
2833   ACQUIRE_LOCK(&sched_mutex);
2834   while (tso != END_TSO_QUEUE) {
2835     tso = unblockOneLocked(tso);
2836   }
2837   RELEASE_LOCK(&sched_mutex);
2838 }
2839 #endif
2840
2841 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2842 //@subsection Exception Handling Routines
2843
2844 /* ---------------------------------------------------------------------------
2845    Interrupt execution
2846    - usually called inside a signal handler so it mustn't do anything fancy.   
2847    ------------------------------------------------------------------------ */
2848
2849 void
2850 interruptStgRts(void)
2851 {
2852     interrupted    = 1;
2853     context_switch = 1;
2854 }
2855
2856 /* -----------------------------------------------------------------------------
2857    Unblock a thread
2858
2859    This is for use when we raise an exception in another thread, which
2860    may be blocked.
2861    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2862    -------------------------------------------------------------------------- */
2863
2864 #if defined(GRAN) || defined(PAR)
2865 /*
2866   NB: only the type of the blocking queue is different in GranSim and GUM
2867       the operations on the queue-elements are the same
2868       long live polymorphism!
2869
2870   Locks: sched_mutex is held upon entry and exit.
2871
2872 */
2873 static void
2874 unblockThread(StgTSO *tso)
2875 {
2876   StgBlockingQueueElement *t, **last;
2877
2878   switch (tso->why_blocked) {
2879
2880   case NotBlocked:
2881     return;  /* not blocked */
2882
2883   case BlockedOnMVar:
2884     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2885     {
2886       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2887       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2888
2889       last = (StgBlockingQueueElement **)&mvar->head;
2890       for (t = (StgBlockingQueueElement *)mvar->head; 
2891            t != END_BQ_QUEUE; 
2892            last = &t->link, last_tso = t, t = t->link) {
2893         if (t == (StgBlockingQueueElement *)tso) {
2894           *last = (StgBlockingQueueElement *)tso->link;
2895           if (mvar->tail == tso) {
2896             mvar->tail = (StgTSO *)last_tso;
2897           }
2898           goto done;
2899         }
2900       }
2901       barf("unblockThread (MVAR): TSO not found");
2902     }
2903
2904   case BlockedOnBlackHole:
2905     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2906     {
2907       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2908
2909       last = &bq->blocking_queue;
2910       for (t = bq->blocking_queue; 
2911            t != END_BQ_QUEUE; 
2912            last = &t->link, t = t->link) {
2913         if (t == (StgBlockingQueueElement *)tso) {
2914           *last = (StgBlockingQueueElement *)tso->link;
2915           goto done;
2916         }
2917       }
2918       barf("unblockThread (BLACKHOLE): TSO not found");
2919     }
2920
2921   case BlockedOnException:
2922     {
2923       StgTSO *target  = tso->block_info.tso;
2924
2925       ASSERT(get_itbl(target)->type == TSO);
2926
2927       if (target->what_next == ThreadRelocated) {
2928           target = target->link;
2929           ASSERT(get_itbl(target)->type == TSO);
2930       }
2931
2932       ASSERT(target->blocked_exceptions != NULL);
2933
2934       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2935       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2936            t != END_BQ_QUEUE; 
2937            last = &t->link, t = t->link) {
2938         ASSERT(get_itbl(t)->type == TSO);
2939         if (t == (StgBlockingQueueElement *)tso) {
2940           *last = (StgBlockingQueueElement *)tso->link;
2941           goto done;
2942         }
2943       }
2944       barf("unblockThread (Exception): TSO not found");
2945     }
2946
2947   case BlockedOnRead:
2948   case BlockedOnWrite:
2949     {
2950       /* take TSO off blocked_queue */
2951       StgBlockingQueueElement *prev = NULL;
2952       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2953            prev = t, t = t->link) {
2954         if (t == (StgBlockingQueueElement *)tso) {
2955           if (prev == NULL) {
2956             blocked_queue_hd = (StgTSO *)t->link;
2957             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2958               blocked_queue_tl = END_TSO_QUEUE;
2959             }
2960           } else {
2961             prev->link = t->link;
2962             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2963               blocked_queue_tl = (StgTSO *)prev;
2964             }
2965           }
2966           goto done;
2967         }
2968       }
2969       barf("unblockThread (I/O): TSO not found");
2970     }
2971
2972   case BlockedOnDelay:
2973     {
2974       /* take TSO off sleeping_queue */
2975       StgBlockingQueueElement *prev = NULL;
2976       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2977            prev = t, t = t->link) {
2978         if (t == (StgBlockingQueueElement *)tso) {
2979           if (prev == NULL) {
2980             sleeping_queue = (StgTSO *)t->link;
2981           } else {
2982             prev->link = t->link;
2983           }
2984           goto done;
2985         }
2986       }
2987       barf("unblockThread (I/O): TSO not found");
2988     }
2989
2990   default:
2991     barf("unblockThread");
2992   }
2993
2994  done:
2995   tso->link = END_TSO_QUEUE;
2996   tso->why_blocked = NotBlocked;
2997   tso->block_info.closure = NULL;
2998   PUSH_ON_RUN_QUEUE(tso);
2999 }
3000 #else
3001 static void
3002 unblockThread(StgTSO *tso)
3003 {
3004   StgTSO *t, **last;
3005   
3006   /* To avoid locking unnecessarily. */
3007   if (tso->why_blocked == NotBlocked) {
3008     return;
3009   }
3010
3011   switch (tso->why_blocked) {
3012
3013   case BlockedOnMVar:
3014     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3015     {
3016       StgTSO *last_tso = END_TSO_QUEUE;
3017       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3018
3019       last = &mvar->head;
3020       for (t = mvar->head; t != END_TSO_QUEUE; 
3021            last = &t->link, last_tso = t, t = t->link) {
3022         if (t == tso) {
3023           *last = tso->link;
3024           if (mvar->tail == tso) {
3025             mvar->tail = last_tso;
3026           }
3027           goto done;
3028         }
3029       }
3030       barf("unblockThread (MVAR): TSO not found");
3031     }
3032
3033   case BlockedOnBlackHole:
3034     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3035     {
3036       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3037
3038       last = &bq->blocking_queue;
3039       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3040            last = &t->link, t = t->link) {
3041         if (t == tso) {
3042           *last = tso->link;
3043           goto done;
3044         }
3045       }
3046       barf("unblockThread (BLACKHOLE): TSO not found");
3047     }
3048
3049   case BlockedOnException:
3050     {
3051       StgTSO *target  = tso->block_info.tso;
3052
3053       ASSERT(get_itbl(target)->type == TSO);
3054
3055       while (target->what_next == ThreadRelocated) {
3056           target = target->link;
3057           ASSERT(get_itbl(target)->type == TSO);
3058       }
3059       
3060       ASSERT(target->blocked_exceptions != NULL);
3061
3062       last = &target->blocked_exceptions;
3063       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3064            last = &t->link, t = t->link) {
3065         ASSERT(get_itbl(t)->type == TSO);
3066         if (t == tso) {
3067           *last = tso->link;
3068           goto done;
3069         }
3070       }
3071       barf("unblockThread (Exception): TSO not found");
3072     }
3073
3074   case BlockedOnRead:
3075   case BlockedOnWrite:
3076     {
3077       StgTSO *prev = NULL;
3078       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3079            prev = t, t = t->link) {
3080         if (t == tso) {
3081           if (prev == NULL) {
3082             blocked_queue_hd = t->link;
3083             if (blocked_queue_tl == t) {
3084               blocked_queue_tl = END_TSO_QUEUE;
3085             }
3086           } else {
3087             prev->link = t->link;
3088             if (blocked_queue_tl == t) {
3089               blocked_queue_tl = prev;
3090             }
3091           }
3092           goto done;
3093         }
3094       }
3095       barf("unblockThread (I/O): TSO not found");
3096     }
3097
3098   case BlockedOnDelay:
3099     {
3100       StgTSO *prev = NULL;
3101       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3102            prev = t, t = t->link) {
3103         if (t == tso) {
3104           if (prev == NULL) {
3105             sleeping_queue = t->link;
3106           } else {
3107             prev->link = t->link;
3108           }
3109           goto done;
3110         }
3111       }
3112       barf("unblockThread (I/O): TSO not found");
3113     }
3114
3115   default:
3116     barf("unblockThread");
3117   }
3118
3119  done:
3120   tso->link = END_TSO_QUEUE;
3121   tso->why_blocked = NotBlocked;
3122   tso->block_info.closure = NULL;
3123   PUSH_ON_RUN_QUEUE(tso);
3124 }
3125 #endif
3126
3127 /* -----------------------------------------------------------------------------
3128  * raiseAsync()
3129  *
3130  * The following function implements the magic for raising an
3131  * asynchronous exception in an existing thread.
3132  *
3133  * We first remove the thread from any queue on which it might be
3134  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3135  *
3136  * We strip the stack down to the innermost CATCH_FRAME, building
3137  * thunks in the heap for all the active computations, so they can 
3138  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3139  * an application of the handler to the exception, and push it on
3140  * the top of the stack.
3141  * 
3142  * How exactly do we save all the active computations?  We create an
3143  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3144  * AP_UPDs pushes everything from the corresponding update frame
3145  * upwards onto the stack.  (Actually, it pushes everything up to the
3146  * next update frame plus a pointer to the next AP_UPD object.
3147  * Entering the next AP_UPD object pushes more onto the stack until we
3148  * reach the last AP_UPD object - at which point the stack should look
3149  * exactly as it did when we killed the TSO and we can continue
3150  * execution by entering the closure on top of the stack.
3151  *
3152  * We can also kill a thread entirely - this happens if either (a) the 
3153  * exception passed to raiseAsync is NULL, or (b) there's no
3154  * CATCH_FRAME on the stack.  In either case, we strip the entire
3155  * stack and replace the thread with a zombie.
3156  *
3157  * Locks: sched_mutex held upon entry nor exit.
3158  *
3159  * -------------------------------------------------------------------------- */
3160  
3161 void 
3162 deleteThread(StgTSO *tso)
3163 {
3164   raiseAsync(tso,NULL);
3165 }
3166
3167 void
3168 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3169 {
3170   /* When raising async exs from contexts where sched_mutex isn't held;
3171      use raiseAsyncWithLock(). */
3172   ACQUIRE_LOCK(&sched_mutex);
3173   raiseAsync(tso,exception);
3174   RELEASE_LOCK(&sched_mutex);
3175 }
3176
3177 void
3178 raiseAsync(StgTSO *tso, StgClosure *exception)
3179 {
3180   StgUpdateFrame* su = tso->su;
3181   StgPtr          sp = tso->sp;
3182   
3183   /* Thread already dead? */
3184   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3185     return;
3186   }
3187
3188   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3189
3190   /* Remove it from any blocking queues */
3191   unblockThread(tso);
3192
3193   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3194   /* The stack freezing code assumes there's a closure pointer on
3195    * the top of the stack.  This isn't always the case with compiled
3196    * code, so we have to push a dummy closure on the top which just
3197    * returns to the next return address on the stack.
3198    */
3199   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3200     *(--sp) = (W_)&stg_dummy_ret_closure;
3201   }
3202
3203   while (1) {
3204     nat words = ((P_)su - (P_)sp) - 1;
3205     nat i;
3206     StgAP_UPD * ap;
3207
3208     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3209      * then build the THUNK raise(exception), and leave it on
3210      * top of the CATCH_FRAME ready to enter.
3211      */
3212     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3213 #ifdef PROFILING
3214       StgCatchFrame *cf = (StgCatchFrame *)su;
3215 #endif
3216       StgClosure *raise;
3217
3218       /* we've got an exception to raise, so let's pass it to the
3219        * handler in this frame.
3220        */
3221       raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3222       TICK_ALLOC_SE_THK(1,0);
3223       SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3224       raise->payload[0] = exception;
3225
3226       /* throw away the stack from Sp up to the CATCH_FRAME.
3227        */
3228       sp = (P_)su - 1;
3229
3230       /* Ensure that async excpetions are blocked now, so we don't get
3231        * a surprise exception before we get around to executing the
3232        * handler.
3233        */
3234       if (tso->blocked_exceptions == NULL) {
3235           tso->blocked_exceptions = END_TSO_QUEUE;
3236       }
3237
3238       /* Put the newly-built THUNK on top of the stack, ready to execute
3239        * when the thread restarts.
3240        */
3241       sp[0] = (W_)raise;
3242       tso->sp = sp;
3243       tso->su = su;
3244       tso->what_next = ThreadEnterGHC;
3245       IF_DEBUG(sanity, checkTSO(tso));
3246       return;
3247     }
3248
3249     /* First build an AP_UPD consisting of the stack chunk above the
3250      * current update frame, with the top word on the stack as the
3251      * fun field.
3252      */
3253     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3254     
3255     ASSERT(words >= 0);
3256     
3257     ap->n_args = words;
3258     ap->fun    = (StgClosure *)sp[0];
3259     sp++;
3260     for(i=0; i < (nat)words; ++i) {
3261       ap->payload[i] = (StgClosure *)*sp++;
3262     }
3263     
3264     switch (get_itbl(su)->type) {
3265       
3266     case UPDATE_FRAME:
3267       {
3268         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3269         TICK_ALLOC_UP_THK(words+1,0);
3270         
3271         IF_DEBUG(scheduler,
3272                  fprintf(stderr,  "scheduler: Updating ");
3273                  printPtr((P_)su->updatee); 
3274                  fprintf(stderr,  " with ");
3275                  printObj((StgClosure *)ap);
3276                  );
3277         
3278         /* Replace the updatee with an indirection - happily
3279          * this will also wake up any threads currently
3280          * waiting on the result.
3281          *
3282          * Warning: if we're in a loop, more than one update frame on
3283          * the stack may point to the same object.  Be careful not to
3284          * overwrite an IND_OLDGEN in this case, because we'll screw
3285          * up the mutable lists.  To be on the safe side, don't
3286          * overwrite any kind of indirection at all.  See also
3287          * threadSqueezeStack in GC.c, where we have to make a similar
3288          * check.
3289          */
3290         if (!closure_IND(su->updatee)) {
3291             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3292         }
3293         su = su->link;
3294         sp += sizeofW(StgUpdateFrame) -1;
3295         sp[0] = (W_)ap; /* push onto stack */
3296         break;
3297       }
3298
3299     case CATCH_FRAME:
3300       {
3301         StgCatchFrame *cf = (StgCatchFrame *)su;
3302         StgClosure* o;
3303         
3304         /* We want a PAP, not an AP_UPD.  Fortunately, the
3305          * layout's the same.
3306          */
3307         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3308         TICK_ALLOC_UPD_PAP(words+1,0);
3309         
3310         /* now build o = FUN(catch,ap,handler) */
3311         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3312         TICK_ALLOC_FUN(2,0);
3313         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3314         o->payload[0] = (StgClosure *)ap;
3315         o->payload[1] = cf->handler;
3316         
3317         IF_DEBUG(scheduler,
3318                  fprintf(stderr,  "scheduler: Built ");
3319                  printObj((StgClosure *)o);
3320                  );
3321         
3322         /* pop the old handler and put o on the stack */
3323         su = cf->link;
3324         sp += sizeofW(StgCatchFrame) - 1;
3325         sp[0] = (W_)o;
3326         break;
3327       }
3328       
3329     case SEQ_FRAME:
3330       {
3331         StgSeqFrame *sf = (StgSeqFrame *)su;
3332         StgClosure* o;
3333         
3334         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3335         TICK_ALLOC_UPD_PAP(words+1,0);
3336         
3337         /* now build o = FUN(seq,ap) */
3338         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3339         TICK_ALLOC_SE_THK(1,0);
3340         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3341         o->payload[0] = (StgClosure *)ap;
3342         
3343         IF_DEBUG(scheduler,
3344                  fprintf(stderr,  "scheduler: Built ");
3345                  printObj((StgClosure *)o);
3346                  );
3347         
3348         /* pop the old handler and put o on the stack */
3349         su = sf->link;
3350         sp += sizeofW(StgSeqFrame) - 1;
3351         sp[0] = (W_)o;
3352         break;
3353       }
3354       
3355     case STOP_FRAME:
3356       /* We've stripped the entire stack, the thread is now dead. */
3357       sp += sizeofW(StgStopFrame) - 1;
3358       sp[0] = (W_)exception;    /* save the exception */
3359       tso->what_next = ThreadKilled;
3360       tso->su = (StgUpdateFrame *)(sp+1);
3361       tso->sp = sp;
3362       return;
3363
3364     default:
3365       barf("raiseAsync");
3366     }
3367   }
3368   barf("raiseAsync");
3369 }
3370
3371 /* -----------------------------------------------------------------------------
3372    resurrectThreads is called after garbage collection on the list of
3373    threads found to be garbage.  Each of these threads will be woken
3374    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3375    on an MVar, or NonTermination if the thread was blocked on a Black
3376    Hole.
3377
3378    Locks: sched_mutex isn't held upon entry nor exit.
3379    -------------------------------------------------------------------------- */
3380
3381 void
3382 resurrectThreads( StgTSO *threads )
3383 {
3384   StgTSO *tso, *next;
3385
3386   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3387     next = tso->global_link;
3388     tso->global_link = all_threads;
3389     all_threads = tso;
3390     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3391
3392     switch (tso->why_blocked) {
3393     case BlockedOnMVar:
3394     case BlockedOnException:
3395       /* Called by GC - sched_mutex lock is currently held. */
3396       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3397       break;
3398     case BlockedOnBlackHole:
3399       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3400       break;
3401     case NotBlocked:
3402       /* This might happen if the thread was blocked on a black hole
3403        * belonging to a thread that we've just woken up (raiseAsync
3404        * can wake up threads, remember...).
3405        */
3406       continue;
3407     default:
3408       barf("resurrectThreads: thread blocked in a strange way");
3409     }
3410   }
3411 }
3412
3413 /* -----------------------------------------------------------------------------
3414  * Blackhole detection: if we reach a deadlock, test whether any
3415  * threads are blocked on themselves.  Any threads which are found to
3416  * be self-blocked get sent a NonTermination exception.
3417  *
3418  * This is only done in a deadlock situation in order to avoid
3419  * performance overhead in the normal case.
3420  *
3421  * Locks: sched_mutex is held upon entry and exit.
3422  * -------------------------------------------------------------------------- */
3423
3424 static void
3425 detectBlackHoles( void )
3426 {
3427     StgTSO *t = all_threads;
3428     StgUpdateFrame *frame;
3429     StgClosure *blocked_on;
3430
3431     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3432
3433         while (t->what_next == ThreadRelocated) {
3434             t = t->link;
3435             ASSERT(get_itbl(t)->type == TSO);
3436         }
3437       
3438         if (t->why_blocked != BlockedOnBlackHole) {
3439             continue;
3440         }
3441
3442         blocked_on = t->block_info.closure;
3443
3444         for (frame = t->su; ; frame = frame->link) {
3445             switch (get_itbl(frame)->type) {
3446
3447             case UPDATE_FRAME:
3448                 if (frame->updatee == blocked_on) {
3449                     /* We are blocking on one of our own computations, so
3450                      * send this thread the NonTermination exception.  
3451                      */
3452                     IF_DEBUG(scheduler, 
3453                              sched_belch("thread %d is blocked on itself", t->id));
3454                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3455                     goto done;
3456                 }
3457                 else {
3458                     continue;
3459                 }
3460
3461             case CATCH_FRAME:
3462             case SEQ_FRAME:
3463                 continue;
3464                 
3465             case STOP_FRAME:
3466                 break;
3467             }
3468             break;
3469         }
3470
3471     done: ;
3472     }   
3473 }
3474
3475 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3476 //@subsection Debugging Routines
3477
3478 /* -----------------------------------------------------------------------------
3479    Debugging: why is a thread blocked
3480    -------------------------------------------------------------------------- */
3481
3482 #ifdef DEBUG
3483
3484 void
3485 printThreadBlockage(StgTSO *tso)
3486 {
3487   switch (tso->why_blocked) {
3488   case BlockedOnRead:
3489     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3490     break;
3491   case BlockedOnWrite:
3492     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3493     break;
3494   case BlockedOnDelay:
3495     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3496     break;
3497   case BlockedOnMVar:
3498     fprintf(stderr,"is blocked on an MVar");
3499     break;
3500   case BlockedOnException:
3501     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3502             tso->block_info.tso->id);
3503     break;
3504   case BlockedOnBlackHole:
3505     fprintf(stderr,"is blocked on a black hole");
3506     break;
3507   case NotBlocked:
3508     fprintf(stderr,"is not blocked");
3509     break;
3510 #if defined(PAR)
3511   case BlockedOnGA:
3512     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3513             tso->block_info.closure, info_type(tso->block_info.closure));
3514     break;
3515   case BlockedOnGA_NoSend:
3516     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3517             tso->block_info.closure, info_type(tso->block_info.closure));
3518     break;
3519 #endif
3520 #if defined(RTS_SUPPORTS_THREADS)
3521   case BlockedOnCCall:
3522     fprintf(stderr,"is blocked on an external call");
3523     break;
3524 #endif
3525   default:
3526     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3527          tso->why_blocked, tso->id, tso);
3528   }
3529 }
3530
3531 void
3532 printThreadStatus(StgTSO *tso)
3533 {
3534   switch (tso->what_next) {
3535   case ThreadKilled:
3536     fprintf(stderr,"has been killed");
3537     break;
3538   case ThreadComplete:
3539     fprintf(stderr,"has completed");
3540     break;
3541   default:
3542     printThreadBlockage(tso);
3543   }
3544 }
3545
3546 void
3547 printAllThreads(void)
3548 {
3549   StgTSO *t;
3550
3551 # if defined(GRAN)
3552   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3553   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3554                        time_string, rtsFalse/*no commas!*/);
3555
3556   sched_belch("all threads at [%s]:", time_string);
3557 # elif defined(PAR)
3558   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3559   ullong_format_string(CURRENT_TIME,
3560                        time_string, rtsFalse/*no commas!*/);
3561
3562   sched_belch("all threads at [%s]:", time_string);
3563 # else
3564   sched_belch("all threads:");
3565 # endif
3566
3567   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3568     fprintf(stderr, "\tthread %d ", t->id);
3569     if (t->label) fprintf(stderr,"[\"%s\"] ",t->label);
3570     printThreadStatus(t);
3571     fprintf(stderr,"\n");
3572   }
3573 }
3574     
3575 /* 
3576    Print a whole blocking queue attached to node (debugging only).
3577 */
3578 //@cindex print_bq
3579 # if defined(PAR)
3580 void 
3581 print_bq (StgClosure *node)
3582 {
3583   StgBlockingQueueElement *bqe;
3584   StgTSO *tso;
3585   rtsBool end;
3586
3587   fprintf(stderr,"## BQ of closure %p (%s): ",
3588           node, info_type(node));
3589
3590   /* should cover all closures that may have a blocking queue */
3591   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3592          get_itbl(node)->type == FETCH_ME_BQ ||
3593          get_itbl(node)->type == RBH ||
3594          get_itbl(node)->type == MVAR);
3595     
3596   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3597
3598   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3599 }
3600
3601 /* 
3602    Print a whole blocking queue starting with the element bqe.
3603 */
3604 void 
3605 print_bqe (StgBlockingQueueElement *bqe)
3606 {
3607   rtsBool end;
3608
3609   /* 
3610      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3611   */
3612   for (end = (bqe==END_BQ_QUEUE);
3613        !end; // iterate until bqe points to a CONSTR
3614        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3615        bqe = end ? END_BQ_QUEUE : bqe->link) {
3616     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3617     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3618     /* types of closures that may appear in a blocking queue */
3619     ASSERT(get_itbl(bqe)->type == TSO ||           
3620            get_itbl(bqe)->type == BLOCKED_FETCH || 
3621            get_itbl(bqe)->type == CONSTR); 
3622     /* only BQs of an RBH end with an RBH_Save closure */
3623     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3624
3625     switch (get_itbl(bqe)->type) {
3626     case TSO:
3627       fprintf(stderr," TSO %u (%x),",
3628               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3629       break;
3630     case BLOCKED_FETCH:
3631       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3632               ((StgBlockedFetch *)bqe)->node, 
3633               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3634               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3635               ((StgBlockedFetch *)bqe)->ga.weight);
3636       break;
3637     case CONSTR:
3638       fprintf(stderr," %s (IP %p),",
3639               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3640                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3641                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3642                "RBH_Save_?"), get_itbl(bqe));
3643       break;
3644     default:
3645       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3646            info_type((StgClosure *)bqe)); // , node, info_type(node));
3647       break;
3648     }
3649   } /* for */
3650   fputc('\n', stderr);
3651 }
3652 # elif defined(GRAN)
3653 void 
3654 print_bq (StgClosure *node)
3655 {
3656   StgBlockingQueueElement *bqe;
3657   PEs node_loc, tso_loc;
3658   rtsBool end;
3659
3660   /* should cover all closures that may have a blocking queue */
3661   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3662          get_itbl(node)->type == FETCH_ME_BQ ||
3663          get_itbl(node)->type == RBH);
3664     
3665   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3666   node_loc = where_is(node);
3667
3668   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3669           node, info_type(node), node_loc);
3670
3671   /* 
3672      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3673   */
3674   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3675        !end; // iterate until bqe points to a CONSTR
3676        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3677     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3678     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3679     /* types of closures that may appear in a blocking queue */
3680     ASSERT(get_itbl(bqe)->type == TSO ||           
3681            get_itbl(bqe)->type == CONSTR); 
3682     /* only BQs of an RBH end with an RBH_Save closure */
3683     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3684
3685     tso_loc = where_is((StgClosure *)bqe);
3686     switch (get_itbl(bqe)->type) {
3687     case TSO:
3688       fprintf(stderr," TSO %d (%p) on [PE %d],",
3689               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3690       break;
3691     case CONSTR:
3692       fprintf(stderr," %s (IP %p),",
3693               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3694                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3695                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3696                "RBH_Save_?"), get_itbl(bqe));
3697       break;
3698     default:
3699       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3700            info_type((StgClosure *)bqe), node, info_type(node));
3701       break;
3702     }
3703   } /* for */
3704   fputc('\n', stderr);
3705 }
3706 #else
3707 /* 
3708    Nice and easy: only TSOs on the blocking queue
3709 */
3710 void 
3711 print_bq (StgClosure *node)
3712 {
3713   StgTSO *tso;
3714
3715   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3716   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3717        tso != END_TSO_QUEUE; 
3718        tso=tso->link) {
3719     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3720     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3721     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3722   }
3723   fputc('\n', stderr);
3724 }
3725 # endif
3726
3727 #if defined(PAR)
3728 static nat
3729 run_queue_len(void)
3730 {
3731   nat i;
3732   StgTSO *tso;
3733
3734   for (i=0, tso=run_queue_hd; 
3735        tso != END_TSO_QUEUE;
3736        i++, tso=tso->link)
3737     /* nothing */
3738
3739   return i;
3740 }
3741 #endif
3742
3743 static void
3744 sched_belch(char *s, ...)
3745 {
3746   va_list ap;
3747   va_start(ap,s);
3748 #ifdef SMP
3749   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3750 #elif defined(PAR)
3751   fprintf(stderr, "== ");
3752 #else
3753   fprintf(stderr, "scheduler: ");
3754 #endif
3755   vfprintf(stderr, s, ap);
3756   fprintf(stderr, "\n");
3757 }
3758
3759 #endif /* DEBUG */
3760
3761
3762 //@node Index,  , Debugging Routines, Main scheduling code
3763 //@subsection Index
3764
3765 //@index
3766 //* StgMainThread::  @cindex\s-+StgMainThread
3767 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3768 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3769 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3770 //* context_switch::  @cindex\s-+context_switch
3771 //* createThread::  @cindex\s-+createThread
3772 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3773 //* initScheduler::  @cindex\s-+initScheduler
3774 //* interrupted::  @cindex\s-+interrupted
3775 //* next_thread_id::  @cindex\s-+next_thread_id
3776 //* print_bq::  @cindex\s-+print_bq
3777 //* run_queue_hd::  @cindex\s-+run_queue_hd
3778 //* run_queue_tl::  @cindex\s-+run_queue_tl
3779 //* sched_mutex::  @cindex\s-+sched_mutex
3780 //* schedule::  @cindex\s-+schedule
3781 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3782 //* term_mutex::  @cindex\s-+term_mutex
3783 //@end index