[project @ 2005-01-28 12:55:17 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "Hooks.h"
48 #define COMPILING_SCHEDULER
49 #include "Schedule.h"
50 #include "StgMiscClosures.h"
51 #include "Storage.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "STM.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #include "LdvProfile.h"
63 #include "Updates.h"
64 #ifdef PROFILING
65 #include "Proftimer.h"
66 #include "ProfHeap.h"
67 #endif
68 #if defined(GRAN) || defined(PAR)
69 # include "GranSimRts.h"
70 # include "GranSim.h"
71 # include "ParallelRts.h"
72 # include "Parallel.h"
73 # include "ParallelDebug.h"
74 # include "FetchMe.h"
75 # include "HLC.h"
76 #endif
77 #include "Sparks.h"
78 #include "Capability.h"
79 #include "OSThreads.h"
80 #include  "Task.h"
81
82 #ifdef HAVE_SYS_TYPES_H
83 #include <sys/types.h>
84 #endif
85 #ifdef HAVE_UNISTD_H
86 #include <unistd.h>
87 #endif
88
89 #include <string.h>
90 #include <stdlib.h>
91 #include <stdarg.h>
92
93 #ifdef HAVE_ERRNO_H
94 #include <errno.h>
95 #endif
96
97 #ifdef THREADED_RTS
98 #define USED_IN_THREADED_RTS
99 #else
100 #define USED_IN_THREADED_RTS STG_UNUSED
101 #endif
102
103 #ifdef RTS_SUPPORTS_THREADS
104 #define USED_WHEN_RTS_SUPPORTS_THREADS
105 #else
106 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
107 #endif
108
109 /* Main thread queue.
110  * Locks required: sched_mutex.
111  */
112 StgMainThread *main_threads = NULL;
113
114 /* Thread queues.
115  * Locks required: sched_mutex.
116  */
117 #if defined(GRAN)
118
119 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
120 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
121
122 /* 
123    In GranSim we have a runnable and a blocked queue for each processor.
124    In order to minimise code changes new arrays run_queue_hds/tls
125    are created. run_queue_hd is then a short cut (macro) for
126    run_queue_hds[CurrentProc] (see GranSim.h).
127    -- HWL
128 */
129 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
130 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
131 StgTSO *ccalling_threadss[MAX_PROC];
132 /* We use the same global list of threads (all_threads) in GranSim as in
133    the std RTS (i.e. we are cheating). However, we don't use this list in
134    the GranSim specific code at the moment (so we are only potentially
135    cheating).  */
136
137 #else /* !GRAN */
138
139 StgTSO *run_queue_hd = NULL;
140 StgTSO *run_queue_tl = NULL;
141 StgTSO *blocked_queue_hd = NULL;
142 StgTSO *blocked_queue_tl = NULL;
143 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
144
145 #endif
146
147 /* Linked list of all threads.
148  * Used for detecting garbage collected threads.
149  */
150 StgTSO *all_threads = NULL;
151
152 /* When a thread performs a safe C call (_ccall_GC, using old
153  * terminology), it gets put on the suspended_ccalling_threads
154  * list. Used by the garbage collector.
155  */
156 static StgTSO *suspended_ccalling_threads;
157
158 static StgTSO *threadStackOverflow(StgTSO *tso);
159
160 /* KH: The following two flags are shared memory locations.  There is no need
161        to lock them, since they are only unset at the end of a scheduler
162        operation.
163 */
164
165 /* flag set by signal handler to precipitate a context switch */
166 int context_switch = 0;
167
168 /* if this flag is set as well, give up execution */
169 rtsBool interrupted = rtsFalse;
170
171 /* Next thread ID to allocate.
172  * Locks required: thread_id_mutex
173  */
174 static StgThreadID next_thread_id = 1;
175
176 /*
177  * Pointers to the state of the current thread.
178  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
179  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
180  */
181  
182 /* The smallest stack size that makes any sense is:
183  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
184  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
185  *  + 1                       (the closure to enter)
186  *  + 1                       (stg_ap_v_ret)
187  *  + 1                       (spare slot req'd by stg_ap_v_ret)
188  *
189  * A thread with this stack will bomb immediately with a stack
190  * overflow, which will increase its stack size.  
191  */
192
193 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
194
195
196 #if defined(GRAN)
197 StgTSO *CurrentTSO;
198 #endif
199
200 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
201  *  exists - earlier gccs apparently didn't.
202  *  -= chak
203  */
204 StgTSO dummy_tso;
205
206 static rtsBool ready_to_gc;
207
208 /*
209  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
210  * in an MT setting, needed to signal that a worker thread shouldn't hang around
211  * in the scheduler when it is out of work.
212  */
213 static rtsBool shutting_down_scheduler = rtsFalse;
214
215 void            addToBlockedQueue ( StgTSO *tso );
216
217 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
218        void     interruptStgRts   ( void );
219
220 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
221 static void     detectBlackHoles  ( void );
222 #endif
223
224 static void     raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically);
225
226 #if defined(RTS_SUPPORTS_THREADS)
227 /* ToDo: carefully document the invariants that go together
228  *       with these synchronisation objects.
229  */
230 Mutex     sched_mutex       = INIT_MUTEX_VAR;
231 Mutex     term_mutex        = INIT_MUTEX_VAR;
232
233 #endif /* RTS_SUPPORTS_THREADS */
234
235 #if defined(PAR)
236 StgTSO *LastTSO;
237 rtsTime TimeOfLastYield;
238 rtsBool emitSchedule = rtsTrue;
239 #endif
240
241 #if DEBUG
242 static char *whatNext_strs[] = {
243   "(unknown)",
244   "ThreadRunGHC",
245   "ThreadInterpret",
246   "ThreadKilled",
247   "ThreadRelocated",
248   "ThreadComplete"
249 };
250 #endif
251
252 #if defined(PAR)
253 StgTSO * createSparkThread(rtsSpark spark);
254 StgTSO * activateSpark (rtsSpark spark);  
255 #endif
256
257 /* ----------------------------------------------------------------------------
258  * Starting Tasks
259  * ------------------------------------------------------------------------- */
260
261 #if defined(RTS_SUPPORTS_THREADS)
262 static rtsBool startingWorkerThread = rtsFalse;
263
264 static void taskStart(void);
265 static void
266 taskStart(void)
267 {
268   ACQUIRE_LOCK(&sched_mutex);
269   startingWorkerThread = rtsFalse;
270   schedule(NULL,NULL);
271   RELEASE_LOCK(&sched_mutex);
272 }
273
274 void
275 startSchedulerTaskIfNecessary(void)
276 {
277   if(run_queue_hd != END_TSO_QUEUE
278     || blocked_queue_hd != END_TSO_QUEUE
279     || sleeping_queue != END_TSO_QUEUE)
280   {
281     if(!startingWorkerThread)
282     { // we don't want to start another worker thread
283       // just because the last one hasn't yet reached the
284       // "waiting for capability" state
285       startingWorkerThread = rtsTrue;
286       if(!startTask(taskStart))
287       {
288         startingWorkerThread = rtsFalse;
289       }
290     }
291   }
292 }
293 #endif
294
295 /* ---------------------------------------------------------------------------
296    Main scheduling loop.
297
298    We use round-robin scheduling, each thread returning to the
299    scheduler loop when one of these conditions is detected:
300
301       * out of heap space
302       * timer expires (thread yields)
303       * thread blocks
304       * thread ends
305       * stack overflow
306
307    Locking notes:  we acquire the scheduler lock once at the beginning
308    of the scheduler loop, and release it when
309     
310       * running a thread, or
311       * waiting for work, or
312       * waiting for a GC to complete.
313
314    GRAN version:
315      In a GranSim setup this loop iterates over the global event queue.
316      This revolves around the global event queue, which determines what 
317      to do next. Therefore, it's more complicated than either the 
318      concurrent or the parallel (GUM) setup.
319
320    GUM version:
321      GUM iterates over incoming messages.
322      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
323      and sends out a fish whenever it has nothing to do; in-between
324      doing the actual reductions (shared code below) it processes the
325      incoming messages and deals with delayed operations 
326      (see PendingFetches).
327      This is not the ugliest code you could imagine, but it's bloody close.
328
329    ------------------------------------------------------------------------ */
330 static void
331 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
332           Capability *initialCapability )
333 {
334   StgTSO *t;
335   Capability *cap;
336   StgThreadReturnCode ret;
337 #if defined(GRAN)
338   rtsEvent *event;
339 #elif defined(PAR)
340   StgSparkPool *pool;
341   rtsSpark spark;
342   StgTSO *tso;
343   GlobalTaskId pe;
344   rtsBool receivedFinish = rtsFalse;
345 # if defined(DEBUG)
346   nat tp_size, sp_size; // stats only
347 # endif
348 #endif
349   rtsBool was_interrupted = rtsFalse;
350   nat prev_what_next;
351   
352   // Pre-condition: sched_mutex is held.
353   // We might have a capability, passed in as initialCapability.
354   cap = initialCapability;
355
356 #if defined(RTS_SUPPORTS_THREADS)
357   //
358   // in the threaded case, the capability is either passed in via the
359   // initialCapability parameter, or initialized inside the scheduler
360   // loop 
361   //
362   IF_DEBUG(scheduler,
363            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
364                        mainThread, initialCapability);
365       );
366 #else
367   // simply initialise it in the non-threaded case
368   grabCapability(&cap);
369 #endif
370
371 #if defined(GRAN)
372   /* set up first event to get things going */
373   /* ToDo: assign costs for system setup and init MainTSO ! */
374   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
375             ContinueThread, 
376             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
377
378   IF_DEBUG(gran,
379            debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
380            G_TSO(CurrentTSO, 5));
381
382   if (RtsFlags.GranFlags.Light) {
383     /* Save current time; GranSim Light only */
384     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
385   }      
386
387   event = get_next_event();
388
389   while (event!=(rtsEvent*)NULL) {
390     /* Choose the processor with the next event */
391     CurrentProc = event->proc;
392     CurrentTSO = event->tso;
393
394 #elif defined(PAR)
395
396   while (!receivedFinish) {    /* set by processMessages */
397                                /* when receiving PP_FINISH message         */ 
398
399 #else // everything except GRAN and PAR
400
401   while (1) {
402
403 #endif
404
405      IF_DEBUG(scheduler, printAllThreads());
406
407 #if defined(RTS_SUPPORTS_THREADS)
408       // Yield the capability to higher-priority tasks if necessary.
409       //
410       if (cap != NULL) {
411           yieldCapability(&cap);
412       }
413
414       // If we do not currently hold a capability, we wait for one
415       //
416       if (cap == NULL) {
417           waitForCapability(&sched_mutex, &cap,
418                             mainThread ? &mainThread->bound_thread_cond : NULL);
419       }
420
421       // We now have a capability...
422 #endif
423
424     //
425     // If we're interrupted (the user pressed ^C, or some other
426     // termination condition occurred), kill all the currently running
427     // threads.
428     //
429     if (interrupted) {
430         IF_DEBUG(scheduler, sched_belch("interrupted"));
431         interrupted = rtsFalse;
432         was_interrupted = rtsTrue;
433 #if defined(RTS_SUPPORTS_THREADS)
434         // In the threaded RTS, deadlock detection doesn't work,
435         // so just exit right away.
436         errorBelch("interrupted");
437         releaseCapability(cap);
438         RELEASE_LOCK(&sched_mutex);
439         shutdownHaskellAndExit(EXIT_SUCCESS);
440 #else
441         deleteAllThreads();
442 #endif
443     }
444
445 #if defined(RTS_USER_SIGNALS)
446     // check for signals each time around the scheduler
447     if (signals_pending()) {
448       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
449       startSignalHandlers();
450       ACQUIRE_LOCK(&sched_mutex);
451     }
452 #endif
453
454     //
455     // Check whether any waiting threads need to be woken up.  If the
456     // run queue is empty, and there are no other tasks running, we
457     // can wait indefinitely for something to happen.
458     //
459     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
460     {
461 #if defined(RTS_SUPPORTS_THREADS)
462         // We shouldn't be here...
463         barf("schedule: awaitEvent() in threaded RTS");
464 #endif
465         awaitEvent( EMPTY_RUN_QUEUE() );
466     }
467     // we can be interrupted while waiting for I/O...
468     if (interrupted) continue;
469
470     /* 
471      * Detect deadlock: when we have no threads to run, there are no
472      * threads waiting on I/O or sleeping, and all the other tasks are
473      * waiting for work, we must have a deadlock of some description.
474      *
475      * We first try to find threads blocked on themselves (ie. black
476      * holes), and generate NonTermination exceptions where necessary.
477      *
478      * If no threads are black holed, we have a deadlock situation, so
479      * inform all the main threads.
480      */
481 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
482     if (   EMPTY_THREAD_QUEUES() )
483     {
484         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
485
486         // Garbage collection can release some new threads due to
487         // either (a) finalizers or (b) threads resurrected because
488         // they are unreachable and will therefore be sent an
489         // exception.  Any threads thus released will be immediately
490         // runnable.
491         GarbageCollect(GetRoots,rtsTrue);
492         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
493
494 #if defined(RTS_USER_SIGNALS)
495         /* If we have user-installed signal handlers, then wait
496          * for signals to arrive rather then bombing out with a
497          * deadlock.
498          */
499         if ( anyUserHandlers() ) {
500             IF_DEBUG(scheduler, 
501                      sched_belch("still deadlocked, waiting for signals..."));
502
503             awaitUserSignals();
504
505             // we might be interrupted...
506             if (interrupted) { continue; }
507
508             if (signals_pending()) {
509                 RELEASE_LOCK(&sched_mutex);
510                 startSignalHandlers();
511                 ACQUIRE_LOCK(&sched_mutex);
512             }
513             ASSERT(!EMPTY_RUN_QUEUE());
514             goto not_deadlocked;
515         }
516 #endif
517
518         /* Probably a real deadlock.  Send the current main thread the
519          * Deadlock exception (or in the SMP build, send *all* main
520          * threads the deadlock exception, since none of them can make
521          * progress).
522          */
523         {
524             StgMainThread *m;
525             m = main_threads;
526             switch (m->tso->why_blocked) {
527             case BlockedOnBlackHole:
528             case BlockedOnException:
529             case BlockedOnMVar:
530                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
531                 break;
532             default:
533                 barf("deadlock: main thread blocked in a strange way");
534             }
535         }
536     }
537   not_deadlocked:
538
539 #elif defined(RTS_SUPPORTS_THREADS)
540     // ToDo: add deadlock detection in threaded RTS
541 #elif defined(PAR)
542     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
543 #endif
544
545 #if defined(RTS_SUPPORTS_THREADS) || defined(mingw32_HOST_OS)
546     /* win32: might be back here due to awaitEvent() being abandoned
547      * as a result of a console event having been delivered.
548      */
549     if ( EMPTY_RUN_QUEUE() ) {
550         continue; // nothing to do
551     }
552 #endif
553
554 #if defined(GRAN)
555     if (RtsFlags.GranFlags.Light)
556       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
557
558     /* adjust time based on time-stamp */
559     if (event->time > CurrentTime[CurrentProc] &&
560         event->evttype != ContinueThread)
561       CurrentTime[CurrentProc] = event->time;
562     
563     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
564     if (!RtsFlags.GranFlags.Light)
565       handleIdlePEs();
566
567     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
568
569     /* main event dispatcher in GranSim */
570     switch (event->evttype) {
571       /* Should just be continuing execution */
572     case ContinueThread:
573       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
574       /* ToDo: check assertion
575       ASSERT(run_queue_hd != (StgTSO*)NULL &&
576              run_queue_hd != END_TSO_QUEUE);
577       */
578       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
579       if (!RtsFlags.GranFlags.DoAsyncFetch &&
580           procStatus[CurrentProc]==Fetching) {
581         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
582               CurrentTSO->id, CurrentTSO, CurrentProc);
583         goto next_thread;
584       } 
585       /* Ignore ContinueThreads for completed threads */
586       if (CurrentTSO->what_next == ThreadComplete) {
587         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
588               CurrentTSO->id, CurrentTSO, CurrentProc);
589         goto next_thread;
590       } 
591       /* Ignore ContinueThreads for threads that are being migrated */
592       if (PROCS(CurrentTSO)==Nowhere) { 
593         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
594               CurrentTSO->id, CurrentTSO, CurrentProc);
595         goto next_thread;
596       }
597       /* The thread should be at the beginning of the run queue */
598       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
599         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
600               CurrentTSO->id, CurrentTSO, CurrentProc);
601         break; // run the thread anyway
602       }
603       /*
604       new_event(proc, proc, CurrentTime[proc],
605                 FindWork,
606                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
607       goto next_thread; 
608       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
609       break; // now actually run the thread; DaH Qu'vam yImuHbej 
610
611     case FetchNode:
612       do_the_fetchnode(event);
613       goto next_thread;             /* handle next event in event queue  */
614       
615     case GlobalBlock:
616       do_the_globalblock(event);
617       goto next_thread;             /* handle next event in event queue  */
618       
619     case FetchReply:
620       do_the_fetchreply(event);
621       goto next_thread;             /* handle next event in event queue  */
622       
623     case UnblockThread:   /* Move from the blocked queue to the tail of */
624       do_the_unblock(event);
625       goto next_thread;             /* handle next event in event queue  */
626       
627     case ResumeThread:  /* Move from the blocked queue to the tail of */
628       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
629       event->tso->gran.blocktime += 
630         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
631       do_the_startthread(event);
632       goto next_thread;             /* handle next event in event queue  */
633       
634     case StartThread:
635       do_the_startthread(event);
636       goto next_thread;             /* handle next event in event queue  */
637       
638     case MoveThread:
639       do_the_movethread(event);
640       goto next_thread;             /* handle next event in event queue  */
641       
642     case MoveSpark:
643       do_the_movespark(event);
644       goto next_thread;             /* handle next event in event queue  */
645       
646     case FindWork:
647       do_the_findwork(event);
648       goto next_thread;             /* handle next event in event queue  */
649       
650     default:
651       barf("Illegal event type %u\n", event->evttype);
652     }  /* switch */
653     
654     /* This point was scheduler_loop in the old RTS */
655
656     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
657
658     TimeOfLastEvent = CurrentTime[CurrentProc];
659     TimeOfNextEvent = get_time_of_next_event();
660     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
661     // CurrentTSO = ThreadQueueHd;
662
663     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
664                          TimeOfNextEvent));
665
666     if (RtsFlags.GranFlags.Light) 
667       GranSimLight_leave_system(event, &ActiveTSO); 
668
669     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
670
671     IF_DEBUG(gran, 
672              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
673
674     /* in a GranSim setup the TSO stays on the run queue */
675     t = CurrentTSO;
676     /* Take a thread from the run queue. */
677     POP_RUN_QUEUE(t); // take_off_run_queue(t);
678
679     IF_DEBUG(gran, 
680              debugBelch("GRAN: About to run current thread, which is\n");
681              G_TSO(t,5));
682
683     context_switch = 0; // turned on via GranYield, checking events and time slice
684
685     IF_DEBUG(gran, 
686              DumpGranEvent(GR_SCHEDULE, t));
687
688     procStatus[CurrentProc] = Busy;
689
690 #elif defined(PAR)
691     if (PendingFetches != END_BF_QUEUE) {
692         processFetches();
693     }
694
695     /* ToDo: phps merge with spark activation above */
696     /* check whether we have local work and send requests if we have none */
697     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
698       /* :-[  no local threads => look out for local sparks */
699       /* the spark pool for the current PE */
700       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
701       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
702           pool->hd < pool->tl) {
703         /* 
704          * ToDo: add GC code check that we really have enough heap afterwards!!
705          * Old comment:
706          * If we're here (no runnable threads) and we have pending
707          * sparks, we must have a space problem.  Get enough space
708          * to turn one of those pending sparks into a
709          * thread... 
710          */
711
712         spark = findSpark(rtsFalse);                /* get a spark */
713         if (spark != (rtsSpark) NULL) {
714           tso = activateSpark(spark);       /* turn the spark into a thread */
715           IF_PAR_DEBUG(schedule,
716                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
717                              tso->id, tso, advisory_thread_count));
718
719           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
720             debugBelch("==^^ failed to activate spark\n");
721             goto next_thread;
722           }               /* otherwise fall through & pick-up new tso */
723         } else {
724           IF_PAR_DEBUG(verbose,
725                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
726                              spark_queue_len(pool)));
727           goto next_thread;
728         }
729       }
730
731       /* If we still have no work we need to send a FISH to get a spark
732          from another PE 
733       */
734       if (EMPTY_RUN_QUEUE()) {
735       /* =8-[  no local sparks => look for work on other PEs */
736         /*
737          * We really have absolutely no work.  Send out a fish
738          * (there may be some out there already), and wait for
739          * something to arrive.  We clearly can't run any threads
740          * until a SCHEDULE or RESUME arrives, and so that's what
741          * we're hoping to see.  (Of course, we still have to
742          * respond to other types of messages.)
743          */
744         TIME now = msTime() /*CURRENT_TIME*/;
745         IF_PAR_DEBUG(verbose, 
746                      debugBelch("--  now=%ld\n", now));
747         IF_PAR_DEBUG(verbose,
748                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
749                          (last_fish_arrived_at!=0 &&
750                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
751                        debugBelch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)\n",
752                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
753                              last_fish_arrived_at,
754                              RtsFlags.ParFlags.fishDelay, now);
755                      });
756         
757         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
758             (last_fish_arrived_at==0 ||
759              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
760           /* outstandingFishes is set in sendFish, processFish;
761              avoid flooding system with fishes via delay */
762           pe = choosePE();
763           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
764                    NEW_FISH_HUNGER);
765
766           // Global statistics: count no. of fishes
767           if (RtsFlags.ParFlags.ParStats.Global &&
768               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
769             globalParStats.tot_fish_mess++;
770           }
771         }
772       
773         receivedFinish = processMessages();
774         goto next_thread;
775       }
776     } else if (PacketsWaiting()) {  /* Look for incoming messages */
777       receivedFinish = processMessages();
778     }
779
780     /* Now we are sure that we have some work available */
781     ASSERT(run_queue_hd != END_TSO_QUEUE);
782
783     /* Take a thread from the run queue, if we have work */
784     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
785     IF_DEBUG(sanity,checkTSO(t));
786
787     /* ToDo: write something to the log-file
788     if (RTSflags.ParFlags.granSimStats && !sameThread)
789         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
790
791     CurrentTSO = t;
792     */
793     /* the spark pool for the current PE */
794     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
795
796     IF_DEBUG(scheduler, 
797              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
798                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
799
800 # if 1
801     if (0 && RtsFlags.ParFlags.ParStats.Full && 
802         t && LastTSO && t->id != LastTSO->id && 
803         LastTSO->why_blocked == NotBlocked && 
804         LastTSO->what_next != ThreadComplete) {
805       // if previously scheduled TSO not blocked we have to record the context switch
806       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
807                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
808     }
809
810     if (RtsFlags.ParFlags.ParStats.Full && 
811         (emitSchedule /* forced emit */ ||
812         (t && LastTSO && t->id != LastTSO->id))) {
813       /* 
814          we are running a different TSO, so write a schedule event to log file
815          NB: If we use fair scheduling we also have to write  a deschedule 
816              event for LastTSO; with unfair scheduling we know that the
817              previous tso has blocked whenever we switch to another tso, so
818              we don't need it in GUM for now
819       */
820       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
821                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
822       emitSchedule = rtsFalse;
823     }
824      
825 # endif
826 #else /* !GRAN && !PAR */
827   
828     // grab a thread from the run queue
829     ASSERT(run_queue_hd != END_TSO_QUEUE);
830     POP_RUN_QUEUE(t);
831
832     // Sanity check the thread we're about to run.  This can be
833     // expensive if there is lots of thread switching going on...
834     IF_DEBUG(sanity,checkTSO(t));
835 #endif
836
837 #ifdef THREADED_RTS
838     {
839       StgMainThread *m = t->main;
840       
841       if(m)
842       {
843         if(m == mainThread)
844         {
845           IF_DEBUG(scheduler,
846             sched_belch("### Running thread %d in bound thread", t->id));
847           // yes, the Haskell thread is bound to the current native thread
848         }
849         else
850         {
851           IF_DEBUG(scheduler,
852             sched_belch("### thread %d bound to another OS thread", t->id));
853           // no, bound to a different Haskell thread: pass to that thread
854           PUSH_ON_RUN_QUEUE(t);
855           passCapability(&m->bound_thread_cond);
856           continue;
857         }
858       }
859       else
860       {
861         if(mainThread != NULL)
862         // The thread we want to run is bound.
863         {
864           IF_DEBUG(scheduler,
865             sched_belch("### this OS thread cannot run thread %d", t->id));
866           // no, the current native thread is bound to a different
867           // Haskell thread, so pass it to any worker thread
868           PUSH_ON_RUN_QUEUE(t);
869           passCapabilityToWorker();
870           continue; 
871         }
872       }
873     }
874 #endif
875
876     cap->r.rCurrentTSO = t;
877     
878     /* context switches are now initiated by the timer signal, unless
879      * the user specified "context switch as often as possible", with
880      * +RTS -C0
881      */
882     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
883          && (run_queue_hd != END_TSO_QUEUE
884              || blocked_queue_hd != END_TSO_QUEUE
885              || sleeping_queue != END_TSO_QUEUE)))
886         context_switch = 1;
887
888 run_thread:
889
890     RELEASE_LOCK(&sched_mutex);
891
892     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
893                               (long)t->id, whatNext_strs[t->what_next]));
894
895 #ifdef PROFILING
896     startHeapProfTimer();
897 #endif
898
899     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
900     /* Run the current thread 
901      */
902     prev_what_next = t->what_next;
903
904     errno = t->saved_errno;
905
906     switch (prev_what_next) {
907
908     case ThreadKilled:
909     case ThreadComplete:
910         /* Thread already finished, return to scheduler. */
911         ret = ThreadFinished;
912         break;
913
914     case ThreadRunGHC:
915         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
916         break;
917
918     case ThreadInterpret:
919         ret = interpretBCO(cap);
920         break;
921
922     default:
923       barf("schedule: invalid what_next field");
924     }
925
926     // The TSO might have moved, so find the new location:
927     t = cap->r.rCurrentTSO;
928
929     // And save the current errno in this thread.
930     t->saved_errno = errno;
931
932     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
933     
934     /* Costs for the scheduler are assigned to CCS_SYSTEM */
935 #ifdef PROFILING
936     stopHeapProfTimer();
937     CCCS = CCS_SYSTEM;
938 #endif
939     
940     ACQUIRE_LOCK(&sched_mutex);
941     
942 #ifdef RTS_SUPPORTS_THREADS
943     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
944 #elif !defined(GRAN) && !defined(PAR)
945     IF_DEBUG(scheduler,debugBelch("sched: "););
946 #endif
947     
948 #if defined(PAR)
949     /* HACK 675: if the last thread didn't yield, make sure to print a 
950        SCHEDULE event to the log file when StgRunning the next thread, even
951        if it is the same one as before */
952     LastTSO = t; 
953     TimeOfLastYield = CURRENT_TIME;
954 #endif
955
956     switch (ret) {
957     case HeapOverflow:
958 #if defined(GRAN)
959       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
960       globalGranStats.tot_heapover++;
961 #elif defined(PAR)
962       globalParStats.tot_heapover++;
963 #endif
964
965       // did the task ask for a large block?
966       if (cap->r.rHpAlloc > BLOCK_SIZE) {
967           // if so, get one and push it on the front of the nursery.
968           bdescr *bd;
969           nat blocks;
970           
971           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
972
973           IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n", 
974                                    (long)t->id, whatNext_strs[t->what_next], blocks));
975
976           // don't do this if it would push us over the
977           // alloc_blocks_lim limit; we'll GC first.
978           if (alloc_blocks + blocks < alloc_blocks_lim) {
979
980               alloc_blocks += blocks;
981               bd = allocGroup( blocks );
982
983               // link the new group into the list
984               bd->link = cap->r.rCurrentNursery;
985               bd->u.back = cap->r.rCurrentNursery->u.back;
986               if (cap->r.rCurrentNursery->u.back != NULL) {
987                   cap->r.rCurrentNursery->u.back->link = bd;
988               } else {
989                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
990                          g0s0->blocks == cap->r.rNursery);
991                   cap->r.rNursery = g0s0->blocks = bd;
992               }           
993               cap->r.rCurrentNursery->u.back = bd;
994
995               // initialise it as a nursery block.  We initialise the
996               // step, gen_no, and flags field of *every* sub-block in
997               // this large block, because this is easier than making
998               // sure that we always find the block head of a large
999               // block whenever we call Bdescr() (eg. evacuate() and
1000               // isAlive() in the GC would both have to do this, at
1001               // least).
1002               { 
1003                   bdescr *x;
1004                   for (x = bd; x < bd + blocks; x++) {
1005                       x->step = g0s0;
1006                       x->gen_no = 0;
1007                       x->flags = 0;
1008                   }
1009               }
1010
1011               // don't forget to update the block count in g0s0.
1012               g0s0->n_blocks += blocks;
1013               // This assert can be a killer if the app is doing lots
1014               // of large block allocations.
1015               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1016
1017               // now update the nursery to point to the new block
1018               cap->r.rCurrentNursery = bd;
1019
1020               // we might be unlucky and have another thread get on the
1021               // run queue before us and steal the large block, but in that
1022               // case the thread will just end up requesting another large
1023               // block.
1024               PUSH_ON_RUN_QUEUE(t);
1025               break;
1026           }
1027       }
1028
1029       /* make all the running tasks block on a condition variable,
1030        * maybe set context_switch and wait till they all pile in,
1031        * then have them wait on a GC condition variable.
1032        */
1033       IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1034                                (long)t->id, whatNext_strs[t->what_next]));
1035       threadPaused(t);
1036 #if defined(GRAN)
1037       ASSERT(!is_on_queue(t,CurrentProc));
1038 #elif defined(PAR)
1039       /* Currently we emit a DESCHEDULE event before GC in GUM.
1040          ToDo: either add separate event to distinguish SYSTEM time from rest
1041                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1042       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1043         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1044                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1045         emitSchedule = rtsTrue;
1046       }
1047 #endif
1048       
1049       ready_to_gc = rtsTrue;
1050       context_switch = 1;               /* stop other threads ASAP */
1051       PUSH_ON_RUN_QUEUE(t);
1052       /* actual GC is done at the end of the while loop */
1053       break;
1054       
1055     case StackOverflow:
1056 #if defined(GRAN)
1057       IF_DEBUG(gran, 
1058                DumpGranEvent(GR_DESCHEDULE, t));
1059       globalGranStats.tot_stackover++;
1060 #elif defined(PAR)
1061       // IF_DEBUG(par, 
1062       // DumpGranEvent(GR_DESCHEDULE, t);
1063       globalParStats.tot_stackover++;
1064 #endif
1065       IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1066                                (long)t->id, whatNext_strs[t->what_next]));
1067       /* just adjust the stack for this thread, then pop it back
1068        * on the run queue.
1069        */
1070       threadPaused(t);
1071       { 
1072         /* enlarge the stack */
1073         StgTSO *new_t = threadStackOverflow(t);
1074         
1075         /* This TSO has moved, so update any pointers to it from the
1076          * main thread stack.  It better not be on any other queues...
1077          * (it shouldn't be).
1078          */
1079         if (t->main != NULL) {
1080             t->main->tso = new_t;
1081         }
1082         PUSH_ON_RUN_QUEUE(new_t);
1083       }
1084       break;
1085
1086     case ThreadYielding:
1087       // Reset the context switch flag.  We don't do this just before
1088       // running the thread, because that would mean we would lose ticks
1089       // during GC, which can lead to unfair scheduling (a thread hogs
1090       // the CPU because the tick always arrives during GC).  This way
1091       // penalises threads that do a lot of allocation, but that seems
1092       // better than the alternative.
1093       context_switch = 0;
1094
1095 #if defined(GRAN)
1096       IF_DEBUG(gran, 
1097                DumpGranEvent(GR_DESCHEDULE, t));
1098       globalGranStats.tot_yields++;
1099 #elif defined(PAR)
1100       // IF_DEBUG(par, 
1101       // DumpGranEvent(GR_DESCHEDULE, t);
1102       globalParStats.tot_yields++;
1103 #endif
1104       /* put the thread back on the run queue.  Then, if we're ready to
1105        * GC, check whether this is the last task to stop.  If so, wake
1106        * up the GC thread.  getThread will block during a GC until the
1107        * GC is finished.
1108        */
1109       IF_DEBUG(scheduler,
1110                if (t->what_next != prev_what_next) {
1111                    debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1112                          (long)t->id, whatNext_strs[t->what_next]);
1113                } else {
1114                    debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1115                          (long)t->id, whatNext_strs[t->what_next]);
1116                }
1117                );
1118
1119       IF_DEBUG(sanity,
1120                //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1121                checkTSO(t));
1122       ASSERT(t->link == END_TSO_QUEUE);
1123
1124       // Shortcut if we're just switching evaluators: don't bother
1125       // doing stack squeezing (which can be expensive), just run the
1126       // thread.
1127       if (t->what_next != prev_what_next) {
1128           goto run_thread;
1129       }
1130
1131       threadPaused(t);
1132
1133 #if defined(GRAN)
1134       ASSERT(!is_on_queue(t,CurrentProc));
1135
1136       IF_DEBUG(sanity,
1137                //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1138                checkThreadQsSanity(rtsTrue));
1139 #endif
1140
1141 #if defined(PAR)
1142       if (RtsFlags.ParFlags.doFairScheduling) { 
1143         /* this does round-robin scheduling; good for concurrency */
1144         APPEND_TO_RUN_QUEUE(t);
1145       } else {
1146         /* this does unfair scheduling; good for parallelism */
1147         PUSH_ON_RUN_QUEUE(t);
1148       }
1149 #else
1150       // this does round-robin scheduling; good for concurrency
1151       APPEND_TO_RUN_QUEUE(t);
1152 #endif
1153
1154 #if defined(GRAN)
1155       /* add a ContinueThread event to actually process the thread */
1156       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1157                 ContinueThread,
1158                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1159       IF_GRAN_DEBUG(bq, 
1160                debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1161                G_EVENTQ(0);
1162                G_CURR_THREADQ(0));
1163 #endif /* GRAN */
1164       break;
1165
1166     case ThreadBlocked:
1167 #if defined(GRAN)
1168       IF_DEBUG(scheduler,
1169                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1170                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1171                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1172
1173       // ??? needed; should emit block before
1174       IF_DEBUG(gran, 
1175                DumpGranEvent(GR_DESCHEDULE, t)); 
1176       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1177       /*
1178         ngoq Dogh!
1179       ASSERT(procStatus[CurrentProc]==Busy || 
1180               ((procStatus[CurrentProc]==Fetching) && 
1181               (t->block_info.closure!=(StgClosure*)NULL)));
1182       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1183           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1184             procStatus[CurrentProc]==Fetching)) 
1185         procStatus[CurrentProc] = Idle;
1186       */
1187 #elif defined(PAR)
1188       IF_DEBUG(scheduler,
1189                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1190                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1191       IF_PAR_DEBUG(bq,
1192
1193                    if (t->block_info.closure!=(StgClosure*)NULL) 
1194                      print_bq(t->block_info.closure));
1195
1196       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1197       blockThread(t);
1198
1199       /* whatever we schedule next, we must log that schedule */
1200       emitSchedule = rtsTrue;
1201
1202 #else /* !GRAN */
1203       /* don't need to do anything.  Either the thread is blocked on
1204        * I/O, in which case we'll have called addToBlockedQueue
1205        * previously, or it's blocked on an MVar or Blackhole, in which
1206        * case it'll be on the relevant queue already.
1207        */
1208       ASSERT(t->why_blocked != NotBlocked);
1209       IF_DEBUG(scheduler,
1210                debugBelch("--<< thread %d (%s) stopped: ", 
1211                        t->id, whatNext_strs[t->what_next]);
1212                printThreadBlockage(t);
1213                debugBelch("\n"));
1214
1215       /* Only for dumping event to log file 
1216          ToDo: do I need this in GranSim, too?
1217       blockThread(t);
1218       */
1219 #endif
1220       threadPaused(t);
1221       break;
1222
1223     case ThreadFinished:
1224       /* Need to check whether this was a main thread, and if so, signal
1225        * the task that started it with the return value.  If we have no
1226        * more main threads, we probably need to stop all the tasks until
1227        * we get a new one.
1228        */
1229       /* We also end up here if the thread kills itself with an
1230        * uncaught exception, see Exception.hc.
1231        */
1232       IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1233                                t->id, whatNext_strs[t->what_next]));
1234 #if defined(GRAN)
1235       endThread(t, CurrentProc); // clean-up the thread
1236 #elif defined(PAR)
1237       /* For now all are advisory -- HWL */
1238       //if(t->priority==AdvisoryPriority) ??
1239       advisory_thread_count--;
1240       
1241 # ifdef DIST
1242       if(t->dist.priority==RevalPriority)
1243         FinishReval(t);
1244 # endif
1245       
1246       if (RtsFlags.ParFlags.ParStats.Full &&
1247           !RtsFlags.ParFlags.ParStats.Suppressed) 
1248         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1249 #endif
1250
1251       //
1252       // Check whether the thread that just completed was a main
1253       // thread, and if so return with the result.  
1254       //
1255       // There is an assumption here that all thread completion goes
1256       // through this point; we need to make sure that if a thread
1257       // ends up in the ThreadKilled state, that it stays on the run
1258       // queue so it can be dealt with here.
1259       //
1260       if (
1261 #if defined(RTS_SUPPORTS_THREADS)
1262           mainThread != NULL
1263 #else
1264           mainThread->tso == t
1265 #endif
1266           )
1267       {
1268           // We are a bound thread: this must be our thread that just
1269           // completed.
1270           ASSERT(mainThread->tso == t);
1271
1272           if (t->what_next == ThreadComplete) {
1273               if (mainThread->ret) {
1274                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1275                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1276               }
1277               mainThread->stat = Success;
1278           } else {
1279               if (mainThread->ret) {
1280                   *(mainThread->ret) = NULL;
1281               }
1282               if (was_interrupted) {
1283                   mainThread->stat = Interrupted;
1284               } else {
1285                   mainThread->stat = Killed;
1286               }
1287           }
1288 #ifdef DEBUG
1289           removeThreadLabel((StgWord)mainThread->tso->id);
1290 #endif
1291           if (mainThread->prev == NULL) {
1292               main_threads = mainThread->link;
1293           } else {
1294               mainThread->prev->link = mainThread->link;
1295           }
1296           if (mainThread->link != NULL) {
1297               mainThread->link->prev = NULL;
1298           }
1299           releaseCapability(cap);
1300           return;
1301       }
1302
1303 #ifdef RTS_SUPPORTS_THREADS
1304       ASSERT(t->main == NULL);
1305 #else
1306       if (t->main != NULL) {
1307           // Must be a main thread that is not the topmost one.  Leave
1308           // it on the run queue until the stack has unwound to the
1309           // point where we can deal with this.  Leaving it on the run
1310           // queue also ensures that the garbage collector knows about
1311           // this thread and its return value (it gets dropped from the
1312           // all_threads list so there's no other way to find it).
1313           APPEND_TO_RUN_QUEUE(t);
1314       }
1315 #endif
1316       break;
1317
1318     default:
1319       barf("schedule: invalid thread return code %d", (int)ret);
1320     }
1321
1322 #ifdef PROFILING
1323     // When we have +RTS -i0 and we're heap profiling, do a census at
1324     // every GC.  This lets us get repeatable runs for debugging.
1325     if (performHeapProfile ||
1326         (RtsFlags.ProfFlags.profileInterval==0 &&
1327          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1328         GarbageCollect(GetRoots, rtsTrue);
1329         heapCensus();
1330         performHeapProfile = rtsFalse;
1331         ready_to_gc = rtsFalse; // we already GC'd
1332     }
1333 #endif
1334
1335     if (ready_to_gc) {
1336       /* Kick any transactions which are invalid back to their atomically frames.
1337        * When next scheduled they will try to commit, this commit will fail and
1338        * they will retry. */
1339       for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
1340         if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1341           if (!stmValidateTransaction (t -> trec)) {
1342             IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1343
1344             // strip the stack back to the ATOMICALLY_FRAME, aborting
1345             // the (nested) transaction, and saving the stack of any
1346             // partially-evaluated thunks on the heap.
1347             raiseAsync_(t, NULL, rtsTrue);
1348             
1349 #ifdef REG_R1
1350             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1351 #endif
1352           }
1353         }
1354       }
1355
1356       /* everybody back, start the GC.
1357        * Could do it in this thread, or signal a condition var
1358        * to do it in another thread.  Either way, we need to
1359        * broadcast on gc_pending_cond afterward.
1360        */
1361 #if defined(RTS_SUPPORTS_THREADS)
1362       IF_DEBUG(scheduler,sched_belch("doing GC"));
1363 #endif
1364       GarbageCollect(GetRoots,rtsFalse);
1365       ready_to_gc = rtsFalse;
1366 #if defined(GRAN)
1367       /* add a ContinueThread event to continue execution of current thread */
1368       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1369                 ContinueThread,
1370                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1371       IF_GRAN_DEBUG(bq, 
1372                debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1373                G_EVENTQ(0);
1374                G_CURR_THREADQ(0));
1375 #endif /* GRAN */
1376     }
1377
1378 #if defined(GRAN)
1379   next_thread:
1380     IF_GRAN_DEBUG(unused,
1381                   print_eventq(EventHd));
1382
1383     event = get_next_event();
1384 #elif defined(PAR)
1385   next_thread:
1386     /* ToDo: wait for next message to arrive rather than busy wait */
1387 #endif /* GRAN */
1388
1389   } /* end of while(1) */
1390
1391   IF_PAR_DEBUG(verbose,
1392                debugBelch("== Leaving schedule() after having received Finish\n"));
1393 }
1394
1395 /* ---------------------------------------------------------------------------
1396  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1397  * used by Control.Concurrent for error checking.
1398  * ------------------------------------------------------------------------- */
1399  
1400 StgBool
1401 rtsSupportsBoundThreads(void)
1402 {
1403 #ifdef THREADED_RTS
1404   return rtsTrue;
1405 #else
1406   return rtsFalse;
1407 #endif
1408 }
1409
1410 /* ---------------------------------------------------------------------------
1411  * isThreadBound(tso): check whether tso is bound to an OS thread.
1412  * ------------------------------------------------------------------------- */
1413  
1414 StgBool
1415 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1416 {
1417 #ifdef THREADED_RTS
1418   return (tso->main != NULL);
1419 #endif
1420   return rtsFalse;
1421 }
1422
1423 /* ---------------------------------------------------------------------------
1424  * Singleton fork(). Do not copy any running threads.
1425  * ------------------------------------------------------------------------- */
1426
1427 #ifndef mingw32_HOST_OS
1428 #define FORKPROCESS_PRIMOP_SUPPORTED
1429 #endif
1430
1431 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1432 static void 
1433 deleteThreadImmediately(StgTSO *tso);
1434 #endif
1435 StgInt
1436 forkProcess(HsStablePtr *entry
1437 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1438             STG_UNUSED
1439 #endif
1440            )
1441 {
1442 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1443   pid_t pid;
1444   StgTSO* t,*next;
1445   StgMainThread *m;
1446   SchedulerStatus rc;
1447
1448   IF_DEBUG(scheduler,sched_belch("forking!"));
1449   rts_lock(); // This not only acquires sched_mutex, it also
1450               // makes sure that no other threads are running
1451
1452   pid = fork();
1453
1454   if (pid) { /* parent */
1455
1456   /* just return the pid */
1457     rts_unlock();
1458     return pid;
1459     
1460   } else { /* child */
1461     
1462     
1463       // delete all threads
1464     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1465     
1466     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1467       next = t->link;
1468
1469         // don't allow threads to catch the ThreadKilled exception
1470       deleteThreadImmediately(t);
1471     }
1472     
1473       // wipe the main thread list
1474     while((m = main_threads) != NULL) {
1475       main_threads = m->link;
1476 # ifdef THREADED_RTS
1477       closeCondition(&m->bound_thread_cond);
1478 # endif
1479       stgFree(m);
1480     }
1481     
1482     rc = rts_evalStableIO(entry, NULL);  // run the action
1483     rts_checkSchedStatus("forkProcess",rc);
1484     
1485     rts_unlock();
1486     
1487     hs_exit();                      // clean up and exit
1488     stg_exit(0);
1489   }
1490 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1491   barf("forkProcess#: primop not supported, sorry!\n");
1492   return -1;
1493 #endif
1494 }
1495
1496 /* ---------------------------------------------------------------------------
1497  * deleteAllThreads():  kill all the live threads.
1498  *
1499  * This is used when we catch a user interrupt (^C), before performing
1500  * any necessary cleanups and running finalizers.
1501  *
1502  * Locks: sched_mutex held.
1503  * ------------------------------------------------------------------------- */
1504    
1505 void
1506 deleteAllThreads ( void )
1507 {
1508   StgTSO* t, *next;
1509   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1510   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1511       next = t->global_link;
1512       deleteThread(t);
1513   }      
1514
1515   // The run queue now contains a bunch of ThreadKilled threads.  We
1516   // must not throw these away: the main thread(s) will be in there
1517   // somewhere, and the main scheduler loop has to deal with it.
1518   // Also, the run queue is the only thing keeping these threads from
1519   // being GC'd, and we don't want the "main thread has been GC'd" panic.
1520
1521   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1522   ASSERT(sleeping_queue == END_TSO_QUEUE);
1523 }
1524
1525 /* startThread and  insertThread are now in GranSim.c -- HWL */
1526
1527
1528 /* ---------------------------------------------------------------------------
1529  * Suspending & resuming Haskell threads.
1530  * 
1531  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1532  * its capability before calling the C function.  This allows another
1533  * task to pick up the capability and carry on running Haskell
1534  * threads.  It also means that if the C call blocks, it won't lock
1535  * the whole system.
1536  *
1537  * The Haskell thread making the C call is put to sleep for the
1538  * duration of the call, on the susepended_ccalling_threads queue.  We
1539  * give out a token to the task, which it can use to resume the thread
1540  * on return from the C function.
1541  * ------------------------------------------------------------------------- */
1542    
1543 StgInt
1544 suspendThread( StgRegTable *reg )
1545 {
1546   nat tok;
1547   Capability *cap;
1548   int saved_errno = errno;
1549
1550   /* assume that *reg is a pointer to the StgRegTable part
1551    * of a Capability.
1552    */
1553   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1554
1555   ACQUIRE_LOCK(&sched_mutex);
1556
1557   IF_DEBUG(scheduler,
1558            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1559
1560   // XXX this might not be necessary --SDM
1561   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1562
1563   threadPaused(cap->r.rCurrentTSO);
1564   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1565   suspended_ccalling_threads = cap->r.rCurrentTSO;
1566
1567   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
1568       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1569       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1570   } else {
1571       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1572   }
1573
1574   /* Use the thread ID as the token; it should be unique */
1575   tok = cap->r.rCurrentTSO->id;
1576
1577   /* Hand back capability */
1578   releaseCapability(cap);
1579   
1580 #if defined(RTS_SUPPORTS_THREADS)
1581   /* Preparing to leave the RTS, so ensure there's a native thread/task
1582      waiting to take over.
1583   */
1584   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1585 #endif
1586
1587   RELEASE_LOCK(&sched_mutex);
1588   
1589   errno = saved_errno;
1590   return tok; 
1591 }
1592
1593 StgRegTable *
1594 resumeThread( StgInt tok )
1595 {
1596   StgTSO *tso, **prev;
1597   Capability *cap;
1598   int saved_errno = errno;
1599
1600 #if defined(RTS_SUPPORTS_THREADS)
1601   /* Wait for permission to re-enter the RTS with the result. */
1602   ACQUIRE_LOCK(&sched_mutex);
1603   waitForReturnCapability(&sched_mutex, &cap);
1604
1605   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1606 #else
1607   grabCapability(&cap);
1608 #endif
1609
1610   /* Remove the thread off of the suspended list */
1611   prev = &suspended_ccalling_threads;
1612   for (tso = suspended_ccalling_threads; 
1613        tso != END_TSO_QUEUE; 
1614        prev = &tso->link, tso = tso->link) {
1615     if (tso->id == (StgThreadID)tok) {
1616       *prev = tso->link;
1617       break;
1618     }
1619   }
1620   if (tso == END_TSO_QUEUE) {
1621     barf("resumeThread: thread not found");
1622   }
1623   tso->link = END_TSO_QUEUE;
1624   
1625   if(tso->why_blocked == BlockedOnCCall) {
1626       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1627       tso->blocked_exceptions = NULL;
1628   }
1629   
1630   /* Reset blocking status */
1631   tso->why_blocked  = NotBlocked;
1632
1633   cap->r.rCurrentTSO = tso;
1634   RELEASE_LOCK(&sched_mutex);
1635   errno = saved_errno;
1636   return &cap->r;
1637 }
1638
1639
1640 /* ---------------------------------------------------------------------------
1641  * Static functions
1642  * ------------------------------------------------------------------------ */
1643 static void unblockThread(StgTSO *tso);
1644
1645 /* ---------------------------------------------------------------------------
1646  * Comparing Thread ids.
1647  *
1648  * This is used from STG land in the implementation of the
1649  * instances of Eq/Ord for ThreadIds.
1650  * ------------------------------------------------------------------------ */
1651
1652 int
1653 cmp_thread(StgPtr tso1, StgPtr tso2) 
1654
1655   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1656   StgThreadID id2 = ((StgTSO *)tso2)->id;
1657  
1658   if (id1 < id2) return (-1);
1659   if (id1 > id2) return 1;
1660   return 0;
1661 }
1662
1663 /* ---------------------------------------------------------------------------
1664  * Fetching the ThreadID from an StgTSO.
1665  *
1666  * This is used in the implementation of Show for ThreadIds.
1667  * ------------------------------------------------------------------------ */
1668 int
1669 rts_getThreadId(StgPtr tso) 
1670 {
1671   return ((StgTSO *)tso)->id;
1672 }
1673
1674 #ifdef DEBUG
1675 void
1676 labelThread(StgPtr tso, char *label)
1677 {
1678   int len;
1679   void *buf;
1680
1681   /* Caveat: Once set, you can only set the thread name to "" */
1682   len = strlen(label)+1;
1683   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1684   strncpy(buf,label,len);
1685   /* Update will free the old memory for us */
1686   updateThreadLabel(((StgTSO *)tso)->id,buf);
1687 }
1688 #endif /* DEBUG */
1689
1690 /* ---------------------------------------------------------------------------
1691    Create a new thread.
1692
1693    The new thread starts with the given stack size.  Before the
1694    scheduler can run, however, this thread needs to have a closure
1695    (and possibly some arguments) pushed on its stack.  See
1696    pushClosure() in Schedule.h.
1697
1698    createGenThread() and createIOThread() (in SchedAPI.h) are
1699    convenient packaged versions of this function.
1700
1701    currently pri (priority) is only used in a GRAN setup -- HWL
1702    ------------------------------------------------------------------------ */
1703 #if defined(GRAN)
1704 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1705 StgTSO *
1706 createThread(nat size, StgInt pri)
1707 #else
1708 StgTSO *
1709 createThread(nat size)
1710 #endif
1711 {
1712
1713     StgTSO *tso;
1714     nat stack_size;
1715
1716     /* First check whether we should create a thread at all */
1717 #if defined(PAR)
1718   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1719   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1720     threadsIgnored++;
1721     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
1722           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1723     return END_TSO_QUEUE;
1724   }
1725   threadsCreated++;
1726 #endif
1727
1728 #if defined(GRAN)
1729   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1730 #endif
1731
1732   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1733
1734   /* catch ridiculously small stack sizes */
1735   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1736     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1737   }
1738
1739   stack_size = size - TSO_STRUCT_SIZEW;
1740
1741   tso = (StgTSO *)allocate(size);
1742   TICK_ALLOC_TSO(stack_size, 0);
1743
1744   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1745 #if defined(GRAN)
1746   SET_GRAN_HDR(tso, ThisPE);
1747 #endif
1748
1749   // Always start with the compiled code evaluator
1750   tso->what_next = ThreadRunGHC;
1751
1752   tso->id = next_thread_id++; 
1753   tso->why_blocked  = NotBlocked;
1754   tso->blocked_exceptions = NULL;
1755
1756   tso->saved_errno = 0;
1757   tso->main = NULL;
1758   
1759   tso->stack_size   = stack_size;
1760   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1761                               - TSO_STRUCT_SIZEW;
1762   tso->sp           = (P_)&(tso->stack) + stack_size;
1763
1764   tso->trec = NO_TREC;
1765
1766 #ifdef PROFILING
1767   tso->prof.CCCS = CCS_MAIN;
1768 #endif
1769
1770   /* put a stop frame on the stack */
1771   tso->sp -= sizeofW(StgStopFrame);
1772   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1773   tso->link = END_TSO_QUEUE;
1774
1775   // ToDo: check this
1776 #if defined(GRAN)
1777   /* uses more flexible routine in GranSim */
1778   insertThread(tso, CurrentProc);
1779 #else
1780   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1781    * from its creation
1782    */
1783 #endif
1784
1785 #if defined(GRAN) 
1786   if (RtsFlags.GranFlags.GranSimStats.Full) 
1787     DumpGranEvent(GR_START,tso);
1788 #elif defined(PAR)
1789   if (RtsFlags.ParFlags.ParStats.Full) 
1790     DumpGranEvent(GR_STARTQ,tso);
1791   /* HACk to avoid SCHEDULE 
1792      LastTSO = tso; */
1793 #endif
1794
1795   /* Link the new thread on the global thread list.
1796    */
1797   tso->global_link = all_threads;
1798   all_threads = tso;
1799
1800 #if defined(DIST)
1801   tso->dist.priority = MandatoryPriority; //by default that is...
1802 #endif
1803
1804 #if defined(GRAN)
1805   tso->gran.pri = pri;
1806 # if defined(DEBUG)
1807   tso->gran.magic = TSO_MAGIC; // debugging only
1808 # endif
1809   tso->gran.sparkname   = 0;
1810   tso->gran.startedat   = CURRENT_TIME; 
1811   tso->gran.exported    = 0;
1812   tso->gran.basicblocks = 0;
1813   tso->gran.allocs      = 0;
1814   tso->gran.exectime    = 0;
1815   tso->gran.fetchtime   = 0;
1816   tso->gran.fetchcount  = 0;
1817   tso->gran.blocktime   = 0;
1818   tso->gran.blockcount  = 0;
1819   tso->gran.blockedat   = 0;
1820   tso->gran.globalsparks = 0;
1821   tso->gran.localsparks  = 0;
1822   if (RtsFlags.GranFlags.Light)
1823     tso->gran.clock  = Now; /* local clock */
1824   else
1825     tso->gran.clock  = 0;
1826
1827   IF_DEBUG(gran,printTSO(tso));
1828 #elif defined(PAR)
1829 # if defined(DEBUG)
1830   tso->par.magic = TSO_MAGIC; // debugging only
1831 # endif
1832   tso->par.sparkname   = 0;
1833   tso->par.startedat   = CURRENT_TIME; 
1834   tso->par.exported    = 0;
1835   tso->par.basicblocks = 0;
1836   tso->par.allocs      = 0;
1837   tso->par.exectime    = 0;
1838   tso->par.fetchtime   = 0;
1839   tso->par.fetchcount  = 0;
1840   tso->par.blocktime   = 0;
1841   tso->par.blockcount  = 0;
1842   tso->par.blockedat   = 0;
1843   tso->par.globalsparks = 0;
1844   tso->par.localsparks  = 0;
1845 #endif
1846
1847 #if defined(GRAN)
1848   globalGranStats.tot_threads_created++;
1849   globalGranStats.threads_created_on_PE[CurrentProc]++;
1850   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1851   globalGranStats.tot_sq_probes++;
1852 #elif defined(PAR)
1853   // collect parallel global statistics (currently done together with GC stats)
1854   if (RtsFlags.ParFlags.ParStats.Global &&
1855       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1856     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1857     globalParStats.tot_threads_created++;
1858   }
1859 #endif 
1860
1861 #if defined(GRAN)
1862   IF_GRAN_DEBUG(pri,
1863                 sched_belch("==__ schedule: Created TSO %d (%p);",
1864                       CurrentProc, tso, tso->id));
1865 #elif defined(PAR)
1866     IF_PAR_DEBUG(verbose,
1867                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
1868                        (long)tso->id, tso, advisory_thread_count));
1869 #else
1870   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1871                                 (long)tso->id, (long)tso->stack_size));
1872 #endif    
1873   return tso;
1874 }
1875
1876 #if defined(PAR)
1877 /* RFP:
1878    all parallel thread creation calls should fall through the following routine.
1879 */
1880 StgTSO *
1881 createSparkThread(rtsSpark spark) 
1882 { StgTSO *tso;
1883   ASSERT(spark != (rtsSpark)NULL);
1884   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1885   { threadsIgnored++;
1886     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1887           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1888     return END_TSO_QUEUE;
1889   }
1890   else
1891   { threadsCreated++;
1892     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1893     if (tso==END_TSO_QUEUE)     
1894       barf("createSparkThread: Cannot create TSO");
1895 #if defined(DIST)
1896     tso->priority = AdvisoryPriority;
1897 #endif
1898     pushClosure(tso,spark);
1899     PUSH_ON_RUN_QUEUE(tso);
1900     advisory_thread_count++;    
1901   }
1902   return tso;
1903 }
1904 #endif
1905
1906 /*
1907   Turn a spark into a thread.
1908   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1909 */
1910 #if defined(PAR)
1911 StgTSO *
1912 activateSpark (rtsSpark spark) 
1913 {
1914   StgTSO *tso;
1915
1916   tso = createSparkThread(spark);
1917   if (RtsFlags.ParFlags.ParStats.Full) {   
1918     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1919     IF_PAR_DEBUG(verbose,
1920                  debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
1921                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1922   }
1923   // ToDo: fwd info on local/global spark to thread -- HWL
1924   // tso->gran.exported =  spark->exported;
1925   // tso->gran.locked =   !spark->global;
1926   // tso->gran.sparkname = spark->name;
1927
1928   return tso;
1929 }
1930 #endif
1931
1932 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1933                                    Capability *initialCapability
1934                                    );
1935
1936
1937 /* ---------------------------------------------------------------------------
1938  * scheduleThread()
1939  *
1940  * scheduleThread puts a thread on the head of the runnable queue.
1941  * This will usually be done immediately after a thread is created.
1942  * The caller of scheduleThread must create the thread using e.g.
1943  * createThread and push an appropriate closure
1944  * on this thread's stack before the scheduler is invoked.
1945  * ------------------------------------------------------------------------ */
1946
1947 static void scheduleThread_ (StgTSO* tso);
1948
1949 void
1950 scheduleThread_(StgTSO *tso)
1951 {
1952   // The thread goes at the *end* of the run-queue, to avoid possible
1953   // starvation of any threads already on the queue.
1954   APPEND_TO_RUN_QUEUE(tso);
1955   threadRunnable();
1956 }
1957
1958 void
1959 scheduleThread(StgTSO* tso)
1960 {
1961   ACQUIRE_LOCK(&sched_mutex);
1962   scheduleThread_(tso);
1963   RELEASE_LOCK(&sched_mutex);
1964 }
1965
1966 #if defined(RTS_SUPPORTS_THREADS)
1967 static Condition bound_cond_cache;
1968 static int bound_cond_cache_full = 0;
1969 #endif
1970
1971
1972 SchedulerStatus
1973 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1974                    Capability *initialCapability)
1975 {
1976     // Precondition: sched_mutex must be held
1977     StgMainThread *m;
1978
1979     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1980     m->tso = tso;
1981     tso->main = m;
1982     m->ret = ret;
1983     m->stat = NoStatus;
1984     m->link = main_threads;
1985     m->prev = NULL;
1986     if (main_threads != NULL) {
1987         main_threads->prev = m;
1988     }
1989     main_threads = m;
1990
1991 #if defined(RTS_SUPPORTS_THREADS)
1992     // Allocating a new condition for each thread is expensive, so we
1993     // cache one.  This is a pretty feeble hack, but it helps speed up
1994     // consecutive call-ins quite a bit.
1995     if (bound_cond_cache_full) {
1996         m->bound_thread_cond = bound_cond_cache;
1997         bound_cond_cache_full = 0;
1998     } else {
1999         initCondition(&m->bound_thread_cond);
2000     }
2001 #endif
2002
2003     /* Put the thread on the main-threads list prior to scheduling the TSO.
2004        Failure to do so introduces a race condition in the MT case (as
2005        identified by Wolfgang Thaller), whereby the new task/OS thread 
2006        created by scheduleThread_() would complete prior to the thread
2007        that spawned it managed to put 'itself' on the main-threads list.
2008        The upshot of it all being that the worker thread wouldn't get to
2009        signal the completion of the its work item for the main thread to
2010        see (==> it got stuck waiting.)    -- sof 6/02.
2011     */
2012     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2013     
2014     APPEND_TO_RUN_QUEUE(tso);
2015     // NB. Don't call threadRunnable() here, because the thread is
2016     // bound and only runnable by *this* OS thread, so waking up other
2017     // workers will just slow things down.
2018
2019     return waitThread_(m, initialCapability);
2020 }
2021
2022 /* ---------------------------------------------------------------------------
2023  * initScheduler()
2024  *
2025  * Initialise the scheduler.  This resets all the queues - if the
2026  * queues contained any threads, they'll be garbage collected at the
2027  * next pass.
2028  *
2029  * ------------------------------------------------------------------------ */
2030
2031 void 
2032 initScheduler(void)
2033 {
2034 #if defined(GRAN)
2035   nat i;
2036
2037   for (i=0; i<=MAX_PROC; i++) {
2038     run_queue_hds[i]      = END_TSO_QUEUE;
2039     run_queue_tls[i]      = END_TSO_QUEUE;
2040     blocked_queue_hds[i]  = END_TSO_QUEUE;
2041     blocked_queue_tls[i]  = END_TSO_QUEUE;
2042     ccalling_threadss[i]  = END_TSO_QUEUE;
2043     sleeping_queue        = END_TSO_QUEUE;
2044   }
2045 #else
2046   run_queue_hd      = END_TSO_QUEUE;
2047   run_queue_tl      = END_TSO_QUEUE;
2048   blocked_queue_hd  = END_TSO_QUEUE;
2049   blocked_queue_tl  = END_TSO_QUEUE;
2050   sleeping_queue    = END_TSO_QUEUE;
2051 #endif 
2052
2053   suspended_ccalling_threads  = END_TSO_QUEUE;
2054
2055   main_threads = NULL;
2056   all_threads  = END_TSO_QUEUE;
2057
2058   context_switch = 0;
2059   interrupted    = 0;
2060
2061   RtsFlags.ConcFlags.ctxtSwitchTicks =
2062       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2063       
2064 #if defined(RTS_SUPPORTS_THREADS)
2065   /* Initialise the mutex and condition variables used by
2066    * the scheduler. */
2067   initMutex(&sched_mutex);
2068   initMutex(&term_mutex);
2069 #endif
2070   
2071   ACQUIRE_LOCK(&sched_mutex);
2072
2073   /* A capability holds the state a native thread needs in
2074    * order to execute STG code. At least one capability is
2075    * floating around (only SMP builds have more than one).
2076    */
2077   initCapabilities();
2078   
2079 #if defined(RTS_SUPPORTS_THREADS)
2080     /* start our haskell execution tasks */
2081     startTaskManager(0,taskStart);
2082 #endif
2083
2084 #if /* defined(SMP) ||*/ defined(PAR)
2085   initSparkPools();
2086 #endif
2087
2088   RELEASE_LOCK(&sched_mutex);
2089 }
2090
2091 void
2092 exitScheduler( void )
2093 {
2094 #if defined(RTS_SUPPORTS_THREADS)
2095   stopTaskManager();
2096 #endif
2097   shutting_down_scheduler = rtsTrue;
2098 }
2099
2100 /* ----------------------------------------------------------------------------
2101    Managing the per-task allocation areas.
2102    
2103    Each capability comes with an allocation area.  These are
2104    fixed-length block lists into which allocation can be done.
2105
2106    ToDo: no support for two-space collection at the moment???
2107    ------------------------------------------------------------------------- */
2108
2109 static
2110 SchedulerStatus
2111 waitThread_(StgMainThread* m, Capability *initialCapability)
2112 {
2113   SchedulerStatus stat;
2114
2115   // Precondition: sched_mutex must be held.
2116   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2117
2118 #if defined(GRAN)
2119   /* GranSim specific init */
2120   CurrentTSO = m->tso;                // the TSO to run
2121   procStatus[MainProc] = Busy;        // status of main PE
2122   CurrentProc = MainProc;             // PE to run it on
2123   schedule(m,initialCapability);
2124 #else
2125   schedule(m,initialCapability);
2126   ASSERT(m->stat != NoStatus);
2127 #endif
2128
2129   stat = m->stat;
2130
2131 #if defined(RTS_SUPPORTS_THREADS)
2132   // Free the condition variable, returning it to the cache if possible.
2133   if (!bound_cond_cache_full) {
2134       bound_cond_cache = m->bound_thread_cond;
2135       bound_cond_cache_full = 1;
2136   } else {
2137       closeCondition(&m->bound_thread_cond);
2138   }
2139 #endif
2140
2141   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2142   stgFree(m);
2143
2144   // Postcondition: sched_mutex still held
2145   return stat;
2146 }
2147
2148 /* ---------------------------------------------------------------------------
2149    Where are the roots that we know about?
2150
2151         - all the threads on the runnable queue
2152         - all the threads on the blocked queue
2153         - all the threads on the sleeping queue
2154         - all the thread currently executing a _ccall_GC
2155         - all the "main threads"
2156      
2157    ------------------------------------------------------------------------ */
2158
2159 /* This has to be protected either by the scheduler monitor, or by the
2160         garbage collection monitor (probably the latter).
2161         KH @ 25/10/99
2162 */
2163
2164 void
2165 GetRoots( evac_fn evac )
2166 {
2167 #if defined(GRAN)
2168   {
2169     nat i;
2170     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2171       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2172           evac((StgClosure **)&run_queue_hds[i]);
2173       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2174           evac((StgClosure **)&run_queue_tls[i]);
2175       
2176       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2177           evac((StgClosure **)&blocked_queue_hds[i]);
2178       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2179           evac((StgClosure **)&blocked_queue_tls[i]);
2180       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2181           evac((StgClosure **)&ccalling_threads[i]);
2182     }
2183   }
2184
2185   markEventQueue();
2186
2187 #else /* !GRAN */
2188   if (run_queue_hd != END_TSO_QUEUE) {
2189       ASSERT(run_queue_tl != END_TSO_QUEUE);
2190       evac((StgClosure **)&run_queue_hd);
2191       evac((StgClosure **)&run_queue_tl);
2192   }
2193   
2194   if (blocked_queue_hd != END_TSO_QUEUE) {
2195       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2196       evac((StgClosure **)&blocked_queue_hd);
2197       evac((StgClosure **)&blocked_queue_tl);
2198   }
2199   
2200   if (sleeping_queue != END_TSO_QUEUE) {
2201       evac((StgClosure **)&sleeping_queue);
2202   }
2203 #endif 
2204
2205   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2206       evac((StgClosure **)&suspended_ccalling_threads);
2207   }
2208
2209 #if defined(PAR) || defined(GRAN)
2210   markSparkQueue(evac);
2211 #endif
2212
2213 #if defined(RTS_USER_SIGNALS)
2214   // mark the signal handlers (signals should be already blocked)
2215   markSignalHandlers(evac);
2216 #endif
2217 }
2218
2219 /* -----------------------------------------------------------------------------
2220    performGC
2221
2222    This is the interface to the garbage collector from Haskell land.
2223    We provide this so that external C code can allocate and garbage
2224    collect when called from Haskell via _ccall_GC.
2225
2226    It might be useful to provide an interface whereby the programmer
2227    can specify more roots (ToDo).
2228    
2229    This needs to be protected by the GC condition variable above.  KH.
2230    -------------------------------------------------------------------------- */
2231
2232 static void (*extra_roots)(evac_fn);
2233
2234 void
2235 performGC(void)
2236 {
2237   /* Obligated to hold this lock upon entry */
2238   ACQUIRE_LOCK(&sched_mutex);
2239   GarbageCollect(GetRoots,rtsFalse);
2240   RELEASE_LOCK(&sched_mutex);
2241 }
2242
2243 void
2244 performMajorGC(void)
2245 {
2246   ACQUIRE_LOCK(&sched_mutex);
2247   GarbageCollect(GetRoots,rtsTrue);
2248   RELEASE_LOCK(&sched_mutex);
2249 }
2250
2251 static void
2252 AllRoots(evac_fn evac)
2253 {
2254     GetRoots(evac);             // the scheduler's roots
2255     extra_roots(evac);          // the user's roots
2256 }
2257
2258 void
2259 performGCWithRoots(void (*get_roots)(evac_fn))
2260 {
2261   ACQUIRE_LOCK(&sched_mutex);
2262   extra_roots = get_roots;
2263   GarbageCollect(AllRoots,rtsFalse);
2264   RELEASE_LOCK(&sched_mutex);
2265 }
2266
2267 /* -----------------------------------------------------------------------------
2268    Stack overflow
2269
2270    If the thread has reached its maximum stack size, then raise the
2271    StackOverflow exception in the offending thread.  Otherwise
2272    relocate the TSO into a larger chunk of memory and adjust its stack
2273    size appropriately.
2274    -------------------------------------------------------------------------- */
2275
2276 static StgTSO *
2277 threadStackOverflow(StgTSO *tso)
2278 {
2279   nat new_stack_size, new_tso_size, stack_words;
2280   StgPtr new_sp;
2281   StgTSO *dest;
2282
2283   IF_DEBUG(sanity,checkTSO(tso));
2284   if (tso->stack_size >= tso->max_stack_size) {
2285
2286     IF_DEBUG(gc,
2287              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2288                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2289              /* If we're debugging, just print out the top of the stack */
2290              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2291                                               tso->sp+64)));
2292
2293     /* Send this thread the StackOverflow exception */
2294     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2295     return tso;
2296   }
2297
2298   /* Try to double the current stack size.  If that takes us over the
2299    * maximum stack size for this thread, then use the maximum instead.
2300    * Finally round up so the TSO ends up as a whole number of blocks.
2301    */
2302   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2303   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2304                                        TSO_STRUCT_SIZE)/sizeof(W_);
2305   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2306   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2307
2308   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2309
2310   dest = (StgTSO *)allocate(new_tso_size);
2311   TICK_ALLOC_TSO(new_stack_size,0);
2312
2313   /* copy the TSO block and the old stack into the new area */
2314   memcpy(dest,tso,TSO_STRUCT_SIZE);
2315   stack_words = tso->stack + tso->stack_size - tso->sp;
2316   new_sp = (P_)dest + new_tso_size - stack_words;
2317   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2318
2319   /* relocate the stack pointers... */
2320   dest->sp         = new_sp;
2321   dest->stack_size = new_stack_size;
2322         
2323   /* Mark the old TSO as relocated.  We have to check for relocated
2324    * TSOs in the garbage collector and any primops that deal with TSOs.
2325    *
2326    * It's important to set the sp value to just beyond the end
2327    * of the stack, so we don't attempt to scavenge any part of the
2328    * dead TSO's stack.
2329    */
2330   tso->what_next = ThreadRelocated;
2331   tso->link = dest;
2332   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2333   tso->why_blocked = NotBlocked;
2334   dest->mut_link = NULL;
2335
2336   IF_PAR_DEBUG(verbose,
2337                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2338                      tso->id, tso, tso->stack_size);
2339                /* If we're debugging, just print out the top of the stack */
2340                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2341                                                 tso->sp+64)));
2342   
2343   IF_DEBUG(sanity,checkTSO(tso));
2344 #if 0
2345   IF_DEBUG(scheduler,printTSO(dest));
2346 #endif
2347
2348   return dest;
2349 }
2350
2351 /* ---------------------------------------------------------------------------
2352    Wake up a queue that was blocked on some resource.
2353    ------------------------------------------------------------------------ */
2354
2355 #if defined(GRAN)
2356 STATIC_INLINE void
2357 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2358 {
2359 }
2360 #elif defined(PAR)
2361 STATIC_INLINE void
2362 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2363 {
2364   /* write RESUME events to log file and
2365      update blocked and fetch time (depending on type of the orig closure) */
2366   if (RtsFlags.ParFlags.ParStats.Full) {
2367     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2368                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2369                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2370     if (EMPTY_RUN_QUEUE())
2371       emitSchedule = rtsTrue;
2372
2373     switch (get_itbl(node)->type) {
2374         case FETCH_ME_BQ:
2375           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2376           break;
2377         case RBH:
2378         case FETCH_ME:
2379         case BLACKHOLE_BQ:
2380           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2381           break;
2382 #ifdef DIST
2383         case MVAR:
2384           break;
2385 #endif    
2386         default:
2387           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2388         }
2389       }
2390 }
2391 #endif
2392
2393 #if defined(GRAN)
2394 static StgBlockingQueueElement *
2395 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2396 {
2397     StgTSO *tso;
2398     PEs node_loc, tso_loc;
2399
2400     node_loc = where_is(node); // should be lifted out of loop
2401     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2402     tso_loc = where_is((StgClosure *)tso);
2403     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2404       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2405       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2406       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2407       // insertThread(tso, node_loc);
2408       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2409                 ResumeThread,
2410                 tso, node, (rtsSpark*)NULL);
2411       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2412       // len_local++;
2413       // len++;
2414     } else { // TSO is remote (actually should be FMBQ)
2415       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2416                                   RtsFlags.GranFlags.Costs.gunblocktime +
2417                                   RtsFlags.GranFlags.Costs.latency;
2418       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2419                 UnblockThread,
2420                 tso, node, (rtsSpark*)NULL);
2421       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2422       // len++;
2423     }
2424     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2425     IF_GRAN_DEBUG(bq,
2426                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2427                           (node_loc==tso_loc ? "Local" : "Global"), 
2428                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2429     tso->block_info.closure = NULL;
2430     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2431                              tso->id, tso));
2432 }
2433 #elif defined(PAR)
2434 static StgBlockingQueueElement *
2435 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2436 {
2437     StgBlockingQueueElement *next;
2438
2439     switch (get_itbl(bqe)->type) {
2440     case TSO:
2441       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2442       /* if it's a TSO just push it onto the run_queue */
2443       next = bqe->link;
2444       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2445       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2446       threadRunnable();
2447       unblockCount(bqe, node);
2448       /* reset blocking status after dumping event */
2449       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2450       break;
2451
2452     case BLOCKED_FETCH:
2453       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2454       next = bqe->link;
2455       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2456       PendingFetches = (StgBlockedFetch *)bqe;
2457       break;
2458
2459 # if defined(DEBUG)
2460       /* can ignore this case in a non-debugging setup; 
2461          see comments on RBHSave closures above */
2462     case CONSTR:
2463       /* check that the closure is an RBHSave closure */
2464       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2465              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2466              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2467       break;
2468
2469     default:
2470       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2471            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2472            (StgClosure *)bqe);
2473 # endif
2474     }
2475   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
2476   return next;
2477 }
2478
2479 #else /* !GRAN && !PAR */
2480 static StgTSO *
2481 unblockOneLocked(StgTSO *tso)
2482 {
2483   StgTSO *next;
2484
2485   ASSERT(get_itbl(tso)->type == TSO);
2486   ASSERT(tso->why_blocked != NotBlocked);
2487   tso->why_blocked = NotBlocked;
2488   next = tso->link;
2489   tso->link = END_TSO_QUEUE;
2490   APPEND_TO_RUN_QUEUE(tso);
2491   threadRunnable();
2492   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
2493   return next;
2494 }
2495 #endif
2496
2497 #if defined(GRAN) || defined(PAR)
2498 INLINE_ME StgBlockingQueueElement *
2499 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2500 {
2501   ACQUIRE_LOCK(&sched_mutex);
2502   bqe = unblockOneLocked(bqe, node);
2503   RELEASE_LOCK(&sched_mutex);
2504   return bqe;
2505 }
2506 #else
2507 INLINE_ME StgTSO *
2508 unblockOne(StgTSO *tso)
2509 {
2510   ACQUIRE_LOCK(&sched_mutex);
2511   tso = unblockOneLocked(tso);
2512   RELEASE_LOCK(&sched_mutex);
2513   return tso;
2514 }
2515 #endif
2516
2517 #if defined(GRAN)
2518 void 
2519 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2520 {
2521   StgBlockingQueueElement *bqe;
2522   PEs node_loc;
2523   nat len = 0; 
2524
2525   IF_GRAN_DEBUG(bq, 
2526                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
2527                       node, CurrentProc, CurrentTime[CurrentProc], 
2528                       CurrentTSO->id, CurrentTSO));
2529
2530   node_loc = where_is(node);
2531
2532   ASSERT(q == END_BQ_QUEUE ||
2533          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2534          get_itbl(q)->type == CONSTR); // closure (type constructor)
2535   ASSERT(is_unique(node));
2536
2537   /* FAKE FETCH: magically copy the node to the tso's proc;
2538      no Fetch necessary because in reality the node should not have been 
2539      moved to the other PE in the first place
2540   */
2541   if (CurrentProc!=node_loc) {
2542     IF_GRAN_DEBUG(bq, 
2543                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
2544                         node, node_loc, CurrentProc, CurrentTSO->id, 
2545                         // CurrentTSO, where_is(CurrentTSO),
2546                         node->header.gran.procs));
2547     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2548     IF_GRAN_DEBUG(bq, 
2549                   debugBelch("## new bitmask of node %p is %#x\n",
2550                         node, node->header.gran.procs));
2551     if (RtsFlags.GranFlags.GranSimStats.Global) {
2552       globalGranStats.tot_fake_fetches++;
2553     }
2554   }
2555
2556   bqe = q;
2557   // ToDo: check: ASSERT(CurrentProc==node_loc);
2558   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2559     //next = bqe->link;
2560     /* 
2561        bqe points to the current element in the queue
2562        next points to the next element in the queue
2563     */
2564     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2565     //tso_loc = where_is(tso);
2566     len++;
2567     bqe = unblockOneLocked(bqe, node);
2568   }
2569
2570   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2571      the closure to make room for the anchor of the BQ */
2572   if (bqe!=END_BQ_QUEUE) {
2573     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2574     /*
2575     ASSERT((info_ptr==&RBH_Save_0_info) ||
2576            (info_ptr==&RBH_Save_1_info) ||
2577            (info_ptr==&RBH_Save_2_info));
2578     */
2579     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2580     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2581     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2582
2583     IF_GRAN_DEBUG(bq,
2584                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
2585                         node, info_type(node)));
2586   }
2587
2588   /* statistics gathering */
2589   if (RtsFlags.GranFlags.GranSimStats.Global) {
2590     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2591     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2592     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2593     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2594   }
2595   IF_GRAN_DEBUG(bq,
2596                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
2597                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2598 }
2599 #elif defined(PAR)
2600 void 
2601 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2602 {
2603   StgBlockingQueueElement *bqe;
2604
2605   ACQUIRE_LOCK(&sched_mutex);
2606
2607   IF_PAR_DEBUG(verbose, 
2608                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
2609                      node, mytid));
2610 #ifdef DIST  
2611   //RFP
2612   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2613     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
2614     return;
2615   }
2616 #endif
2617   
2618   ASSERT(q == END_BQ_QUEUE ||
2619          get_itbl(q)->type == TSO ||           
2620          get_itbl(q)->type == BLOCKED_FETCH || 
2621          get_itbl(q)->type == CONSTR); 
2622
2623   bqe = q;
2624   while (get_itbl(bqe)->type==TSO || 
2625          get_itbl(bqe)->type==BLOCKED_FETCH) {
2626     bqe = unblockOneLocked(bqe, node);
2627   }
2628   RELEASE_LOCK(&sched_mutex);
2629 }
2630
2631 #else   /* !GRAN && !PAR */
2632
2633 void
2634 awakenBlockedQueueNoLock(StgTSO *tso)
2635 {
2636   while (tso != END_TSO_QUEUE) {
2637     tso = unblockOneLocked(tso);
2638   }
2639 }
2640
2641 void
2642 awakenBlockedQueue(StgTSO *tso)
2643 {
2644   ACQUIRE_LOCK(&sched_mutex);
2645   while (tso != END_TSO_QUEUE) {
2646     tso = unblockOneLocked(tso);
2647   }
2648   RELEASE_LOCK(&sched_mutex);
2649 }
2650 #endif
2651
2652 /* ---------------------------------------------------------------------------
2653    Interrupt execution
2654    - usually called inside a signal handler so it mustn't do anything fancy.   
2655    ------------------------------------------------------------------------ */
2656
2657 void
2658 interruptStgRts(void)
2659 {
2660     interrupted    = 1;
2661     context_switch = 1;
2662 }
2663
2664 /* -----------------------------------------------------------------------------
2665    Unblock a thread
2666
2667    This is for use when we raise an exception in another thread, which
2668    may be blocked.
2669    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2670    -------------------------------------------------------------------------- */
2671
2672 #if defined(GRAN) || defined(PAR)
2673 /*
2674   NB: only the type of the blocking queue is different in GranSim and GUM
2675       the operations on the queue-elements are the same
2676       long live polymorphism!
2677
2678   Locks: sched_mutex is held upon entry and exit.
2679
2680 */
2681 static void
2682 unblockThread(StgTSO *tso)
2683 {
2684   StgBlockingQueueElement *t, **last;
2685
2686   switch (tso->why_blocked) {
2687
2688   case NotBlocked:
2689     return;  /* not blocked */
2690
2691   case BlockedOnSTM:
2692     // Be careful: nothing to do here!  We tell the scheduler that the thread
2693     // is runnable and we leave it to the stack-walking code to abort the 
2694     // transaction while unwinding the stack.  We should perhaps have a debugging
2695     // test to make sure that this really happens and that the 'zombie' transaction
2696     // does not get committed.
2697     goto done;
2698
2699   case BlockedOnMVar:
2700     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2701     {
2702       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2703       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2704
2705       last = (StgBlockingQueueElement **)&mvar->head;
2706       for (t = (StgBlockingQueueElement *)mvar->head; 
2707            t != END_BQ_QUEUE; 
2708            last = &t->link, last_tso = t, t = t->link) {
2709         if (t == (StgBlockingQueueElement *)tso) {
2710           *last = (StgBlockingQueueElement *)tso->link;
2711           if (mvar->tail == tso) {
2712             mvar->tail = (StgTSO *)last_tso;
2713           }
2714           goto done;
2715         }
2716       }
2717       barf("unblockThread (MVAR): TSO not found");
2718     }
2719
2720   case BlockedOnBlackHole:
2721     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2722     {
2723       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2724
2725       last = &bq->blocking_queue;
2726       for (t = bq->blocking_queue; 
2727            t != END_BQ_QUEUE; 
2728            last = &t->link, t = t->link) {
2729         if (t == (StgBlockingQueueElement *)tso) {
2730           *last = (StgBlockingQueueElement *)tso->link;
2731           goto done;
2732         }
2733       }
2734       barf("unblockThread (BLACKHOLE): TSO not found");
2735     }
2736
2737   case BlockedOnException:
2738     {
2739       StgTSO *target  = tso->block_info.tso;
2740
2741       ASSERT(get_itbl(target)->type == TSO);
2742
2743       if (target->what_next == ThreadRelocated) {
2744           target = target->link;
2745           ASSERT(get_itbl(target)->type == TSO);
2746       }
2747
2748       ASSERT(target->blocked_exceptions != NULL);
2749
2750       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2751       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2752            t != END_BQ_QUEUE; 
2753            last = &t->link, t = t->link) {
2754         ASSERT(get_itbl(t)->type == TSO);
2755         if (t == (StgBlockingQueueElement *)tso) {
2756           *last = (StgBlockingQueueElement *)tso->link;
2757           goto done;
2758         }
2759       }
2760       barf("unblockThread (Exception): TSO not found");
2761     }
2762
2763   case BlockedOnRead:
2764   case BlockedOnWrite:
2765 #if defined(mingw32_HOST_OS)
2766   case BlockedOnDoProc:
2767 #endif
2768     {
2769       /* take TSO off blocked_queue */
2770       StgBlockingQueueElement *prev = NULL;
2771       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2772            prev = t, t = t->link) {
2773         if (t == (StgBlockingQueueElement *)tso) {
2774           if (prev == NULL) {
2775             blocked_queue_hd = (StgTSO *)t->link;
2776             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2777               blocked_queue_tl = END_TSO_QUEUE;
2778             }
2779           } else {
2780             prev->link = t->link;
2781             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2782               blocked_queue_tl = (StgTSO *)prev;
2783             }
2784           }
2785           goto done;
2786         }
2787       }
2788       barf("unblockThread (I/O): TSO not found");
2789     }
2790
2791   case BlockedOnDelay:
2792     {
2793       /* take TSO off sleeping_queue */
2794       StgBlockingQueueElement *prev = NULL;
2795       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2796            prev = t, t = t->link) {
2797         if (t == (StgBlockingQueueElement *)tso) {
2798           if (prev == NULL) {
2799             sleeping_queue = (StgTSO *)t->link;
2800           } else {
2801             prev->link = t->link;
2802           }
2803           goto done;
2804         }
2805       }
2806       barf("unblockThread (delay): TSO not found");
2807     }
2808
2809   default:
2810     barf("unblockThread");
2811   }
2812
2813  done:
2814   tso->link = END_TSO_QUEUE;
2815   tso->why_blocked = NotBlocked;
2816   tso->block_info.closure = NULL;
2817   PUSH_ON_RUN_QUEUE(tso);
2818 }
2819 #else
2820 static void
2821 unblockThread(StgTSO *tso)
2822 {
2823   StgTSO *t, **last;
2824   
2825   /* To avoid locking unnecessarily. */
2826   if (tso->why_blocked == NotBlocked) {
2827     return;
2828   }
2829
2830   switch (tso->why_blocked) {
2831
2832   case BlockedOnSTM:
2833     // Be careful: nothing to do here!  We tell the scheduler that the thread
2834     // is runnable and we leave it to the stack-walking code to abort the 
2835     // transaction while unwinding the stack.  We should perhaps have a debugging
2836     // test to make sure that this really happens and that the 'zombie' transaction
2837     // does not get committed.
2838     goto done;
2839
2840   case BlockedOnMVar:
2841     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2842     {
2843       StgTSO *last_tso = END_TSO_QUEUE;
2844       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2845
2846       last = &mvar->head;
2847       for (t = mvar->head; t != END_TSO_QUEUE; 
2848            last = &t->link, last_tso = t, t = t->link) {
2849         if (t == tso) {
2850           *last = tso->link;
2851           if (mvar->tail == tso) {
2852             mvar->tail = last_tso;
2853           }
2854           goto done;
2855         }
2856       }
2857       barf("unblockThread (MVAR): TSO not found");
2858     }
2859
2860   case BlockedOnBlackHole:
2861     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2862     {
2863       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2864
2865       last = &bq->blocking_queue;
2866       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2867            last = &t->link, t = t->link) {
2868         if (t == tso) {
2869           *last = tso->link;
2870           goto done;
2871         }
2872       }
2873       barf("unblockThread (BLACKHOLE): TSO not found");
2874     }
2875
2876   case BlockedOnException:
2877     {
2878       StgTSO *target  = tso->block_info.tso;
2879
2880       ASSERT(get_itbl(target)->type == TSO);
2881
2882       while (target->what_next == ThreadRelocated) {
2883           target = target->link;
2884           ASSERT(get_itbl(target)->type == TSO);
2885       }
2886       
2887       ASSERT(target->blocked_exceptions != NULL);
2888
2889       last = &target->blocked_exceptions;
2890       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2891            last = &t->link, t = t->link) {
2892         ASSERT(get_itbl(t)->type == TSO);
2893         if (t == tso) {
2894           *last = tso->link;
2895           goto done;
2896         }
2897       }
2898       barf("unblockThread (Exception): TSO not found");
2899     }
2900
2901   case BlockedOnRead:
2902   case BlockedOnWrite:
2903 #if defined(mingw32_HOST_OS)
2904   case BlockedOnDoProc:
2905 #endif
2906     {
2907       StgTSO *prev = NULL;
2908       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2909            prev = t, t = t->link) {
2910         if (t == tso) {
2911           if (prev == NULL) {
2912             blocked_queue_hd = t->link;
2913             if (blocked_queue_tl == t) {
2914               blocked_queue_tl = END_TSO_QUEUE;
2915             }
2916           } else {
2917             prev->link = t->link;
2918             if (blocked_queue_tl == t) {
2919               blocked_queue_tl = prev;
2920             }
2921           }
2922           goto done;
2923         }
2924       }
2925       barf("unblockThread (I/O): TSO not found");
2926     }
2927
2928   case BlockedOnDelay:
2929     {
2930       StgTSO *prev = NULL;
2931       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2932            prev = t, t = t->link) {
2933         if (t == tso) {
2934           if (prev == NULL) {
2935             sleeping_queue = t->link;
2936           } else {
2937             prev->link = t->link;
2938           }
2939           goto done;
2940         }
2941       }
2942       barf("unblockThread (delay): TSO not found");
2943     }
2944
2945   default:
2946     barf("unblockThread");
2947   }
2948
2949  done:
2950   tso->link = END_TSO_QUEUE;
2951   tso->why_blocked = NotBlocked;
2952   tso->block_info.closure = NULL;
2953   APPEND_TO_RUN_QUEUE(tso);
2954 }
2955 #endif
2956
2957 /* -----------------------------------------------------------------------------
2958  * raiseAsync()
2959  *
2960  * The following function implements the magic for raising an
2961  * asynchronous exception in an existing thread.
2962  *
2963  * We first remove the thread from any queue on which it might be
2964  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2965  *
2966  * We strip the stack down to the innermost CATCH_FRAME, building
2967  * thunks in the heap for all the active computations, so they can 
2968  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2969  * an application of the handler to the exception, and push it on
2970  * the top of the stack.
2971  * 
2972  * How exactly do we save all the active computations?  We create an
2973  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2974  * AP_STACKs pushes everything from the corresponding update frame
2975  * upwards onto the stack.  (Actually, it pushes everything up to the
2976  * next update frame plus a pointer to the next AP_STACK object.
2977  * Entering the next AP_STACK object pushes more onto the stack until we
2978  * reach the last AP_STACK object - at which point the stack should look
2979  * exactly as it did when we killed the TSO and we can continue
2980  * execution by entering the closure on top of the stack.
2981  *
2982  * We can also kill a thread entirely - this happens if either (a) the 
2983  * exception passed to raiseAsync is NULL, or (b) there's no
2984  * CATCH_FRAME on the stack.  In either case, we strip the entire
2985  * stack and replace the thread with a zombie.
2986  *
2987  * Locks: sched_mutex held upon entry nor exit.
2988  *
2989  * -------------------------------------------------------------------------- */
2990  
2991 void 
2992 deleteThread(StgTSO *tso)
2993 {
2994   if (tso->why_blocked != BlockedOnCCall &&
2995       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2996       raiseAsync(tso,NULL);
2997   }
2998 }
2999
3000 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3001 static void 
3002 deleteThreadImmediately(StgTSO *tso)
3003 { // for forkProcess only:
3004   // delete thread without giving it a chance to catch the KillThread exception
3005
3006   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3007       return;
3008   }
3009
3010   if (tso->why_blocked != BlockedOnCCall &&
3011       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3012     unblockThread(tso);
3013   }
3014
3015   tso->what_next = ThreadKilled;
3016 }
3017 #endif
3018
3019 void
3020 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3021 {
3022   /* When raising async exs from contexts where sched_mutex isn't held;
3023      use raiseAsyncWithLock(). */
3024   ACQUIRE_LOCK(&sched_mutex);
3025   raiseAsync(tso,exception);
3026   RELEASE_LOCK(&sched_mutex);
3027 }
3028
3029 void
3030 raiseAsync(StgTSO *tso, StgClosure *exception)
3031 {
3032     raiseAsync_(tso, exception, rtsFalse);
3033 }
3034
3035 static void
3036 raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
3037 {
3038     StgRetInfoTable *info;
3039     StgPtr sp;
3040   
3041     // Thread already dead?
3042     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3043         return;
3044     }
3045
3046     IF_DEBUG(scheduler, 
3047              sched_belch("raising exception in thread %ld.", (long)tso->id));
3048     
3049     // Remove it from any blocking queues
3050     unblockThread(tso);
3051
3052     sp = tso->sp;
3053     
3054     // The stack freezing code assumes there's a closure pointer on
3055     // the top of the stack, so we have to arrange that this is the case...
3056     //
3057     if (sp[0] == (W_)&stg_enter_info) {
3058         sp++;
3059     } else {
3060         sp--;
3061         sp[0] = (W_)&stg_dummy_ret_closure;
3062     }
3063
3064     while (1) {
3065         nat i;
3066
3067         // 1. Let the top of the stack be the "current closure"
3068         //
3069         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3070         // CATCH_FRAME.
3071         //
3072         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3073         // current closure applied to the chunk of stack up to (but not
3074         // including) the update frame.  This closure becomes the "current
3075         // closure".  Go back to step 2.
3076         //
3077         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3078         // top of the stack applied to the exception.
3079         // 
3080         // 5. If it's a STOP_FRAME, then kill the thread.
3081         // 
3082         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3083         // transaction
3084        
3085         
3086         StgPtr frame;
3087         
3088         frame = sp + 1;
3089         info = get_ret_itbl((StgClosure *)frame);
3090         
3091         while (info->i.type != UPDATE_FRAME
3092                && (info->i.type != CATCH_FRAME || exception == NULL)
3093                && info->i.type != STOP_FRAME
3094                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3095         {
3096             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3097               // IF we find an ATOMICALLY_FRAME then we abort the
3098               // current transaction and propagate the exception.  In
3099               // this case (unlike ordinary exceptions) we do not care
3100               // whether the transaction is valid or not because its
3101               // possible validity cannot have caused the exception
3102               // and will not be visible after the abort.
3103               IF_DEBUG(stm,
3104                        debugBelch("Found atomically block delivering async exception\n"));
3105               stmAbortTransaction(tso -> trec);
3106               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3107             }
3108             frame += stack_frame_sizeW((StgClosure *)frame);
3109             info = get_ret_itbl((StgClosure *)frame);
3110         }
3111         
3112         switch (info->i.type) {
3113             
3114         case ATOMICALLY_FRAME:
3115             ASSERT(stop_at_atomically);
3116             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3117             stmCondemnTransaction(tso -> trec);
3118 #ifdef REG_R1
3119             tso->sp = frame;
3120 #else
3121             // R1 is not a register: the return convention for IO in
3122             // this case puts the return value on the stack, so we
3123             // need to set up the stack to return to the atomically
3124             // frame properly...
3125             tso->sp = frame - 2;
3126             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3127             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3128 #endif
3129             tso->what_next = ThreadRunGHC;
3130             return;
3131
3132         case CATCH_FRAME:
3133             // If we find a CATCH_FRAME, and we've got an exception to raise,
3134             // then build the THUNK raise(exception), and leave it on
3135             // top of the CATCH_FRAME ready to enter.
3136             //
3137         {
3138 #ifdef PROFILING
3139             StgCatchFrame *cf = (StgCatchFrame *)frame;
3140 #endif
3141             StgClosure *raise;
3142             
3143             // we've got an exception to raise, so let's pass it to the
3144             // handler in this frame.
3145             //
3146             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3147             TICK_ALLOC_SE_THK(1,0);
3148             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3149             raise->payload[0] = exception;
3150             
3151             // throw away the stack from Sp up to the CATCH_FRAME.
3152             //
3153             sp = frame - 1;
3154             
3155             /* Ensure that async excpetions are blocked now, so we don't get
3156              * a surprise exception before we get around to executing the
3157              * handler.
3158              */
3159             if (tso->blocked_exceptions == NULL) {
3160                 tso->blocked_exceptions = END_TSO_QUEUE;
3161             }
3162             
3163             /* Put the newly-built THUNK on top of the stack, ready to execute
3164              * when the thread restarts.
3165              */
3166             sp[0] = (W_)raise;
3167             sp[-1] = (W_)&stg_enter_info;
3168             tso->sp = sp-1;
3169             tso->what_next = ThreadRunGHC;
3170             IF_DEBUG(sanity, checkTSO(tso));
3171             return;
3172         }
3173         
3174         case UPDATE_FRAME:
3175         {
3176             StgAP_STACK * ap;
3177             nat words;
3178             
3179             // First build an AP_STACK consisting of the stack chunk above the
3180             // current update frame, with the top word on the stack as the
3181             // fun field.
3182             //
3183             words = frame - sp - 1;
3184             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3185             
3186             ap->size = words;
3187             ap->fun  = (StgClosure *)sp[0];
3188             sp++;
3189             for(i=0; i < (nat)words; ++i) {
3190                 ap->payload[i] = (StgClosure *)*sp++;
3191             }
3192             
3193             SET_HDR(ap,&stg_AP_STACK_info,
3194                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3195             TICK_ALLOC_UP_THK(words+1,0);
3196             
3197             IF_DEBUG(scheduler,
3198                      debugBelch("sched: Updating ");
3199                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3200                      debugBelch(" with ");
3201                      printObj((StgClosure *)ap);
3202                 );
3203
3204             // Replace the updatee with an indirection - happily
3205             // this will also wake up any threads currently
3206             // waiting on the result.
3207             //
3208             // Warning: if we're in a loop, more than one update frame on
3209             // the stack may point to the same object.  Be careful not to
3210             // overwrite an IND_OLDGEN in this case, because we'll screw
3211             // up the mutable lists.  To be on the safe side, don't
3212             // overwrite any kind of indirection at all.  See also
3213             // threadSqueezeStack in GC.c, where we have to make a similar
3214             // check.
3215             //
3216             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3217                 // revert the black hole
3218                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3219                                (StgClosure *)ap);
3220             }
3221             sp += sizeofW(StgUpdateFrame) - 1;
3222             sp[0] = (W_)ap; // push onto stack
3223             break;
3224         }
3225         
3226         case STOP_FRAME:
3227             // We've stripped the entire stack, the thread is now dead.
3228             sp += sizeofW(StgStopFrame);
3229             tso->what_next = ThreadKilled;
3230             tso->sp = sp;
3231             return;
3232             
3233         default:
3234             barf("raiseAsync");
3235         }
3236     }
3237     barf("raiseAsync");
3238 }
3239
3240 /* -----------------------------------------------------------------------------
3241    raiseExceptionHelper
3242    
3243    This function is called by the raise# primitve, just so that we can
3244    move some of the tricky bits of raising an exception from C-- into
3245    C.  Who knows, it might be a useful re-useable thing here too.
3246    -------------------------------------------------------------------------- */
3247
3248 StgWord
3249 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3250 {
3251     StgClosure *raise_closure = NULL;
3252     StgPtr p, next;
3253     StgRetInfoTable *info;
3254     //
3255     // This closure represents the expression 'raise# E' where E
3256     // is the exception raise.  It is used to overwrite all the
3257     // thunks which are currently under evaluataion.
3258     //
3259
3260     //    
3261     // LDV profiling: stg_raise_info has THUNK as its closure
3262     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3263     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3264     // 1 does not cause any problem unless profiling is performed.
3265     // However, when LDV profiling goes on, we need to linearly scan
3266     // small object pool, where raise_closure is stored, so we should
3267     // use MIN_UPD_SIZE.
3268     //
3269     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3270     //                                 sizeofW(StgClosure)+1);
3271     //
3272
3273     //
3274     // Walk up the stack, looking for the catch frame.  On the way,
3275     // we update any closures pointed to from update frames with the
3276     // raise closure that we just built.
3277     //
3278     p = tso->sp;
3279     while(1) {
3280         info = get_ret_itbl((StgClosure *)p);
3281         next = p + stack_frame_sizeW((StgClosure *)p);
3282         switch (info->i.type) {
3283             
3284         case UPDATE_FRAME:
3285             // Only create raise_closure if we need to.
3286             if (raise_closure == NULL) {
3287                 raise_closure = 
3288                     (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3289                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3290                 raise_closure->payload[0] = exception;
3291             }
3292             UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3293             p = next;
3294             continue;
3295
3296         case ATOMICALLY_FRAME:
3297             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3298             tso->sp = p;
3299             return ATOMICALLY_FRAME;
3300             
3301         case CATCH_FRAME:
3302             tso->sp = p;
3303             return CATCH_FRAME;
3304
3305         case CATCH_STM_FRAME:
3306             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3307             tso->sp = p;
3308             return CATCH_STM_FRAME;
3309             
3310         case STOP_FRAME:
3311             tso->sp = p;
3312             return STOP_FRAME;
3313
3314         case CATCH_RETRY_FRAME:
3315         default:
3316             p = next; 
3317             continue;
3318         }
3319     }
3320 }
3321
3322
3323 /* -----------------------------------------------------------------------------
3324    findRetryFrameHelper
3325
3326    This function is called by the retry# primitive.  It traverses the stack
3327    leaving tso->sp referring to the frame which should handle the retry.  
3328
3329    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3330    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3331
3332    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3333    despite the similar implementation.
3334
3335    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3336    not be created within memory transactions.
3337    -------------------------------------------------------------------------- */
3338
3339 StgWord
3340 findRetryFrameHelper (StgTSO *tso)
3341 {
3342   StgPtr           p, next;
3343   StgRetInfoTable *info;
3344
3345   p = tso -> sp;
3346   while (1) {
3347     info = get_ret_itbl((StgClosure *)p);
3348     next = p + stack_frame_sizeW((StgClosure *)p);
3349     switch (info->i.type) {
3350       
3351     case ATOMICALLY_FRAME:
3352       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3353       tso->sp = p;
3354       return ATOMICALLY_FRAME;
3355       
3356     case CATCH_RETRY_FRAME:
3357       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3358       tso->sp = p;
3359       return CATCH_RETRY_FRAME;
3360       
3361     case CATCH_STM_FRAME:
3362     default:
3363       ASSERT(info->i.type != CATCH_FRAME);
3364       ASSERT(info->i.type != STOP_FRAME);
3365       p = next; 
3366       continue;
3367     }
3368   }
3369 }
3370
3371 /* -----------------------------------------------------------------------------
3372    resurrectThreads is called after garbage collection on the list of
3373    threads found to be garbage.  Each of these threads will be woken
3374    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3375    on an MVar, or NonTermination if the thread was blocked on a Black
3376    Hole.
3377
3378    Locks: sched_mutex isn't held upon entry nor exit.
3379    -------------------------------------------------------------------------- */
3380
3381 void
3382 resurrectThreads( StgTSO *threads )
3383 {
3384   StgTSO *tso, *next;
3385
3386   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3387     next = tso->global_link;
3388     tso->global_link = all_threads;
3389     all_threads = tso;
3390     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3391
3392     switch (tso->why_blocked) {
3393     case BlockedOnMVar:
3394     case BlockedOnException:
3395       /* Called by GC - sched_mutex lock is currently held. */
3396       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3397       break;
3398     case BlockedOnBlackHole:
3399       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3400       break;
3401     case BlockedOnSTM:
3402       raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
3403       break;
3404     case NotBlocked:
3405       /* This might happen if the thread was blocked on a black hole
3406        * belonging to a thread that we've just woken up (raiseAsync
3407        * can wake up threads, remember...).
3408        */
3409       continue;
3410     default:
3411       barf("resurrectThreads: thread blocked in a strange way");
3412     }
3413   }
3414 }
3415
3416 /* ----------------------------------------------------------------------------
3417  * Debugging: why is a thread blocked
3418  * [Also provides useful information when debugging threaded programs
3419  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3420    ------------------------------------------------------------------------- */
3421
3422 static
3423 void
3424 printThreadBlockage(StgTSO *tso)
3425 {
3426   switch (tso->why_blocked) {
3427   case BlockedOnRead:
3428     debugBelch("is blocked on read from fd %d", tso->block_info.fd);
3429     break;
3430   case BlockedOnWrite:
3431     debugBelch("is blocked on write to fd %d", tso->block_info.fd);
3432     break;
3433 #if defined(mingw32_HOST_OS)
3434     case BlockedOnDoProc:
3435     debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3436     break;
3437 #endif
3438   case BlockedOnDelay:
3439     debugBelch("is blocked until %d", tso->block_info.target);
3440     break;
3441   case BlockedOnMVar:
3442     debugBelch("is blocked on an MVar");
3443     break;
3444   case BlockedOnException:
3445     debugBelch("is blocked on delivering an exception to thread %d",
3446             tso->block_info.tso->id);
3447     break;
3448   case BlockedOnBlackHole:
3449     debugBelch("is blocked on a black hole");
3450     break;
3451   case NotBlocked:
3452     debugBelch("is not blocked");
3453     break;
3454 #if defined(PAR)
3455   case BlockedOnGA:
3456     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
3457             tso->block_info.closure, info_type(tso->block_info.closure));
3458     break;
3459   case BlockedOnGA_NoSend:
3460     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
3461             tso->block_info.closure, info_type(tso->block_info.closure));
3462     break;
3463 #endif
3464   case BlockedOnCCall:
3465     debugBelch("is blocked on an external call");
3466     break;
3467   case BlockedOnCCall_NoUnblockExc:
3468     debugBelch("is blocked on an external call (exceptions were already blocked)");
3469     break;
3470   case BlockedOnSTM:
3471     debugBelch("is blocked on an STM operation");
3472     break;
3473   default:
3474     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3475          tso->why_blocked, tso->id, tso);
3476   }
3477 }
3478
3479 static
3480 void
3481 printThreadStatus(StgTSO *tso)
3482 {
3483   switch (tso->what_next) {
3484   case ThreadKilled:
3485     debugBelch("has been killed");
3486     break;
3487   case ThreadComplete:
3488     debugBelch("has completed");
3489     break;
3490   default:
3491     printThreadBlockage(tso);
3492   }
3493 }
3494
3495 void
3496 printAllThreads(void)
3497 {
3498   StgTSO *t;
3499
3500 # if defined(GRAN)
3501   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3502   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3503                        time_string, rtsFalse/*no commas!*/);
3504
3505   debugBelch("all threads at [%s]:\n", time_string);
3506 # elif defined(PAR)
3507   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3508   ullong_format_string(CURRENT_TIME,
3509                        time_string, rtsFalse/*no commas!*/);
3510
3511   debugBelch("all threads at [%s]:\n", time_string);
3512 # else
3513   debugBelch("all threads:\n");
3514 # endif
3515
3516   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3517     debugBelch("\tthread %d @ %p ", t->id, (void *)t);
3518 #if defined(DEBUG)
3519     {
3520       void *label = lookupThreadLabel(t->id);
3521       if (label) debugBelch("[\"%s\"] ",(char *)label);
3522     }
3523 #endif
3524     printThreadStatus(t);
3525     debugBelch("\n");
3526   }
3527 }
3528     
3529 #ifdef DEBUG
3530
3531 /* 
3532    Print a whole blocking queue attached to node (debugging only).
3533 */
3534 # if defined(PAR)
3535 void 
3536 print_bq (StgClosure *node)
3537 {
3538   StgBlockingQueueElement *bqe;
3539   StgTSO *tso;
3540   rtsBool end;
3541
3542   debugBelch("## BQ of closure %p (%s): ",
3543           node, info_type(node));
3544
3545   /* should cover all closures that may have a blocking queue */
3546   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3547          get_itbl(node)->type == FETCH_ME_BQ ||
3548          get_itbl(node)->type == RBH ||
3549          get_itbl(node)->type == MVAR);
3550     
3551   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3552
3553   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3554 }
3555
3556 /* 
3557    Print a whole blocking queue starting with the element bqe.
3558 */
3559 void 
3560 print_bqe (StgBlockingQueueElement *bqe)
3561 {
3562   rtsBool end;
3563
3564   /* 
3565      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3566   */
3567   for (end = (bqe==END_BQ_QUEUE);
3568        !end; // iterate until bqe points to a CONSTR
3569        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3570        bqe = end ? END_BQ_QUEUE : bqe->link) {
3571     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3572     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3573     /* types of closures that may appear in a blocking queue */
3574     ASSERT(get_itbl(bqe)->type == TSO ||           
3575            get_itbl(bqe)->type == BLOCKED_FETCH || 
3576            get_itbl(bqe)->type == CONSTR); 
3577     /* only BQs of an RBH end with an RBH_Save closure */
3578     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3579
3580     switch (get_itbl(bqe)->type) {
3581     case TSO:
3582       debugBelch(" TSO %u (%x),",
3583               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3584       break;
3585     case BLOCKED_FETCH:
3586       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
3587               ((StgBlockedFetch *)bqe)->node, 
3588               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3589               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3590               ((StgBlockedFetch *)bqe)->ga.weight);
3591       break;
3592     case CONSTR:
3593       debugBelch(" %s (IP %p),",
3594               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3595                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3596                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3597                "RBH_Save_?"), get_itbl(bqe));
3598       break;
3599     default:
3600       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3601            info_type((StgClosure *)bqe)); // , node, info_type(node));
3602       break;
3603     }
3604   } /* for */
3605   debugBelch("\n");
3606 }
3607 # elif defined(GRAN)
3608 void 
3609 print_bq (StgClosure *node)
3610 {
3611   StgBlockingQueueElement *bqe;
3612   PEs node_loc, tso_loc;
3613   rtsBool end;
3614
3615   /* should cover all closures that may have a blocking queue */
3616   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3617          get_itbl(node)->type == FETCH_ME_BQ ||
3618          get_itbl(node)->type == RBH);
3619     
3620   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3621   node_loc = where_is(node);
3622
3623   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
3624           node, info_type(node), node_loc);
3625
3626   /* 
3627      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3628   */
3629   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3630        !end; // iterate until bqe points to a CONSTR
3631        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3632     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3633     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3634     /* types of closures that may appear in a blocking queue */
3635     ASSERT(get_itbl(bqe)->type == TSO ||           
3636            get_itbl(bqe)->type == CONSTR); 
3637     /* only BQs of an RBH end with an RBH_Save closure */
3638     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3639
3640     tso_loc = where_is((StgClosure *)bqe);
3641     switch (get_itbl(bqe)->type) {
3642     case TSO:
3643       debugBelch(" TSO %d (%p) on [PE %d],",
3644               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3645       break;
3646     case CONSTR:
3647       debugBelch(" %s (IP %p),",
3648               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3649                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3650                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3651                "RBH_Save_?"), get_itbl(bqe));
3652       break;
3653     default:
3654       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3655            info_type((StgClosure *)bqe), node, info_type(node));
3656       break;
3657     }
3658   } /* for */
3659   debugBelch("\n");
3660 }
3661 #else
3662 /* 
3663    Nice and easy: only TSOs on the blocking queue
3664 */
3665 void 
3666 print_bq (StgClosure *node)
3667 {
3668   StgTSO *tso;
3669
3670   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3671   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3672        tso != END_TSO_QUEUE; 
3673        tso=tso->link) {
3674     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3675     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3676     debugBelch(" TSO %d (%p),", tso->id, tso);
3677   }
3678   debugBelch("\n");
3679 }
3680 # endif
3681
3682 #if defined(PAR)
3683 static nat
3684 run_queue_len(void)
3685 {
3686   nat i;
3687   StgTSO *tso;
3688
3689   for (i=0, tso=run_queue_hd; 
3690        tso != END_TSO_QUEUE;
3691        i++, tso=tso->link)
3692     /* nothing */
3693
3694   return i;
3695 }
3696 #endif
3697
3698 void
3699 sched_belch(char *s, ...)
3700 {
3701   va_list ap;
3702   va_start(ap,s);
3703 #ifdef RTS_SUPPORTS_THREADS
3704   debugBelch("sched (task %p): ", osThreadId());
3705 #elif defined(PAR)
3706   debugBelch("== ");
3707 #else
3708   debugBelch("sched: ");
3709 #endif
3710   vdebugBelch(s, ap);
3711   debugBelch("\n");
3712   va_end(ap);
3713 }
3714
3715 #endif /* DEBUG */