[project @ 2005-01-18 16:28:43 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "Hooks.h"
48 #define COMPILING_SCHEDULER
49 #include "Schedule.h"
50 #include "StgMiscClosures.h"
51 #include "Storage.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "STM.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #include "LdvProfile.h"
63 #include "Updates.h"
64 #ifdef PROFILING
65 #include "Proftimer.h"
66 #include "ProfHeap.h"
67 #endif
68 #if defined(GRAN) || defined(PAR)
69 # include "GranSimRts.h"
70 # include "GranSim.h"
71 # include "ParallelRts.h"
72 # include "Parallel.h"
73 # include "ParallelDebug.h"
74 # include "FetchMe.h"
75 # include "HLC.h"
76 #endif
77 #include "Sparks.h"
78 #include "Capability.h"
79 #include "OSThreads.h"
80 #include  "Task.h"
81
82 #ifdef HAVE_SYS_TYPES_H
83 #include <sys/types.h>
84 #endif
85 #ifdef HAVE_UNISTD_H
86 #include <unistd.h>
87 #endif
88
89 #include <string.h>
90 #include <stdlib.h>
91 #include <stdarg.h>
92
93 #ifdef HAVE_ERRNO_H
94 #include <errno.h>
95 #endif
96
97 #ifdef THREADED_RTS
98 #define USED_IN_THREADED_RTS
99 #else
100 #define USED_IN_THREADED_RTS STG_UNUSED
101 #endif
102
103 #ifdef RTS_SUPPORTS_THREADS
104 #define USED_WHEN_RTS_SUPPORTS_THREADS
105 #else
106 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
107 #endif
108
109 /* Main thread queue.
110  * Locks required: sched_mutex.
111  */
112 StgMainThread *main_threads = NULL;
113
114 /* Thread queues.
115  * Locks required: sched_mutex.
116  */
117 #if defined(GRAN)
118
119 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
120 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
121
122 /* 
123    In GranSim we have a runnable and a blocked queue for each processor.
124    In order to minimise code changes new arrays run_queue_hds/tls
125    are created. run_queue_hd is then a short cut (macro) for
126    run_queue_hds[CurrentProc] (see GranSim.h).
127    -- HWL
128 */
129 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
130 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
131 StgTSO *ccalling_threadss[MAX_PROC];
132 /* We use the same global list of threads (all_threads) in GranSim as in
133    the std RTS (i.e. we are cheating). However, we don't use this list in
134    the GranSim specific code at the moment (so we are only potentially
135    cheating).  */
136
137 #else /* !GRAN */
138
139 StgTSO *run_queue_hd = NULL;
140 StgTSO *run_queue_tl = NULL;
141 StgTSO *blocked_queue_hd = NULL;
142 StgTSO *blocked_queue_tl = NULL;
143 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
144
145 #endif
146
147 /* Linked list of all threads.
148  * Used for detecting garbage collected threads.
149  */
150 StgTSO *all_threads = NULL;
151
152 /* When a thread performs a safe C call (_ccall_GC, using old
153  * terminology), it gets put on the suspended_ccalling_threads
154  * list. Used by the garbage collector.
155  */
156 static StgTSO *suspended_ccalling_threads;
157
158 static StgTSO *threadStackOverflow(StgTSO *tso);
159
160 /* KH: The following two flags are shared memory locations.  There is no need
161        to lock them, since they are only unset at the end of a scheduler
162        operation.
163 */
164
165 /* flag set by signal handler to precipitate a context switch */
166 int context_switch = 0;
167
168 /* if this flag is set as well, give up execution */
169 rtsBool interrupted = rtsFalse;
170
171 /* Next thread ID to allocate.
172  * Locks required: thread_id_mutex
173  */
174 static StgThreadID next_thread_id = 1;
175
176 /*
177  * Pointers to the state of the current thread.
178  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
179  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
180  */
181  
182 /* The smallest stack size that makes any sense is:
183  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
184  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
185  *  + 1                       (the closure to enter)
186  *  + 1                       (stg_ap_v_ret)
187  *  + 1                       (spare slot req'd by stg_ap_v_ret)
188  *
189  * A thread with this stack will bomb immediately with a stack
190  * overflow, which will increase its stack size.  
191  */
192
193 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
194
195
196 #if defined(GRAN)
197 StgTSO *CurrentTSO;
198 #endif
199
200 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
201  *  exists - earlier gccs apparently didn't.
202  *  -= chak
203  */
204 StgTSO dummy_tso;
205
206 static rtsBool ready_to_gc;
207
208 /*
209  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
210  * in an MT setting, needed to signal that a worker thread shouldn't hang around
211  * in the scheduler when it is out of work.
212  */
213 static rtsBool shutting_down_scheduler = rtsFalse;
214
215 void            addToBlockedQueue ( StgTSO *tso );
216
217 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
218        void     interruptStgRts   ( void );
219
220 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
221 static void     detectBlackHoles  ( void );
222 #endif
223
224 static void     raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically);
225
226 #if defined(RTS_SUPPORTS_THREADS)
227 /* ToDo: carefully document the invariants that go together
228  *       with these synchronisation objects.
229  */
230 Mutex     sched_mutex       = INIT_MUTEX_VAR;
231 Mutex     term_mutex        = INIT_MUTEX_VAR;
232
233 #endif /* RTS_SUPPORTS_THREADS */
234
235 #if defined(PAR)
236 StgTSO *LastTSO;
237 rtsTime TimeOfLastYield;
238 rtsBool emitSchedule = rtsTrue;
239 #endif
240
241 #if DEBUG
242 static char *whatNext_strs[] = {
243   "(unknown)",
244   "ThreadRunGHC",
245   "ThreadInterpret",
246   "ThreadKilled",
247   "ThreadRelocated",
248   "ThreadComplete"
249 };
250 #endif
251
252 #if defined(PAR)
253 StgTSO * createSparkThread(rtsSpark spark);
254 StgTSO * activateSpark (rtsSpark spark);  
255 #endif
256
257 /* ----------------------------------------------------------------------------
258  * Starting Tasks
259  * ------------------------------------------------------------------------- */
260
261 #if defined(RTS_SUPPORTS_THREADS)
262 static rtsBool startingWorkerThread = rtsFalse;
263
264 static void taskStart(void);
265 static void
266 taskStart(void)
267 {
268   ACQUIRE_LOCK(&sched_mutex);
269   startingWorkerThread = rtsFalse;
270   schedule(NULL,NULL);
271   RELEASE_LOCK(&sched_mutex);
272 }
273
274 void
275 startSchedulerTaskIfNecessary(void)
276 {
277   if(run_queue_hd != END_TSO_QUEUE
278     || blocked_queue_hd != END_TSO_QUEUE
279     || sleeping_queue != END_TSO_QUEUE)
280   {
281     if(!startingWorkerThread)
282     { // we don't want to start another worker thread
283       // just because the last one hasn't yet reached the
284       // "waiting for capability" state
285       startingWorkerThread = rtsTrue;
286       if(!startTask(taskStart))
287       {
288         startingWorkerThread = rtsFalse;
289       }
290     }
291   }
292 }
293 #endif
294
295 /* ---------------------------------------------------------------------------
296    Main scheduling loop.
297
298    We use round-robin scheduling, each thread returning to the
299    scheduler loop when one of these conditions is detected:
300
301       * out of heap space
302       * timer expires (thread yields)
303       * thread blocks
304       * thread ends
305       * stack overflow
306
307    Locking notes:  we acquire the scheduler lock once at the beginning
308    of the scheduler loop, and release it when
309     
310       * running a thread, or
311       * waiting for work, or
312       * waiting for a GC to complete.
313
314    GRAN version:
315      In a GranSim setup this loop iterates over the global event queue.
316      This revolves around the global event queue, which determines what 
317      to do next. Therefore, it's more complicated than either the 
318      concurrent or the parallel (GUM) setup.
319
320    GUM version:
321      GUM iterates over incoming messages.
322      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
323      and sends out a fish whenever it has nothing to do; in-between
324      doing the actual reductions (shared code below) it processes the
325      incoming messages and deals with delayed operations 
326      (see PendingFetches).
327      This is not the ugliest code you could imagine, but it's bloody close.
328
329    ------------------------------------------------------------------------ */
330 static void
331 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
332           Capability *initialCapability )
333 {
334   StgTSO *t;
335   Capability *cap;
336   StgThreadReturnCode ret;
337 #if defined(GRAN)
338   rtsEvent *event;
339 #elif defined(PAR)
340   StgSparkPool *pool;
341   rtsSpark spark;
342   StgTSO *tso;
343   GlobalTaskId pe;
344   rtsBool receivedFinish = rtsFalse;
345 # if defined(DEBUG)
346   nat tp_size, sp_size; // stats only
347 # endif
348 #endif
349   rtsBool was_interrupted = rtsFalse;
350   nat prev_what_next;
351   
352   // Pre-condition: sched_mutex is held.
353   // We might have a capability, passed in as initialCapability.
354   cap = initialCapability;
355
356 #if defined(RTS_SUPPORTS_THREADS)
357   //
358   // in the threaded case, the capability is either passed in via the
359   // initialCapability parameter, or initialized inside the scheduler
360   // loop 
361   //
362   IF_DEBUG(scheduler,
363            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
364                        mainThread, initialCapability);
365       );
366 #else
367   // simply initialise it in the non-threaded case
368   grabCapability(&cap);
369 #endif
370
371 #if defined(GRAN)
372   /* set up first event to get things going */
373   /* ToDo: assign costs for system setup and init MainTSO ! */
374   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
375             ContinueThread, 
376             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
377
378   IF_DEBUG(gran,
379            debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
380            G_TSO(CurrentTSO, 5));
381
382   if (RtsFlags.GranFlags.Light) {
383     /* Save current time; GranSim Light only */
384     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
385   }      
386
387   event = get_next_event();
388
389   while (event!=(rtsEvent*)NULL) {
390     /* Choose the processor with the next event */
391     CurrentProc = event->proc;
392     CurrentTSO = event->tso;
393
394 #elif defined(PAR)
395
396   while (!receivedFinish) {    /* set by processMessages */
397                                /* when receiving PP_FINISH message         */ 
398
399 #else // everything except GRAN and PAR
400
401   while (1) {
402
403 #endif
404
405      IF_DEBUG(scheduler, printAllThreads());
406
407 #if defined(RTS_SUPPORTS_THREADS)
408       // Yield the capability to higher-priority tasks if necessary.
409       //
410       if (cap != NULL) {
411           yieldCapability(&cap);
412       }
413
414       // If we do not currently hold a capability, we wait for one
415       //
416       if (cap == NULL) {
417           waitForCapability(&sched_mutex, &cap,
418                             mainThread ? &mainThread->bound_thread_cond : NULL);
419       }
420
421       // We now have a capability...
422 #endif
423
424     //
425     // If we're interrupted (the user pressed ^C, or some other
426     // termination condition occurred), kill all the currently running
427     // threads.
428     //
429     if (interrupted) {
430         IF_DEBUG(scheduler, sched_belch("interrupted"));
431         interrupted = rtsFalse;
432         was_interrupted = rtsTrue;
433 #if defined(RTS_SUPPORTS_THREADS)
434         // In the threaded RTS, deadlock detection doesn't work,
435         // so just exit right away.
436         errorBelch("interrupted");
437         releaseCapability(cap);
438         RELEASE_LOCK(&sched_mutex);
439         shutdownHaskellAndExit(EXIT_SUCCESS);
440 #else
441         deleteAllThreads();
442 #endif
443     }
444
445 #if defined(RTS_USER_SIGNALS)
446     // check for signals each time around the scheduler
447     if (signals_pending()) {
448       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
449       startSignalHandlers();
450       ACQUIRE_LOCK(&sched_mutex);
451     }
452 #endif
453
454     //
455     // Check whether any waiting threads need to be woken up.  If the
456     // run queue is empty, and there are no other tasks running, we
457     // can wait indefinitely for something to happen.
458     //
459     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
460     {
461 #if defined(RTS_SUPPORTS_THREADS)
462         // We shouldn't be here...
463         barf("schedule: awaitEvent() in threaded RTS");
464 #endif
465         awaitEvent( EMPTY_RUN_QUEUE() );
466     }
467     // we can be interrupted while waiting for I/O...
468     if (interrupted) continue;
469
470     /* 
471      * Detect deadlock: when we have no threads to run, there are no
472      * threads waiting on I/O or sleeping, and all the other tasks are
473      * waiting for work, we must have a deadlock of some description.
474      *
475      * We first try to find threads blocked on themselves (ie. black
476      * holes), and generate NonTermination exceptions where necessary.
477      *
478      * If no threads are black holed, we have a deadlock situation, so
479      * inform all the main threads.
480      */
481 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
482     if (   EMPTY_THREAD_QUEUES() )
483     {
484         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
485
486         // Garbage collection can release some new threads due to
487         // either (a) finalizers or (b) threads resurrected because
488         // they are unreachable and will therefore be sent an
489         // exception.  Any threads thus released will be immediately
490         // runnable.
491         GarbageCollect(GetRoots,rtsTrue);
492         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
493
494 #if defined(RTS_USER_SIGNALS)
495         /* If we have user-installed signal handlers, then wait
496          * for signals to arrive rather then bombing out with a
497          * deadlock.
498          */
499         if ( anyUserHandlers() ) {
500             IF_DEBUG(scheduler, 
501                      sched_belch("still deadlocked, waiting for signals..."));
502
503             awaitUserSignals();
504
505             // we might be interrupted...
506             if (interrupted) { continue; }
507
508             if (signals_pending()) {
509                 RELEASE_LOCK(&sched_mutex);
510                 startSignalHandlers();
511                 ACQUIRE_LOCK(&sched_mutex);
512             }
513             ASSERT(!EMPTY_RUN_QUEUE());
514             goto not_deadlocked;
515         }
516 #endif
517
518         /* Probably a real deadlock.  Send the current main thread the
519          * Deadlock exception (or in the SMP build, send *all* main
520          * threads the deadlock exception, since none of them can make
521          * progress).
522          */
523         {
524             StgMainThread *m;
525             m = main_threads;
526             switch (m->tso->why_blocked) {
527             case BlockedOnBlackHole:
528             case BlockedOnException:
529             case BlockedOnMVar:
530                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
531                 break;
532             default:
533                 barf("deadlock: main thread blocked in a strange way");
534             }
535         }
536     }
537   not_deadlocked:
538
539 #elif defined(RTS_SUPPORTS_THREADS)
540     // ToDo: add deadlock detection in threaded RTS
541 #elif defined(PAR)
542     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
543 #endif
544
545 #if defined(RTS_SUPPORTS_THREADS)
546     if ( EMPTY_RUN_QUEUE() ) {
547         continue; // nothing to do
548     }
549 #endif
550
551 #if defined(GRAN)
552     if (RtsFlags.GranFlags.Light)
553       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
554
555     /* adjust time based on time-stamp */
556     if (event->time > CurrentTime[CurrentProc] &&
557         event->evttype != ContinueThread)
558       CurrentTime[CurrentProc] = event->time;
559     
560     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
561     if (!RtsFlags.GranFlags.Light)
562       handleIdlePEs();
563
564     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
565
566     /* main event dispatcher in GranSim */
567     switch (event->evttype) {
568       /* Should just be continuing execution */
569     case ContinueThread:
570       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
571       /* ToDo: check assertion
572       ASSERT(run_queue_hd != (StgTSO*)NULL &&
573              run_queue_hd != END_TSO_QUEUE);
574       */
575       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
576       if (!RtsFlags.GranFlags.DoAsyncFetch &&
577           procStatus[CurrentProc]==Fetching) {
578         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
579               CurrentTSO->id, CurrentTSO, CurrentProc);
580         goto next_thread;
581       } 
582       /* Ignore ContinueThreads for completed threads */
583       if (CurrentTSO->what_next == ThreadComplete) {
584         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
585               CurrentTSO->id, CurrentTSO, CurrentProc);
586         goto next_thread;
587       } 
588       /* Ignore ContinueThreads for threads that are being migrated */
589       if (PROCS(CurrentTSO)==Nowhere) { 
590         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
591               CurrentTSO->id, CurrentTSO, CurrentProc);
592         goto next_thread;
593       }
594       /* The thread should be at the beginning of the run queue */
595       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
596         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
597               CurrentTSO->id, CurrentTSO, CurrentProc);
598         break; // run the thread anyway
599       }
600       /*
601       new_event(proc, proc, CurrentTime[proc],
602                 FindWork,
603                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
604       goto next_thread; 
605       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
606       break; // now actually run the thread; DaH Qu'vam yImuHbej 
607
608     case FetchNode:
609       do_the_fetchnode(event);
610       goto next_thread;             /* handle next event in event queue  */
611       
612     case GlobalBlock:
613       do_the_globalblock(event);
614       goto next_thread;             /* handle next event in event queue  */
615       
616     case FetchReply:
617       do_the_fetchreply(event);
618       goto next_thread;             /* handle next event in event queue  */
619       
620     case UnblockThread:   /* Move from the blocked queue to the tail of */
621       do_the_unblock(event);
622       goto next_thread;             /* handle next event in event queue  */
623       
624     case ResumeThread:  /* Move from the blocked queue to the tail of */
625       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
626       event->tso->gran.blocktime += 
627         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
628       do_the_startthread(event);
629       goto next_thread;             /* handle next event in event queue  */
630       
631     case StartThread:
632       do_the_startthread(event);
633       goto next_thread;             /* handle next event in event queue  */
634       
635     case MoveThread:
636       do_the_movethread(event);
637       goto next_thread;             /* handle next event in event queue  */
638       
639     case MoveSpark:
640       do_the_movespark(event);
641       goto next_thread;             /* handle next event in event queue  */
642       
643     case FindWork:
644       do_the_findwork(event);
645       goto next_thread;             /* handle next event in event queue  */
646       
647     default:
648       barf("Illegal event type %u\n", event->evttype);
649     }  /* switch */
650     
651     /* This point was scheduler_loop in the old RTS */
652
653     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
654
655     TimeOfLastEvent = CurrentTime[CurrentProc];
656     TimeOfNextEvent = get_time_of_next_event();
657     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
658     // CurrentTSO = ThreadQueueHd;
659
660     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
661                          TimeOfNextEvent));
662
663     if (RtsFlags.GranFlags.Light) 
664       GranSimLight_leave_system(event, &ActiveTSO); 
665
666     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
667
668     IF_DEBUG(gran, 
669              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
670
671     /* in a GranSim setup the TSO stays on the run queue */
672     t = CurrentTSO;
673     /* Take a thread from the run queue. */
674     POP_RUN_QUEUE(t); // take_off_run_queue(t);
675
676     IF_DEBUG(gran, 
677              debugBelch("GRAN: About to run current thread, which is\n");
678              G_TSO(t,5));
679
680     context_switch = 0; // turned on via GranYield, checking events and time slice
681
682     IF_DEBUG(gran, 
683              DumpGranEvent(GR_SCHEDULE, t));
684
685     procStatus[CurrentProc] = Busy;
686
687 #elif defined(PAR)
688     if (PendingFetches != END_BF_QUEUE) {
689         processFetches();
690     }
691
692     /* ToDo: phps merge with spark activation above */
693     /* check whether we have local work and send requests if we have none */
694     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
695       /* :-[  no local threads => look out for local sparks */
696       /* the spark pool for the current PE */
697       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
698       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
699           pool->hd < pool->tl) {
700         /* 
701          * ToDo: add GC code check that we really have enough heap afterwards!!
702          * Old comment:
703          * If we're here (no runnable threads) and we have pending
704          * sparks, we must have a space problem.  Get enough space
705          * to turn one of those pending sparks into a
706          * thread... 
707          */
708
709         spark = findSpark(rtsFalse);                /* get a spark */
710         if (spark != (rtsSpark) NULL) {
711           tso = activateSpark(spark);       /* turn the spark into a thread */
712           IF_PAR_DEBUG(schedule,
713                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
714                              tso->id, tso, advisory_thread_count));
715
716           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
717             debugBelch("==^^ failed to activate spark\n");
718             goto next_thread;
719           }               /* otherwise fall through & pick-up new tso */
720         } else {
721           IF_PAR_DEBUG(verbose,
722                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
723                              spark_queue_len(pool)));
724           goto next_thread;
725         }
726       }
727
728       /* If we still have no work we need to send a FISH to get a spark
729          from another PE 
730       */
731       if (EMPTY_RUN_QUEUE()) {
732       /* =8-[  no local sparks => look for work on other PEs */
733         /*
734          * We really have absolutely no work.  Send out a fish
735          * (there may be some out there already), and wait for
736          * something to arrive.  We clearly can't run any threads
737          * until a SCHEDULE or RESUME arrives, and so that's what
738          * we're hoping to see.  (Of course, we still have to
739          * respond to other types of messages.)
740          */
741         TIME now = msTime() /*CURRENT_TIME*/;
742         IF_PAR_DEBUG(verbose, 
743                      debugBelch("--  now=%ld\n", now));
744         IF_PAR_DEBUG(verbose,
745                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
746                          (last_fish_arrived_at!=0 &&
747                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
748                        debugBelch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)\n",
749                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
750                              last_fish_arrived_at,
751                              RtsFlags.ParFlags.fishDelay, now);
752                      });
753         
754         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
755             (last_fish_arrived_at==0 ||
756              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
757           /* outstandingFishes is set in sendFish, processFish;
758              avoid flooding system with fishes via delay */
759           pe = choosePE();
760           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
761                    NEW_FISH_HUNGER);
762
763           // Global statistics: count no. of fishes
764           if (RtsFlags.ParFlags.ParStats.Global &&
765               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
766             globalParStats.tot_fish_mess++;
767           }
768         }
769       
770         receivedFinish = processMessages();
771         goto next_thread;
772       }
773     } else if (PacketsWaiting()) {  /* Look for incoming messages */
774       receivedFinish = processMessages();
775     }
776
777     /* Now we are sure that we have some work available */
778     ASSERT(run_queue_hd != END_TSO_QUEUE);
779
780     /* Take a thread from the run queue, if we have work */
781     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
782     IF_DEBUG(sanity,checkTSO(t));
783
784     /* ToDo: write something to the log-file
785     if (RTSflags.ParFlags.granSimStats && !sameThread)
786         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
787
788     CurrentTSO = t;
789     */
790     /* the spark pool for the current PE */
791     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
792
793     IF_DEBUG(scheduler, 
794              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
795                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
796
797 # if 1
798     if (0 && RtsFlags.ParFlags.ParStats.Full && 
799         t && LastTSO && t->id != LastTSO->id && 
800         LastTSO->why_blocked == NotBlocked && 
801         LastTSO->what_next != ThreadComplete) {
802       // if previously scheduled TSO not blocked we have to record the context switch
803       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
804                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
805     }
806
807     if (RtsFlags.ParFlags.ParStats.Full && 
808         (emitSchedule /* forced emit */ ||
809         (t && LastTSO && t->id != LastTSO->id))) {
810       /* 
811          we are running a different TSO, so write a schedule event to log file
812          NB: If we use fair scheduling we also have to write  a deschedule 
813              event for LastTSO; with unfair scheduling we know that the
814              previous tso has blocked whenever we switch to another tso, so
815              we don't need it in GUM for now
816       */
817       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
818                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
819       emitSchedule = rtsFalse;
820     }
821      
822 # endif
823 #else /* !GRAN && !PAR */
824   
825     // grab a thread from the run queue
826     ASSERT(run_queue_hd != END_TSO_QUEUE);
827     POP_RUN_QUEUE(t);
828
829     // Sanity check the thread we're about to run.  This can be
830     // expensive if there is lots of thread switching going on...
831     IF_DEBUG(sanity,checkTSO(t));
832 #endif
833
834 #ifdef THREADED_RTS
835     {
836       StgMainThread *m = t->main;
837       
838       if(m)
839       {
840         if(m == mainThread)
841         {
842           IF_DEBUG(scheduler,
843             sched_belch("### Running thread %d in bound thread", t->id));
844           // yes, the Haskell thread is bound to the current native thread
845         }
846         else
847         {
848           IF_DEBUG(scheduler,
849             sched_belch("### thread %d bound to another OS thread", t->id));
850           // no, bound to a different Haskell thread: pass to that thread
851           PUSH_ON_RUN_QUEUE(t);
852           passCapability(&m->bound_thread_cond);
853           continue;
854         }
855       }
856       else
857       {
858         if(mainThread != NULL)
859         // The thread we want to run is bound.
860         {
861           IF_DEBUG(scheduler,
862             sched_belch("### this OS thread cannot run thread %d", t->id));
863           // no, the current native thread is bound to a different
864           // Haskell thread, so pass it to any worker thread
865           PUSH_ON_RUN_QUEUE(t);
866           passCapabilityToWorker();
867           continue; 
868         }
869       }
870     }
871 #endif
872
873     cap->r.rCurrentTSO = t;
874     
875     /* context switches are now initiated by the timer signal, unless
876      * the user specified "context switch as often as possible", with
877      * +RTS -C0
878      */
879     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
880          && (run_queue_hd != END_TSO_QUEUE
881              || blocked_queue_hd != END_TSO_QUEUE
882              || sleeping_queue != END_TSO_QUEUE)))
883         context_switch = 1;
884
885 run_thread:
886
887     RELEASE_LOCK(&sched_mutex);
888
889     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
890                               (long)t->id, whatNext_strs[t->what_next]));
891
892 #ifdef PROFILING
893     startHeapProfTimer();
894 #endif
895
896     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
897     /* Run the current thread 
898      */
899     prev_what_next = t->what_next;
900
901     errno = t->saved_errno;
902
903     switch (prev_what_next) {
904
905     case ThreadKilled:
906     case ThreadComplete:
907         /* Thread already finished, return to scheduler. */
908         ret = ThreadFinished;
909         break;
910
911     case ThreadRunGHC:
912         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
913         break;
914
915     case ThreadInterpret:
916         ret = interpretBCO(cap);
917         break;
918
919     default:
920       barf("schedule: invalid what_next field");
921     }
922
923     // The TSO might have moved, so find the new location:
924     t = cap->r.rCurrentTSO;
925
926     // And save the current errno in this thread.
927     t->saved_errno = errno;
928
929     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
930     
931     /* Costs for the scheduler are assigned to CCS_SYSTEM */
932 #ifdef PROFILING
933     stopHeapProfTimer();
934     CCCS = CCS_SYSTEM;
935 #endif
936     
937     ACQUIRE_LOCK(&sched_mutex);
938     
939 #ifdef RTS_SUPPORTS_THREADS
940     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
941 #elif !defined(GRAN) && !defined(PAR)
942     IF_DEBUG(scheduler,debugBelch("sched: "););
943 #endif
944     
945 #if defined(PAR)
946     /* HACK 675: if the last thread didn't yield, make sure to print a 
947        SCHEDULE event to the log file when StgRunning the next thread, even
948        if it is the same one as before */
949     LastTSO = t; 
950     TimeOfLastYield = CURRENT_TIME;
951 #endif
952
953     switch (ret) {
954     case HeapOverflow:
955 #if defined(GRAN)
956       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
957       globalGranStats.tot_heapover++;
958 #elif defined(PAR)
959       globalParStats.tot_heapover++;
960 #endif
961
962       // did the task ask for a large block?
963       if (cap->r.rHpAlloc > BLOCK_SIZE) {
964           // if so, get one and push it on the front of the nursery.
965           bdescr *bd;
966           nat blocks;
967           
968           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
969
970           IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n", 
971                                    (long)t->id, whatNext_strs[t->what_next], blocks));
972
973           // don't do this if it would push us over the
974           // alloc_blocks_lim limit; we'll GC first.
975           if (alloc_blocks + blocks < alloc_blocks_lim) {
976
977               alloc_blocks += blocks;
978               bd = allocGroup( blocks );
979
980               // link the new group into the list
981               bd->link = cap->r.rCurrentNursery;
982               bd->u.back = cap->r.rCurrentNursery->u.back;
983               if (cap->r.rCurrentNursery->u.back != NULL) {
984                   cap->r.rCurrentNursery->u.back->link = bd;
985               } else {
986                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
987                          g0s0->blocks == cap->r.rNursery);
988                   cap->r.rNursery = g0s0->blocks = bd;
989               }           
990               cap->r.rCurrentNursery->u.back = bd;
991
992               // initialise it as a nursery block.  We initialise the
993               // step, gen_no, and flags field of *every* sub-block in
994               // this large block, because this is easier than making
995               // sure that we always find the block head of a large
996               // block whenever we call Bdescr() (eg. evacuate() and
997               // isAlive() in the GC would both have to do this, at
998               // least).
999               { 
1000                   bdescr *x;
1001                   for (x = bd; x < bd + blocks; x++) {
1002                       x->step = g0s0;
1003                       x->gen_no = 0;
1004                       x->flags = 0;
1005                   }
1006               }
1007
1008               // don't forget to update the block count in g0s0.
1009               g0s0->n_blocks += blocks;
1010               // This assert can be a killer if the app is doing lots
1011               // of large block allocations.
1012               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1013
1014               // now update the nursery to point to the new block
1015               cap->r.rCurrentNursery = bd;
1016
1017               // we might be unlucky and have another thread get on the
1018               // run queue before us and steal the large block, but in that
1019               // case the thread will just end up requesting another large
1020               // block.
1021               PUSH_ON_RUN_QUEUE(t);
1022               break;
1023           }
1024       }
1025
1026       /* make all the running tasks block on a condition variable,
1027        * maybe set context_switch and wait till they all pile in,
1028        * then have them wait on a GC condition variable.
1029        */
1030       IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1031                                (long)t->id, whatNext_strs[t->what_next]));
1032       threadPaused(t);
1033 #if defined(GRAN)
1034       ASSERT(!is_on_queue(t,CurrentProc));
1035 #elif defined(PAR)
1036       /* Currently we emit a DESCHEDULE event before GC in GUM.
1037          ToDo: either add separate event to distinguish SYSTEM time from rest
1038                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1039       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1040         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1041                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1042         emitSchedule = rtsTrue;
1043       }
1044 #endif
1045       
1046       ready_to_gc = rtsTrue;
1047       context_switch = 1;               /* stop other threads ASAP */
1048       PUSH_ON_RUN_QUEUE(t);
1049       /* actual GC is done at the end of the while loop */
1050       break;
1051       
1052     case StackOverflow:
1053 #if defined(GRAN)
1054       IF_DEBUG(gran, 
1055                DumpGranEvent(GR_DESCHEDULE, t));
1056       globalGranStats.tot_stackover++;
1057 #elif defined(PAR)
1058       // IF_DEBUG(par, 
1059       // DumpGranEvent(GR_DESCHEDULE, t);
1060       globalParStats.tot_stackover++;
1061 #endif
1062       IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1063                                (long)t->id, whatNext_strs[t->what_next]));
1064       /* just adjust the stack for this thread, then pop it back
1065        * on the run queue.
1066        */
1067       threadPaused(t);
1068       { 
1069         /* enlarge the stack */
1070         StgTSO *new_t = threadStackOverflow(t);
1071         
1072         /* This TSO has moved, so update any pointers to it from the
1073          * main thread stack.  It better not be on any other queues...
1074          * (it shouldn't be).
1075          */
1076         if (t->main != NULL) {
1077             t->main->tso = new_t;
1078         }
1079         PUSH_ON_RUN_QUEUE(new_t);
1080       }
1081       break;
1082
1083     case ThreadYielding:
1084       // Reset the context switch flag.  We don't do this just before
1085       // running the thread, because that would mean we would lose ticks
1086       // during GC, which can lead to unfair scheduling (a thread hogs
1087       // the CPU because the tick always arrives during GC).  This way
1088       // penalises threads that do a lot of allocation, but that seems
1089       // better than the alternative.
1090       context_switch = 0;
1091
1092 #if defined(GRAN)
1093       IF_DEBUG(gran, 
1094                DumpGranEvent(GR_DESCHEDULE, t));
1095       globalGranStats.tot_yields++;
1096 #elif defined(PAR)
1097       // IF_DEBUG(par, 
1098       // DumpGranEvent(GR_DESCHEDULE, t);
1099       globalParStats.tot_yields++;
1100 #endif
1101       /* put the thread back on the run queue.  Then, if we're ready to
1102        * GC, check whether this is the last task to stop.  If so, wake
1103        * up the GC thread.  getThread will block during a GC until the
1104        * GC is finished.
1105        */
1106       IF_DEBUG(scheduler,
1107                if (t->what_next != prev_what_next) {
1108                    debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1109                          (long)t->id, whatNext_strs[t->what_next]);
1110                } else {
1111                    debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1112                          (long)t->id, whatNext_strs[t->what_next]);
1113                }
1114                );
1115
1116       IF_DEBUG(sanity,
1117                //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1118                checkTSO(t));
1119       ASSERT(t->link == END_TSO_QUEUE);
1120
1121       // Shortcut if we're just switching evaluators: don't bother
1122       // doing stack squeezing (which can be expensive), just run the
1123       // thread.
1124       if (t->what_next != prev_what_next) {
1125           goto run_thread;
1126       }
1127
1128       threadPaused(t);
1129
1130 #if defined(GRAN)
1131       ASSERT(!is_on_queue(t,CurrentProc));
1132
1133       IF_DEBUG(sanity,
1134                //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1135                checkThreadQsSanity(rtsTrue));
1136 #endif
1137
1138 #if defined(PAR)
1139       if (RtsFlags.ParFlags.doFairScheduling) { 
1140         /* this does round-robin scheduling; good for concurrency */
1141         APPEND_TO_RUN_QUEUE(t);
1142       } else {
1143         /* this does unfair scheduling; good for parallelism */
1144         PUSH_ON_RUN_QUEUE(t);
1145       }
1146 #else
1147       // this does round-robin scheduling; good for concurrency
1148       APPEND_TO_RUN_QUEUE(t);
1149 #endif
1150
1151 #if defined(GRAN)
1152       /* add a ContinueThread event to actually process the thread */
1153       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1154                 ContinueThread,
1155                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1156       IF_GRAN_DEBUG(bq, 
1157                debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1158                G_EVENTQ(0);
1159                G_CURR_THREADQ(0));
1160 #endif /* GRAN */
1161       break;
1162
1163     case ThreadBlocked:
1164 #if defined(GRAN)
1165       IF_DEBUG(scheduler,
1166                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1167                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1168                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1169
1170       // ??? needed; should emit block before
1171       IF_DEBUG(gran, 
1172                DumpGranEvent(GR_DESCHEDULE, t)); 
1173       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1174       /*
1175         ngoq Dogh!
1176       ASSERT(procStatus[CurrentProc]==Busy || 
1177               ((procStatus[CurrentProc]==Fetching) && 
1178               (t->block_info.closure!=(StgClosure*)NULL)));
1179       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1180           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1181             procStatus[CurrentProc]==Fetching)) 
1182         procStatus[CurrentProc] = Idle;
1183       */
1184 #elif defined(PAR)
1185       IF_DEBUG(scheduler,
1186                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1187                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1188       IF_PAR_DEBUG(bq,
1189
1190                    if (t->block_info.closure!=(StgClosure*)NULL) 
1191                      print_bq(t->block_info.closure));
1192
1193       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1194       blockThread(t);
1195
1196       /* whatever we schedule next, we must log that schedule */
1197       emitSchedule = rtsTrue;
1198
1199 #else /* !GRAN */
1200       /* don't need to do anything.  Either the thread is blocked on
1201        * I/O, in which case we'll have called addToBlockedQueue
1202        * previously, or it's blocked on an MVar or Blackhole, in which
1203        * case it'll be on the relevant queue already.
1204        */
1205       ASSERT(t->why_blocked != NotBlocked);
1206       IF_DEBUG(scheduler,
1207                debugBelch("--<< thread %d (%s) stopped: ", 
1208                        t->id, whatNext_strs[t->what_next]);
1209                printThreadBlockage(t);
1210                debugBelch("\n"));
1211
1212       /* Only for dumping event to log file 
1213          ToDo: do I need this in GranSim, too?
1214       blockThread(t);
1215       */
1216 #endif
1217       threadPaused(t);
1218       break;
1219
1220     case ThreadFinished:
1221       /* Need to check whether this was a main thread, and if so, signal
1222        * the task that started it with the return value.  If we have no
1223        * more main threads, we probably need to stop all the tasks until
1224        * we get a new one.
1225        */
1226       /* We also end up here if the thread kills itself with an
1227        * uncaught exception, see Exception.hc.
1228        */
1229       IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1230                                t->id, whatNext_strs[t->what_next]));
1231 #if defined(GRAN)
1232       endThread(t, CurrentProc); // clean-up the thread
1233 #elif defined(PAR)
1234       /* For now all are advisory -- HWL */
1235       //if(t->priority==AdvisoryPriority) ??
1236       advisory_thread_count--;
1237       
1238 # ifdef DIST
1239       if(t->dist.priority==RevalPriority)
1240         FinishReval(t);
1241 # endif
1242       
1243       if (RtsFlags.ParFlags.ParStats.Full &&
1244           !RtsFlags.ParFlags.ParStats.Suppressed) 
1245         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1246 #endif
1247
1248       //
1249       // Check whether the thread that just completed was a main
1250       // thread, and if so return with the result.  
1251       //
1252       // There is an assumption here that all thread completion goes
1253       // through this point; we need to make sure that if a thread
1254       // ends up in the ThreadKilled state, that it stays on the run
1255       // queue so it can be dealt with here.
1256       //
1257       if (
1258 #if defined(RTS_SUPPORTS_THREADS)
1259           mainThread != NULL
1260 #else
1261           mainThread->tso == t
1262 #endif
1263           )
1264       {
1265           // We are a bound thread: this must be our thread that just
1266           // completed.
1267           ASSERT(mainThread->tso == t);
1268
1269           if (t->what_next == ThreadComplete) {
1270               if (mainThread->ret) {
1271                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1272                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1273               }
1274               mainThread->stat = Success;
1275           } else {
1276               if (mainThread->ret) {
1277                   *(mainThread->ret) = NULL;
1278               }
1279               if (was_interrupted) {
1280                   mainThread->stat = Interrupted;
1281               } else {
1282                   mainThread->stat = Killed;
1283               }
1284           }
1285 #ifdef DEBUG
1286           removeThreadLabel((StgWord)mainThread->tso->id);
1287 #endif
1288           if (mainThread->prev == NULL) {
1289               main_threads = mainThread->link;
1290           } else {
1291               mainThread->prev->link = mainThread->link;
1292           }
1293           if (mainThread->link != NULL) {
1294               mainThread->link->prev = NULL;
1295           }
1296           releaseCapability(cap);
1297           return;
1298       }
1299
1300 #ifdef RTS_SUPPORTS_THREADS
1301       ASSERT(t->main == NULL);
1302 #else
1303       if (t->main != NULL) {
1304           // Must be a main thread that is not the topmost one.  Leave
1305           // it on the run queue until the stack has unwound to the
1306           // point where we can deal with this.  Leaving it on the run
1307           // queue also ensures that the garbage collector knows about
1308           // this thread and its return value (it gets dropped from the
1309           // all_threads list so there's no other way to find it).
1310           APPEND_TO_RUN_QUEUE(t);
1311       }
1312 #endif
1313       break;
1314
1315     default:
1316       barf("schedule: invalid thread return code %d", (int)ret);
1317     }
1318
1319 #ifdef PROFILING
1320     // When we have +RTS -i0 and we're heap profiling, do a census at
1321     // every GC.  This lets us get repeatable runs for debugging.
1322     if (performHeapProfile ||
1323         (RtsFlags.ProfFlags.profileInterval==0 &&
1324          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1325         GarbageCollect(GetRoots, rtsTrue);
1326         heapCensus();
1327         performHeapProfile = rtsFalse;
1328         ready_to_gc = rtsFalse; // we already GC'd
1329     }
1330 #endif
1331
1332     if (ready_to_gc) {
1333       /* Kick any transactions which are invalid back to their atomically frames.
1334        * When next scheduled they will try to commit, this commit will fail and
1335        * they will retry. */
1336       for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
1337         if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1338           if (!stmValidateTransaction (t -> trec)) {
1339             IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1340
1341             // strip the stack back to the ATOMICALLY_FRAME, aborting
1342             // the (nested) transaction, and saving the stack of any
1343             // partially-evaluated thunks on the heap.
1344             raiseAsync_(t, NULL, rtsTrue);
1345             
1346 #ifdef REG_R1
1347             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1348 #endif
1349           }
1350         }
1351       }
1352
1353       /* everybody back, start the GC.
1354        * Could do it in this thread, or signal a condition var
1355        * to do it in another thread.  Either way, we need to
1356        * broadcast on gc_pending_cond afterward.
1357        */
1358 #if defined(RTS_SUPPORTS_THREADS)
1359       IF_DEBUG(scheduler,sched_belch("doing GC"));
1360 #endif
1361       GarbageCollect(GetRoots,rtsFalse);
1362       ready_to_gc = rtsFalse;
1363 #if defined(GRAN)
1364       /* add a ContinueThread event to continue execution of current thread */
1365       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1366                 ContinueThread,
1367                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1368       IF_GRAN_DEBUG(bq, 
1369                debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1370                G_EVENTQ(0);
1371                G_CURR_THREADQ(0));
1372 #endif /* GRAN */
1373     }
1374
1375 #if defined(GRAN)
1376   next_thread:
1377     IF_GRAN_DEBUG(unused,
1378                   print_eventq(EventHd));
1379
1380     event = get_next_event();
1381 #elif defined(PAR)
1382   next_thread:
1383     /* ToDo: wait for next message to arrive rather than busy wait */
1384 #endif /* GRAN */
1385
1386   } /* end of while(1) */
1387
1388   IF_PAR_DEBUG(verbose,
1389                debugBelch("== Leaving schedule() after having received Finish\n"));
1390 }
1391
1392 /* ---------------------------------------------------------------------------
1393  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1394  * used by Control.Concurrent for error checking.
1395  * ------------------------------------------------------------------------- */
1396  
1397 StgBool
1398 rtsSupportsBoundThreads(void)
1399 {
1400 #ifdef THREADED_RTS
1401   return rtsTrue;
1402 #else
1403   return rtsFalse;
1404 #endif
1405 }
1406
1407 /* ---------------------------------------------------------------------------
1408  * isThreadBound(tso): check whether tso is bound to an OS thread.
1409  * ------------------------------------------------------------------------- */
1410  
1411 StgBool
1412 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1413 {
1414 #ifdef THREADED_RTS
1415   return (tso->main != NULL);
1416 #endif
1417   return rtsFalse;
1418 }
1419
1420 /* ---------------------------------------------------------------------------
1421  * Singleton fork(). Do not copy any running threads.
1422  * ------------------------------------------------------------------------- */
1423
1424 #ifndef mingw32_TARGET_OS
1425 #define FORKPROCESS_PRIMOP_SUPPORTED
1426 #endif
1427
1428 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1429 static void 
1430 deleteThreadImmediately(StgTSO *tso);
1431 #endif
1432 StgInt
1433 forkProcess(HsStablePtr *entry
1434 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1435             STG_UNUSED
1436 #endif
1437            )
1438 {
1439 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1440   pid_t pid;
1441   StgTSO* t,*next;
1442   StgMainThread *m;
1443   SchedulerStatus rc;
1444
1445   IF_DEBUG(scheduler,sched_belch("forking!"));
1446   rts_lock(); // This not only acquires sched_mutex, it also
1447               // makes sure that no other threads are running
1448
1449   pid = fork();
1450
1451   if (pid) { /* parent */
1452
1453   /* just return the pid */
1454     rts_unlock();
1455     return pid;
1456     
1457   } else { /* child */
1458     
1459     
1460       // delete all threads
1461     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1462     
1463     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1464       next = t->link;
1465
1466         // don't allow threads to catch the ThreadKilled exception
1467       deleteThreadImmediately(t);
1468     }
1469     
1470       // wipe the main thread list
1471     while((m = main_threads) != NULL) {
1472       main_threads = m->link;
1473 # ifdef THREADED_RTS
1474       closeCondition(&m->bound_thread_cond);
1475 # endif
1476       stgFree(m);
1477     }
1478     
1479     rc = rts_evalStableIO(entry, NULL);  // run the action
1480     rts_checkSchedStatus("forkProcess",rc);
1481     
1482     rts_unlock();
1483     
1484     hs_exit();                      // clean up and exit
1485     stg_exit(0);
1486   }
1487 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1488   barf("forkProcess#: primop not supported, sorry!\n");
1489   return -1;
1490 #endif
1491 }
1492
1493 /* ---------------------------------------------------------------------------
1494  * deleteAllThreads():  kill all the live threads.
1495  *
1496  * This is used when we catch a user interrupt (^C), before performing
1497  * any necessary cleanups and running finalizers.
1498  *
1499  * Locks: sched_mutex held.
1500  * ------------------------------------------------------------------------- */
1501    
1502 void
1503 deleteAllThreads ( void )
1504 {
1505   StgTSO* t, *next;
1506   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1507   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1508       next = t->global_link;
1509       deleteThread(t);
1510   }      
1511
1512   // The run queue now contains a bunch of ThreadKilled threads.  We
1513   // must not throw these away: the main thread(s) will be in there
1514   // somewhere, and the main scheduler loop has to deal with it.
1515   // Also, the run queue is the only thing keeping these threads from
1516   // being GC'd, and we don't want the "main thread has been GC'd" panic.
1517
1518   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1519   ASSERT(sleeping_queue == END_TSO_QUEUE);
1520 }
1521
1522 /* startThread and  insertThread are now in GranSim.c -- HWL */
1523
1524
1525 /* ---------------------------------------------------------------------------
1526  * Suspending & resuming Haskell threads.
1527  * 
1528  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1529  * its capability before calling the C function.  This allows another
1530  * task to pick up the capability and carry on running Haskell
1531  * threads.  It also means that if the C call blocks, it won't lock
1532  * the whole system.
1533  *
1534  * The Haskell thread making the C call is put to sleep for the
1535  * duration of the call, on the susepended_ccalling_threads queue.  We
1536  * give out a token to the task, which it can use to resume the thread
1537  * on return from the C function.
1538  * ------------------------------------------------------------------------- */
1539    
1540 StgInt
1541 suspendThread( StgRegTable *reg )
1542 {
1543   nat tok;
1544   Capability *cap;
1545   int saved_errno = errno;
1546
1547   /* assume that *reg is a pointer to the StgRegTable part
1548    * of a Capability.
1549    */
1550   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1551
1552   ACQUIRE_LOCK(&sched_mutex);
1553
1554   IF_DEBUG(scheduler,
1555            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1556
1557   // XXX this might not be necessary --SDM
1558   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1559
1560   threadPaused(cap->r.rCurrentTSO);
1561   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1562   suspended_ccalling_threads = cap->r.rCurrentTSO;
1563
1564   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
1565       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1566       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1567   } else {
1568       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1569   }
1570
1571   /* Use the thread ID as the token; it should be unique */
1572   tok = cap->r.rCurrentTSO->id;
1573
1574   /* Hand back capability */
1575   releaseCapability(cap);
1576   
1577 #if defined(RTS_SUPPORTS_THREADS)
1578   /* Preparing to leave the RTS, so ensure there's a native thread/task
1579      waiting to take over.
1580   */
1581   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1582 #endif
1583
1584   RELEASE_LOCK(&sched_mutex);
1585   
1586   errno = saved_errno;
1587   return tok; 
1588 }
1589
1590 StgRegTable *
1591 resumeThread( StgInt tok )
1592 {
1593   StgTSO *tso, **prev;
1594   Capability *cap;
1595   int saved_errno = errno;
1596
1597 #if defined(RTS_SUPPORTS_THREADS)
1598   /* Wait for permission to re-enter the RTS with the result. */
1599   ACQUIRE_LOCK(&sched_mutex);
1600   waitForReturnCapability(&sched_mutex, &cap);
1601
1602   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1603 #else
1604   grabCapability(&cap);
1605 #endif
1606
1607   /* Remove the thread off of the suspended list */
1608   prev = &suspended_ccalling_threads;
1609   for (tso = suspended_ccalling_threads; 
1610        tso != END_TSO_QUEUE; 
1611        prev = &tso->link, tso = tso->link) {
1612     if (tso->id == (StgThreadID)tok) {
1613       *prev = tso->link;
1614       break;
1615     }
1616   }
1617   if (tso == END_TSO_QUEUE) {
1618     barf("resumeThread: thread not found");
1619   }
1620   tso->link = END_TSO_QUEUE;
1621   
1622   if(tso->why_blocked == BlockedOnCCall) {
1623       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1624       tso->blocked_exceptions = NULL;
1625   }
1626   
1627   /* Reset blocking status */
1628   tso->why_blocked  = NotBlocked;
1629
1630   cap->r.rCurrentTSO = tso;
1631   RELEASE_LOCK(&sched_mutex);
1632   errno = saved_errno;
1633   return &cap->r;
1634 }
1635
1636
1637 /* ---------------------------------------------------------------------------
1638  * Static functions
1639  * ------------------------------------------------------------------------ */
1640 static void unblockThread(StgTSO *tso);
1641
1642 /* ---------------------------------------------------------------------------
1643  * Comparing Thread ids.
1644  *
1645  * This is used from STG land in the implementation of the
1646  * instances of Eq/Ord for ThreadIds.
1647  * ------------------------------------------------------------------------ */
1648
1649 int
1650 cmp_thread(StgPtr tso1, StgPtr tso2) 
1651
1652   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1653   StgThreadID id2 = ((StgTSO *)tso2)->id;
1654  
1655   if (id1 < id2) return (-1);
1656   if (id1 > id2) return 1;
1657   return 0;
1658 }
1659
1660 /* ---------------------------------------------------------------------------
1661  * Fetching the ThreadID from an StgTSO.
1662  *
1663  * This is used in the implementation of Show for ThreadIds.
1664  * ------------------------------------------------------------------------ */
1665 int
1666 rts_getThreadId(StgPtr tso) 
1667 {
1668   return ((StgTSO *)tso)->id;
1669 }
1670
1671 #ifdef DEBUG
1672 void
1673 labelThread(StgPtr tso, char *label)
1674 {
1675   int len;
1676   void *buf;
1677
1678   /* Caveat: Once set, you can only set the thread name to "" */
1679   len = strlen(label)+1;
1680   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1681   strncpy(buf,label,len);
1682   /* Update will free the old memory for us */
1683   updateThreadLabel(((StgTSO *)tso)->id,buf);
1684 }
1685 #endif /* DEBUG */
1686
1687 /* ---------------------------------------------------------------------------
1688    Create a new thread.
1689
1690    The new thread starts with the given stack size.  Before the
1691    scheduler can run, however, this thread needs to have a closure
1692    (and possibly some arguments) pushed on its stack.  See
1693    pushClosure() in Schedule.h.
1694
1695    createGenThread() and createIOThread() (in SchedAPI.h) are
1696    convenient packaged versions of this function.
1697
1698    currently pri (priority) is only used in a GRAN setup -- HWL
1699    ------------------------------------------------------------------------ */
1700 #if defined(GRAN)
1701 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1702 StgTSO *
1703 createThread(nat size, StgInt pri)
1704 #else
1705 StgTSO *
1706 createThread(nat size)
1707 #endif
1708 {
1709
1710     StgTSO *tso;
1711     nat stack_size;
1712
1713     /* First check whether we should create a thread at all */
1714 #if defined(PAR)
1715   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1716   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1717     threadsIgnored++;
1718     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
1719           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1720     return END_TSO_QUEUE;
1721   }
1722   threadsCreated++;
1723 #endif
1724
1725 #if defined(GRAN)
1726   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1727 #endif
1728
1729   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1730
1731   /* catch ridiculously small stack sizes */
1732   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1733     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1734   }
1735
1736   stack_size = size - TSO_STRUCT_SIZEW;
1737
1738   tso = (StgTSO *)allocate(size);
1739   TICK_ALLOC_TSO(stack_size, 0);
1740
1741   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1742 #if defined(GRAN)
1743   SET_GRAN_HDR(tso, ThisPE);
1744 #endif
1745
1746   // Always start with the compiled code evaluator
1747   tso->what_next = ThreadRunGHC;
1748
1749   tso->id = next_thread_id++; 
1750   tso->why_blocked  = NotBlocked;
1751   tso->blocked_exceptions = NULL;
1752
1753   tso->saved_errno = 0;
1754   tso->main = NULL;
1755   
1756   tso->stack_size   = stack_size;
1757   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1758                               - TSO_STRUCT_SIZEW;
1759   tso->sp           = (P_)&(tso->stack) + stack_size;
1760
1761   tso->trec = NO_TREC;
1762
1763 #ifdef PROFILING
1764   tso->prof.CCCS = CCS_MAIN;
1765 #endif
1766
1767   /* put a stop frame on the stack */
1768   tso->sp -= sizeofW(StgStopFrame);
1769   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1770   tso->link = END_TSO_QUEUE;
1771
1772   // ToDo: check this
1773 #if defined(GRAN)
1774   /* uses more flexible routine in GranSim */
1775   insertThread(tso, CurrentProc);
1776 #else
1777   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1778    * from its creation
1779    */
1780 #endif
1781
1782 #if defined(GRAN) 
1783   if (RtsFlags.GranFlags.GranSimStats.Full) 
1784     DumpGranEvent(GR_START,tso);
1785 #elif defined(PAR)
1786   if (RtsFlags.ParFlags.ParStats.Full) 
1787     DumpGranEvent(GR_STARTQ,tso);
1788   /* HACk to avoid SCHEDULE 
1789      LastTSO = tso; */
1790 #endif
1791
1792   /* Link the new thread on the global thread list.
1793    */
1794   tso->global_link = all_threads;
1795   all_threads = tso;
1796
1797 #if defined(DIST)
1798   tso->dist.priority = MandatoryPriority; //by default that is...
1799 #endif
1800
1801 #if defined(GRAN)
1802   tso->gran.pri = pri;
1803 # if defined(DEBUG)
1804   tso->gran.magic = TSO_MAGIC; // debugging only
1805 # endif
1806   tso->gran.sparkname   = 0;
1807   tso->gran.startedat   = CURRENT_TIME; 
1808   tso->gran.exported    = 0;
1809   tso->gran.basicblocks = 0;
1810   tso->gran.allocs      = 0;
1811   tso->gran.exectime    = 0;
1812   tso->gran.fetchtime   = 0;
1813   tso->gran.fetchcount  = 0;
1814   tso->gran.blocktime   = 0;
1815   tso->gran.blockcount  = 0;
1816   tso->gran.blockedat   = 0;
1817   tso->gran.globalsparks = 0;
1818   tso->gran.localsparks  = 0;
1819   if (RtsFlags.GranFlags.Light)
1820     tso->gran.clock  = Now; /* local clock */
1821   else
1822     tso->gran.clock  = 0;
1823
1824   IF_DEBUG(gran,printTSO(tso));
1825 #elif defined(PAR)
1826 # if defined(DEBUG)
1827   tso->par.magic = TSO_MAGIC; // debugging only
1828 # endif
1829   tso->par.sparkname   = 0;
1830   tso->par.startedat   = CURRENT_TIME; 
1831   tso->par.exported    = 0;
1832   tso->par.basicblocks = 0;
1833   tso->par.allocs      = 0;
1834   tso->par.exectime    = 0;
1835   tso->par.fetchtime   = 0;
1836   tso->par.fetchcount  = 0;
1837   tso->par.blocktime   = 0;
1838   tso->par.blockcount  = 0;
1839   tso->par.blockedat   = 0;
1840   tso->par.globalsparks = 0;
1841   tso->par.localsparks  = 0;
1842 #endif
1843
1844 #if defined(GRAN)
1845   globalGranStats.tot_threads_created++;
1846   globalGranStats.threads_created_on_PE[CurrentProc]++;
1847   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1848   globalGranStats.tot_sq_probes++;
1849 #elif defined(PAR)
1850   // collect parallel global statistics (currently done together with GC stats)
1851   if (RtsFlags.ParFlags.ParStats.Global &&
1852       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1853     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1854     globalParStats.tot_threads_created++;
1855   }
1856 #endif 
1857
1858 #if defined(GRAN)
1859   IF_GRAN_DEBUG(pri,
1860                 sched_belch("==__ schedule: Created TSO %d (%p);",
1861                       CurrentProc, tso, tso->id));
1862 #elif defined(PAR)
1863     IF_PAR_DEBUG(verbose,
1864                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
1865                        (long)tso->id, tso, advisory_thread_count));
1866 #else
1867   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1868                                 (long)tso->id, (long)tso->stack_size));
1869 #endif    
1870   return tso;
1871 }
1872
1873 #if defined(PAR)
1874 /* RFP:
1875    all parallel thread creation calls should fall through the following routine.
1876 */
1877 StgTSO *
1878 createSparkThread(rtsSpark spark) 
1879 { StgTSO *tso;
1880   ASSERT(spark != (rtsSpark)NULL);
1881   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1882   { threadsIgnored++;
1883     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1884           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1885     return END_TSO_QUEUE;
1886   }
1887   else
1888   { threadsCreated++;
1889     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1890     if (tso==END_TSO_QUEUE)     
1891       barf("createSparkThread: Cannot create TSO");
1892 #if defined(DIST)
1893     tso->priority = AdvisoryPriority;
1894 #endif
1895     pushClosure(tso,spark);
1896     PUSH_ON_RUN_QUEUE(tso);
1897     advisory_thread_count++;    
1898   }
1899   return tso;
1900 }
1901 #endif
1902
1903 /*
1904   Turn a spark into a thread.
1905   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1906 */
1907 #if defined(PAR)
1908 StgTSO *
1909 activateSpark (rtsSpark spark) 
1910 {
1911   StgTSO *tso;
1912
1913   tso = createSparkThread(spark);
1914   if (RtsFlags.ParFlags.ParStats.Full) {   
1915     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1916     IF_PAR_DEBUG(verbose,
1917                  debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
1918                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1919   }
1920   // ToDo: fwd info on local/global spark to thread -- HWL
1921   // tso->gran.exported =  spark->exported;
1922   // tso->gran.locked =   !spark->global;
1923   // tso->gran.sparkname = spark->name;
1924
1925   return tso;
1926 }
1927 #endif
1928
1929 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1930                                    Capability *initialCapability
1931                                    );
1932
1933
1934 /* ---------------------------------------------------------------------------
1935  * scheduleThread()
1936  *
1937  * scheduleThread puts a thread on the head of the runnable queue.
1938  * This will usually be done immediately after a thread is created.
1939  * The caller of scheduleThread must create the thread using e.g.
1940  * createThread and push an appropriate closure
1941  * on this thread's stack before the scheduler is invoked.
1942  * ------------------------------------------------------------------------ */
1943
1944 static void scheduleThread_ (StgTSO* tso);
1945
1946 void
1947 scheduleThread_(StgTSO *tso)
1948 {
1949   // The thread goes at the *end* of the run-queue, to avoid possible
1950   // starvation of any threads already on the queue.
1951   APPEND_TO_RUN_QUEUE(tso);
1952   threadRunnable();
1953 }
1954
1955 void
1956 scheduleThread(StgTSO* tso)
1957 {
1958   ACQUIRE_LOCK(&sched_mutex);
1959   scheduleThread_(tso);
1960   RELEASE_LOCK(&sched_mutex);
1961 }
1962
1963 #if defined(RTS_SUPPORTS_THREADS)
1964 static Condition bound_cond_cache;
1965 static int bound_cond_cache_full = 0;
1966 #endif
1967
1968
1969 SchedulerStatus
1970 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1971                    Capability *initialCapability)
1972 {
1973     // Precondition: sched_mutex must be held
1974     StgMainThread *m;
1975
1976     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1977     m->tso = tso;
1978     tso->main = m;
1979     m->ret = ret;
1980     m->stat = NoStatus;
1981     m->link = main_threads;
1982     m->prev = NULL;
1983     if (main_threads != NULL) {
1984         main_threads->prev = m;
1985     }
1986     main_threads = m;
1987
1988 #if defined(RTS_SUPPORTS_THREADS)
1989     // Allocating a new condition for each thread is expensive, so we
1990     // cache one.  This is a pretty feeble hack, but it helps speed up
1991     // consecutive call-ins quite a bit.
1992     if (bound_cond_cache_full) {
1993         m->bound_thread_cond = bound_cond_cache;
1994         bound_cond_cache_full = 0;
1995     } else {
1996         initCondition(&m->bound_thread_cond);
1997     }
1998 #endif
1999
2000     /* Put the thread on the main-threads list prior to scheduling the TSO.
2001        Failure to do so introduces a race condition in the MT case (as
2002        identified by Wolfgang Thaller), whereby the new task/OS thread 
2003        created by scheduleThread_() would complete prior to the thread
2004        that spawned it managed to put 'itself' on the main-threads list.
2005        The upshot of it all being that the worker thread wouldn't get to
2006        signal the completion of the its work item for the main thread to
2007        see (==> it got stuck waiting.)    -- sof 6/02.
2008     */
2009     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2010     
2011     APPEND_TO_RUN_QUEUE(tso);
2012     // NB. Don't call threadRunnable() here, because the thread is
2013     // bound and only runnable by *this* OS thread, so waking up other
2014     // workers will just slow things down.
2015
2016     return waitThread_(m, initialCapability);
2017 }
2018
2019 /* ---------------------------------------------------------------------------
2020  * initScheduler()
2021  *
2022  * Initialise the scheduler.  This resets all the queues - if the
2023  * queues contained any threads, they'll be garbage collected at the
2024  * next pass.
2025  *
2026  * ------------------------------------------------------------------------ */
2027
2028 void 
2029 initScheduler(void)
2030 {
2031 #if defined(GRAN)
2032   nat i;
2033
2034   for (i=0; i<=MAX_PROC; i++) {
2035     run_queue_hds[i]      = END_TSO_QUEUE;
2036     run_queue_tls[i]      = END_TSO_QUEUE;
2037     blocked_queue_hds[i]  = END_TSO_QUEUE;
2038     blocked_queue_tls[i]  = END_TSO_QUEUE;
2039     ccalling_threadss[i]  = END_TSO_QUEUE;
2040     sleeping_queue        = END_TSO_QUEUE;
2041   }
2042 #else
2043   run_queue_hd      = END_TSO_QUEUE;
2044   run_queue_tl      = END_TSO_QUEUE;
2045   blocked_queue_hd  = END_TSO_QUEUE;
2046   blocked_queue_tl  = END_TSO_QUEUE;
2047   sleeping_queue    = END_TSO_QUEUE;
2048 #endif 
2049
2050   suspended_ccalling_threads  = END_TSO_QUEUE;
2051
2052   main_threads = NULL;
2053   all_threads  = END_TSO_QUEUE;
2054
2055   context_switch = 0;
2056   interrupted    = 0;
2057
2058   RtsFlags.ConcFlags.ctxtSwitchTicks =
2059       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2060       
2061 #if defined(RTS_SUPPORTS_THREADS)
2062   /* Initialise the mutex and condition variables used by
2063    * the scheduler. */
2064   initMutex(&sched_mutex);
2065   initMutex(&term_mutex);
2066 #endif
2067   
2068   ACQUIRE_LOCK(&sched_mutex);
2069
2070   /* A capability holds the state a native thread needs in
2071    * order to execute STG code. At least one capability is
2072    * floating around (only SMP builds have more than one).
2073    */
2074   initCapabilities();
2075   
2076 #if defined(RTS_SUPPORTS_THREADS)
2077     /* start our haskell execution tasks */
2078     startTaskManager(0,taskStart);
2079 #endif
2080
2081 #if /* defined(SMP) ||*/ defined(PAR)
2082   initSparkPools();
2083 #endif
2084
2085   RELEASE_LOCK(&sched_mutex);
2086 }
2087
2088 void
2089 exitScheduler( void )
2090 {
2091 #if defined(RTS_SUPPORTS_THREADS)
2092   stopTaskManager();
2093 #endif
2094   shutting_down_scheduler = rtsTrue;
2095 }
2096
2097 /* ----------------------------------------------------------------------------
2098    Managing the per-task allocation areas.
2099    
2100    Each capability comes with an allocation area.  These are
2101    fixed-length block lists into which allocation can be done.
2102
2103    ToDo: no support for two-space collection at the moment???
2104    ------------------------------------------------------------------------- */
2105
2106 static
2107 SchedulerStatus
2108 waitThread_(StgMainThread* m, Capability *initialCapability)
2109 {
2110   SchedulerStatus stat;
2111
2112   // Precondition: sched_mutex must be held.
2113   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2114
2115 #if defined(GRAN)
2116   /* GranSim specific init */
2117   CurrentTSO = m->tso;                // the TSO to run
2118   procStatus[MainProc] = Busy;        // status of main PE
2119   CurrentProc = MainProc;             // PE to run it on
2120   schedule(m,initialCapability);
2121 #else
2122   schedule(m,initialCapability);
2123   ASSERT(m->stat != NoStatus);
2124 #endif
2125
2126   stat = m->stat;
2127
2128 #if defined(RTS_SUPPORTS_THREADS)
2129   // Free the condition variable, returning it to the cache if possible.
2130   if (!bound_cond_cache_full) {
2131       bound_cond_cache = m->bound_thread_cond;
2132       bound_cond_cache_full = 1;
2133   } else {
2134       closeCondition(&m->bound_thread_cond);
2135   }
2136 #endif
2137
2138   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2139   stgFree(m);
2140
2141   // Postcondition: sched_mutex still held
2142   return stat;
2143 }
2144
2145 /* ---------------------------------------------------------------------------
2146    Where are the roots that we know about?
2147
2148         - all the threads on the runnable queue
2149         - all the threads on the blocked queue
2150         - all the threads on the sleeping queue
2151         - all the thread currently executing a _ccall_GC
2152         - all the "main threads"
2153      
2154    ------------------------------------------------------------------------ */
2155
2156 /* This has to be protected either by the scheduler monitor, or by the
2157         garbage collection monitor (probably the latter).
2158         KH @ 25/10/99
2159 */
2160
2161 void
2162 GetRoots( evac_fn evac )
2163 {
2164 #if defined(GRAN)
2165   {
2166     nat i;
2167     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2168       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2169           evac((StgClosure **)&run_queue_hds[i]);
2170       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2171           evac((StgClosure **)&run_queue_tls[i]);
2172       
2173       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2174           evac((StgClosure **)&blocked_queue_hds[i]);
2175       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2176           evac((StgClosure **)&blocked_queue_tls[i]);
2177       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2178           evac((StgClosure **)&ccalling_threads[i]);
2179     }
2180   }
2181
2182   markEventQueue();
2183
2184 #else /* !GRAN */
2185   if (run_queue_hd != END_TSO_QUEUE) {
2186       ASSERT(run_queue_tl != END_TSO_QUEUE);
2187       evac((StgClosure **)&run_queue_hd);
2188       evac((StgClosure **)&run_queue_tl);
2189   }
2190   
2191   if (blocked_queue_hd != END_TSO_QUEUE) {
2192       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2193       evac((StgClosure **)&blocked_queue_hd);
2194       evac((StgClosure **)&blocked_queue_tl);
2195   }
2196   
2197   if (sleeping_queue != END_TSO_QUEUE) {
2198       evac((StgClosure **)&sleeping_queue);
2199   }
2200 #endif 
2201
2202   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2203       evac((StgClosure **)&suspended_ccalling_threads);
2204   }
2205
2206 #if defined(PAR) || defined(GRAN)
2207   markSparkQueue(evac);
2208 #endif
2209
2210 #if defined(RTS_USER_SIGNALS)
2211   // mark the signal handlers (signals should be already blocked)
2212   markSignalHandlers(evac);
2213 #endif
2214 }
2215
2216 /* -----------------------------------------------------------------------------
2217    performGC
2218
2219    This is the interface to the garbage collector from Haskell land.
2220    We provide this so that external C code can allocate and garbage
2221    collect when called from Haskell via _ccall_GC.
2222
2223    It might be useful to provide an interface whereby the programmer
2224    can specify more roots (ToDo).
2225    
2226    This needs to be protected by the GC condition variable above.  KH.
2227    -------------------------------------------------------------------------- */
2228
2229 static void (*extra_roots)(evac_fn);
2230
2231 void
2232 performGC(void)
2233 {
2234   /* Obligated to hold this lock upon entry */
2235   ACQUIRE_LOCK(&sched_mutex);
2236   GarbageCollect(GetRoots,rtsFalse);
2237   RELEASE_LOCK(&sched_mutex);
2238 }
2239
2240 void
2241 performMajorGC(void)
2242 {
2243   ACQUIRE_LOCK(&sched_mutex);
2244   GarbageCollect(GetRoots,rtsTrue);
2245   RELEASE_LOCK(&sched_mutex);
2246 }
2247
2248 static void
2249 AllRoots(evac_fn evac)
2250 {
2251     GetRoots(evac);             // the scheduler's roots
2252     extra_roots(evac);          // the user's roots
2253 }
2254
2255 void
2256 performGCWithRoots(void (*get_roots)(evac_fn))
2257 {
2258   ACQUIRE_LOCK(&sched_mutex);
2259   extra_roots = get_roots;
2260   GarbageCollect(AllRoots,rtsFalse);
2261   RELEASE_LOCK(&sched_mutex);
2262 }
2263
2264 /* -----------------------------------------------------------------------------
2265    Stack overflow
2266
2267    If the thread has reached its maximum stack size, then raise the
2268    StackOverflow exception in the offending thread.  Otherwise
2269    relocate the TSO into a larger chunk of memory and adjust its stack
2270    size appropriately.
2271    -------------------------------------------------------------------------- */
2272
2273 static StgTSO *
2274 threadStackOverflow(StgTSO *tso)
2275 {
2276   nat new_stack_size, new_tso_size, stack_words;
2277   StgPtr new_sp;
2278   StgTSO *dest;
2279
2280   IF_DEBUG(sanity,checkTSO(tso));
2281   if (tso->stack_size >= tso->max_stack_size) {
2282
2283     IF_DEBUG(gc,
2284              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2285                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2286              /* If we're debugging, just print out the top of the stack */
2287              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2288                                               tso->sp+64)));
2289
2290     /* Send this thread the StackOverflow exception */
2291     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2292     return tso;
2293   }
2294
2295   /* Try to double the current stack size.  If that takes us over the
2296    * maximum stack size for this thread, then use the maximum instead.
2297    * Finally round up so the TSO ends up as a whole number of blocks.
2298    */
2299   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2300   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2301                                        TSO_STRUCT_SIZE)/sizeof(W_);
2302   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2303   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2304
2305   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2306
2307   dest = (StgTSO *)allocate(new_tso_size);
2308   TICK_ALLOC_TSO(new_stack_size,0);
2309
2310   /* copy the TSO block and the old stack into the new area */
2311   memcpy(dest,tso,TSO_STRUCT_SIZE);
2312   stack_words = tso->stack + tso->stack_size - tso->sp;
2313   new_sp = (P_)dest + new_tso_size - stack_words;
2314   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2315
2316   /* relocate the stack pointers... */
2317   dest->sp         = new_sp;
2318   dest->stack_size = new_stack_size;
2319         
2320   /* Mark the old TSO as relocated.  We have to check for relocated
2321    * TSOs in the garbage collector and any primops that deal with TSOs.
2322    *
2323    * It's important to set the sp value to just beyond the end
2324    * of the stack, so we don't attempt to scavenge any part of the
2325    * dead TSO's stack.
2326    */
2327   tso->what_next = ThreadRelocated;
2328   tso->link = dest;
2329   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2330   tso->why_blocked = NotBlocked;
2331   dest->mut_link = NULL;
2332
2333   IF_PAR_DEBUG(verbose,
2334                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2335                      tso->id, tso, tso->stack_size);
2336                /* If we're debugging, just print out the top of the stack */
2337                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2338                                                 tso->sp+64)));
2339   
2340   IF_DEBUG(sanity,checkTSO(tso));
2341 #if 0
2342   IF_DEBUG(scheduler,printTSO(dest));
2343 #endif
2344
2345   return dest;
2346 }
2347
2348 /* ---------------------------------------------------------------------------
2349    Wake up a queue that was blocked on some resource.
2350    ------------------------------------------------------------------------ */
2351
2352 #if defined(GRAN)
2353 STATIC_INLINE void
2354 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2355 {
2356 }
2357 #elif defined(PAR)
2358 STATIC_INLINE void
2359 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2360 {
2361   /* write RESUME events to log file and
2362      update blocked and fetch time (depending on type of the orig closure) */
2363   if (RtsFlags.ParFlags.ParStats.Full) {
2364     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2365                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2366                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2367     if (EMPTY_RUN_QUEUE())
2368       emitSchedule = rtsTrue;
2369
2370     switch (get_itbl(node)->type) {
2371         case FETCH_ME_BQ:
2372           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2373           break;
2374         case RBH:
2375         case FETCH_ME:
2376         case BLACKHOLE_BQ:
2377           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2378           break;
2379 #ifdef DIST
2380         case MVAR:
2381           break;
2382 #endif    
2383         default:
2384           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2385         }
2386       }
2387 }
2388 #endif
2389
2390 #if defined(GRAN)
2391 static StgBlockingQueueElement *
2392 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2393 {
2394     StgTSO *tso;
2395     PEs node_loc, tso_loc;
2396
2397     node_loc = where_is(node); // should be lifted out of loop
2398     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2399     tso_loc = where_is((StgClosure *)tso);
2400     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2401       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2402       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2403       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2404       // insertThread(tso, node_loc);
2405       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2406                 ResumeThread,
2407                 tso, node, (rtsSpark*)NULL);
2408       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2409       // len_local++;
2410       // len++;
2411     } else { // TSO is remote (actually should be FMBQ)
2412       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2413                                   RtsFlags.GranFlags.Costs.gunblocktime +
2414                                   RtsFlags.GranFlags.Costs.latency;
2415       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2416                 UnblockThread,
2417                 tso, node, (rtsSpark*)NULL);
2418       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2419       // len++;
2420     }
2421     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2422     IF_GRAN_DEBUG(bq,
2423                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2424                           (node_loc==tso_loc ? "Local" : "Global"), 
2425                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2426     tso->block_info.closure = NULL;
2427     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2428                              tso->id, tso));
2429 }
2430 #elif defined(PAR)
2431 static StgBlockingQueueElement *
2432 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2433 {
2434     StgBlockingQueueElement *next;
2435
2436     switch (get_itbl(bqe)->type) {
2437     case TSO:
2438       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2439       /* if it's a TSO just push it onto the run_queue */
2440       next = bqe->link;
2441       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2442       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2443       threadRunnable();
2444       unblockCount(bqe, node);
2445       /* reset blocking status after dumping event */
2446       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2447       break;
2448
2449     case BLOCKED_FETCH:
2450       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2451       next = bqe->link;
2452       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2453       PendingFetches = (StgBlockedFetch *)bqe;
2454       break;
2455
2456 # if defined(DEBUG)
2457       /* can ignore this case in a non-debugging setup; 
2458          see comments on RBHSave closures above */
2459     case CONSTR:
2460       /* check that the closure is an RBHSave closure */
2461       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2462              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2463              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2464       break;
2465
2466     default:
2467       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2468            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2469            (StgClosure *)bqe);
2470 # endif
2471     }
2472   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
2473   return next;
2474 }
2475
2476 #else /* !GRAN && !PAR */
2477 static StgTSO *
2478 unblockOneLocked(StgTSO *tso)
2479 {
2480   StgTSO *next;
2481
2482   ASSERT(get_itbl(tso)->type == TSO);
2483   ASSERT(tso->why_blocked != NotBlocked);
2484   tso->why_blocked = NotBlocked;
2485   next = tso->link;
2486   tso->link = END_TSO_QUEUE;
2487   APPEND_TO_RUN_QUEUE(tso);
2488   threadRunnable();
2489   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
2490   return next;
2491 }
2492 #endif
2493
2494 #if defined(GRAN) || defined(PAR)
2495 INLINE_ME StgBlockingQueueElement *
2496 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2497 {
2498   ACQUIRE_LOCK(&sched_mutex);
2499   bqe = unblockOneLocked(bqe, node);
2500   RELEASE_LOCK(&sched_mutex);
2501   return bqe;
2502 }
2503 #else
2504 INLINE_ME StgTSO *
2505 unblockOne(StgTSO *tso)
2506 {
2507   ACQUIRE_LOCK(&sched_mutex);
2508   tso = unblockOneLocked(tso);
2509   RELEASE_LOCK(&sched_mutex);
2510   return tso;
2511 }
2512 #endif
2513
2514 #if defined(GRAN)
2515 void 
2516 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2517 {
2518   StgBlockingQueueElement *bqe;
2519   PEs node_loc;
2520   nat len = 0; 
2521
2522   IF_GRAN_DEBUG(bq, 
2523                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
2524                       node, CurrentProc, CurrentTime[CurrentProc], 
2525                       CurrentTSO->id, CurrentTSO));
2526
2527   node_loc = where_is(node);
2528
2529   ASSERT(q == END_BQ_QUEUE ||
2530          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2531          get_itbl(q)->type == CONSTR); // closure (type constructor)
2532   ASSERT(is_unique(node));
2533
2534   /* FAKE FETCH: magically copy the node to the tso's proc;
2535      no Fetch necessary because in reality the node should not have been 
2536      moved to the other PE in the first place
2537   */
2538   if (CurrentProc!=node_loc) {
2539     IF_GRAN_DEBUG(bq, 
2540                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
2541                         node, node_loc, CurrentProc, CurrentTSO->id, 
2542                         // CurrentTSO, where_is(CurrentTSO),
2543                         node->header.gran.procs));
2544     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2545     IF_GRAN_DEBUG(bq, 
2546                   debugBelch("## new bitmask of node %p is %#x\n",
2547                         node, node->header.gran.procs));
2548     if (RtsFlags.GranFlags.GranSimStats.Global) {
2549       globalGranStats.tot_fake_fetches++;
2550     }
2551   }
2552
2553   bqe = q;
2554   // ToDo: check: ASSERT(CurrentProc==node_loc);
2555   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2556     //next = bqe->link;
2557     /* 
2558        bqe points to the current element in the queue
2559        next points to the next element in the queue
2560     */
2561     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2562     //tso_loc = where_is(tso);
2563     len++;
2564     bqe = unblockOneLocked(bqe, node);
2565   }
2566
2567   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2568      the closure to make room for the anchor of the BQ */
2569   if (bqe!=END_BQ_QUEUE) {
2570     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2571     /*
2572     ASSERT((info_ptr==&RBH_Save_0_info) ||
2573            (info_ptr==&RBH_Save_1_info) ||
2574            (info_ptr==&RBH_Save_2_info));
2575     */
2576     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2577     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2578     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2579
2580     IF_GRAN_DEBUG(bq,
2581                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
2582                         node, info_type(node)));
2583   }
2584
2585   /* statistics gathering */
2586   if (RtsFlags.GranFlags.GranSimStats.Global) {
2587     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2588     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2589     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2590     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2591   }
2592   IF_GRAN_DEBUG(bq,
2593                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
2594                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2595 }
2596 #elif defined(PAR)
2597 void 
2598 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2599 {
2600   StgBlockingQueueElement *bqe;
2601
2602   ACQUIRE_LOCK(&sched_mutex);
2603
2604   IF_PAR_DEBUG(verbose, 
2605                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
2606                      node, mytid));
2607 #ifdef DIST  
2608   //RFP
2609   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2610     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
2611     return;
2612   }
2613 #endif
2614   
2615   ASSERT(q == END_BQ_QUEUE ||
2616          get_itbl(q)->type == TSO ||           
2617          get_itbl(q)->type == BLOCKED_FETCH || 
2618          get_itbl(q)->type == CONSTR); 
2619
2620   bqe = q;
2621   while (get_itbl(bqe)->type==TSO || 
2622          get_itbl(bqe)->type==BLOCKED_FETCH) {
2623     bqe = unblockOneLocked(bqe, node);
2624   }
2625   RELEASE_LOCK(&sched_mutex);
2626 }
2627
2628 #else   /* !GRAN && !PAR */
2629
2630 void
2631 awakenBlockedQueueNoLock(StgTSO *tso)
2632 {
2633   while (tso != END_TSO_QUEUE) {
2634     tso = unblockOneLocked(tso);
2635   }
2636 }
2637
2638 void
2639 awakenBlockedQueue(StgTSO *tso)
2640 {
2641   ACQUIRE_LOCK(&sched_mutex);
2642   while (tso != END_TSO_QUEUE) {
2643     tso = unblockOneLocked(tso);
2644   }
2645   RELEASE_LOCK(&sched_mutex);
2646 }
2647 #endif
2648
2649 /* ---------------------------------------------------------------------------
2650    Interrupt execution
2651    - usually called inside a signal handler so it mustn't do anything fancy.   
2652    ------------------------------------------------------------------------ */
2653
2654 void
2655 interruptStgRts(void)
2656 {
2657     interrupted    = 1;
2658     context_switch = 1;
2659 }
2660
2661 /* -----------------------------------------------------------------------------
2662    Unblock a thread
2663
2664    This is for use when we raise an exception in another thread, which
2665    may be blocked.
2666    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2667    -------------------------------------------------------------------------- */
2668
2669 #if defined(GRAN) || defined(PAR)
2670 /*
2671   NB: only the type of the blocking queue is different in GranSim and GUM
2672       the operations on the queue-elements are the same
2673       long live polymorphism!
2674
2675   Locks: sched_mutex is held upon entry and exit.
2676
2677 */
2678 static void
2679 unblockThread(StgTSO *tso)
2680 {
2681   StgBlockingQueueElement *t, **last;
2682
2683   switch (tso->why_blocked) {
2684
2685   case NotBlocked:
2686     return;  /* not blocked */
2687
2688   case BlockedOnSTM:
2689     // Be careful: nothing to do here!  We tell the scheduler that the thread
2690     // is runnable and we leave it to the stack-walking code to abort the 
2691     // transaction while unwinding the stack.  We should perhaps have a debugging
2692     // test to make sure that this really happens and that the 'zombie' transaction
2693     // does not get committed.
2694     goto done;
2695
2696   case BlockedOnMVar:
2697     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2698     {
2699       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2700       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2701
2702       last = (StgBlockingQueueElement **)&mvar->head;
2703       for (t = (StgBlockingQueueElement *)mvar->head; 
2704            t != END_BQ_QUEUE; 
2705            last = &t->link, last_tso = t, t = t->link) {
2706         if (t == (StgBlockingQueueElement *)tso) {
2707           *last = (StgBlockingQueueElement *)tso->link;
2708           if (mvar->tail == tso) {
2709             mvar->tail = (StgTSO *)last_tso;
2710           }
2711           goto done;
2712         }
2713       }
2714       barf("unblockThread (MVAR): TSO not found");
2715     }
2716
2717   case BlockedOnBlackHole:
2718     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2719     {
2720       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2721
2722       last = &bq->blocking_queue;
2723       for (t = bq->blocking_queue; 
2724            t != END_BQ_QUEUE; 
2725            last = &t->link, t = t->link) {
2726         if (t == (StgBlockingQueueElement *)tso) {
2727           *last = (StgBlockingQueueElement *)tso->link;
2728           goto done;
2729         }
2730       }
2731       barf("unblockThread (BLACKHOLE): TSO not found");
2732     }
2733
2734   case BlockedOnException:
2735     {
2736       StgTSO *target  = tso->block_info.tso;
2737
2738       ASSERT(get_itbl(target)->type == TSO);
2739
2740       if (target->what_next == ThreadRelocated) {
2741           target = target->link;
2742           ASSERT(get_itbl(target)->type == TSO);
2743       }
2744
2745       ASSERT(target->blocked_exceptions != NULL);
2746
2747       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2748       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2749            t != END_BQ_QUEUE; 
2750            last = &t->link, t = t->link) {
2751         ASSERT(get_itbl(t)->type == TSO);
2752         if (t == (StgBlockingQueueElement *)tso) {
2753           *last = (StgBlockingQueueElement *)tso->link;
2754           goto done;
2755         }
2756       }
2757       barf("unblockThread (Exception): TSO not found");
2758     }
2759
2760   case BlockedOnRead:
2761   case BlockedOnWrite:
2762 #if defined(mingw32_TARGET_OS)
2763   case BlockedOnDoProc:
2764 #endif
2765     {
2766       /* take TSO off blocked_queue */
2767       StgBlockingQueueElement *prev = NULL;
2768       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2769            prev = t, t = t->link) {
2770         if (t == (StgBlockingQueueElement *)tso) {
2771           if (prev == NULL) {
2772             blocked_queue_hd = (StgTSO *)t->link;
2773             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2774               blocked_queue_tl = END_TSO_QUEUE;
2775             }
2776           } else {
2777             prev->link = t->link;
2778             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2779               blocked_queue_tl = (StgTSO *)prev;
2780             }
2781           }
2782           goto done;
2783         }
2784       }
2785       barf("unblockThread (I/O): TSO not found");
2786     }
2787
2788   case BlockedOnDelay:
2789     {
2790       /* take TSO off sleeping_queue */
2791       StgBlockingQueueElement *prev = NULL;
2792       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2793            prev = t, t = t->link) {
2794         if (t == (StgBlockingQueueElement *)tso) {
2795           if (prev == NULL) {
2796             sleeping_queue = (StgTSO *)t->link;
2797           } else {
2798             prev->link = t->link;
2799           }
2800           goto done;
2801         }
2802       }
2803       barf("unblockThread (delay): TSO not found");
2804     }
2805
2806   default:
2807     barf("unblockThread");
2808   }
2809
2810  done:
2811   tso->link = END_TSO_QUEUE;
2812   tso->why_blocked = NotBlocked;
2813   tso->block_info.closure = NULL;
2814   PUSH_ON_RUN_QUEUE(tso);
2815 }
2816 #else
2817 static void
2818 unblockThread(StgTSO *tso)
2819 {
2820   StgTSO *t, **last;
2821   
2822   /* To avoid locking unnecessarily. */
2823   if (tso->why_blocked == NotBlocked) {
2824     return;
2825   }
2826
2827   switch (tso->why_blocked) {
2828
2829   case BlockedOnSTM:
2830     // Be careful: nothing to do here!  We tell the scheduler that the thread
2831     // is runnable and we leave it to the stack-walking code to abort the 
2832     // transaction while unwinding the stack.  We should perhaps have a debugging
2833     // test to make sure that this really happens and that the 'zombie' transaction
2834     // does not get committed.
2835     goto done;
2836
2837   case BlockedOnMVar:
2838     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2839     {
2840       StgTSO *last_tso = END_TSO_QUEUE;
2841       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2842
2843       last = &mvar->head;
2844       for (t = mvar->head; t != END_TSO_QUEUE; 
2845            last = &t->link, last_tso = t, t = t->link) {
2846         if (t == tso) {
2847           *last = tso->link;
2848           if (mvar->tail == tso) {
2849             mvar->tail = last_tso;
2850           }
2851           goto done;
2852         }
2853       }
2854       barf("unblockThread (MVAR): TSO not found");
2855     }
2856
2857   case BlockedOnBlackHole:
2858     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2859     {
2860       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2861
2862       last = &bq->blocking_queue;
2863       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2864            last = &t->link, t = t->link) {
2865         if (t == tso) {
2866           *last = tso->link;
2867           goto done;
2868         }
2869       }
2870       barf("unblockThread (BLACKHOLE): TSO not found");
2871     }
2872
2873   case BlockedOnException:
2874     {
2875       StgTSO *target  = tso->block_info.tso;
2876
2877       ASSERT(get_itbl(target)->type == TSO);
2878
2879       while (target->what_next == ThreadRelocated) {
2880           target = target->link;
2881           ASSERT(get_itbl(target)->type == TSO);
2882       }
2883       
2884       ASSERT(target->blocked_exceptions != NULL);
2885
2886       last = &target->blocked_exceptions;
2887       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2888            last = &t->link, t = t->link) {
2889         ASSERT(get_itbl(t)->type == TSO);
2890         if (t == tso) {
2891           *last = tso->link;
2892           goto done;
2893         }
2894       }
2895       barf("unblockThread (Exception): TSO not found");
2896     }
2897
2898   case BlockedOnRead:
2899   case BlockedOnWrite:
2900 #if defined(mingw32_TARGET_OS)
2901   case BlockedOnDoProc:
2902 #endif
2903     {
2904       StgTSO *prev = NULL;
2905       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2906            prev = t, t = t->link) {
2907         if (t == tso) {
2908           if (prev == NULL) {
2909             blocked_queue_hd = t->link;
2910             if (blocked_queue_tl == t) {
2911               blocked_queue_tl = END_TSO_QUEUE;
2912             }
2913           } else {
2914             prev->link = t->link;
2915             if (blocked_queue_tl == t) {
2916               blocked_queue_tl = prev;
2917             }
2918           }
2919           goto done;
2920         }
2921       }
2922       barf("unblockThread (I/O): TSO not found");
2923     }
2924
2925   case BlockedOnDelay:
2926     {
2927       StgTSO *prev = NULL;
2928       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2929            prev = t, t = t->link) {
2930         if (t == tso) {
2931           if (prev == NULL) {
2932             sleeping_queue = t->link;
2933           } else {
2934             prev->link = t->link;
2935           }
2936           goto done;
2937         }
2938       }
2939       barf("unblockThread (delay): TSO not found");
2940     }
2941
2942   default:
2943     barf("unblockThread");
2944   }
2945
2946  done:
2947   tso->link = END_TSO_QUEUE;
2948   tso->why_blocked = NotBlocked;
2949   tso->block_info.closure = NULL;
2950   APPEND_TO_RUN_QUEUE(tso);
2951 }
2952 #endif
2953
2954 /* -----------------------------------------------------------------------------
2955  * raiseAsync()
2956  *
2957  * The following function implements the magic for raising an
2958  * asynchronous exception in an existing thread.
2959  *
2960  * We first remove the thread from any queue on which it might be
2961  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2962  *
2963  * We strip the stack down to the innermost CATCH_FRAME, building
2964  * thunks in the heap for all the active computations, so they can 
2965  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2966  * an application of the handler to the exception, and push it on
2967  * the top of the stack.
2968  * 
2969  * How exactly do we save all the active computations?  We create an
2970  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2971  * AP_STACKs pushes everything from the corresponding update frame
2972  * upwards onto the stack.  (Actually, it pushes everything up to the
2973  * next update frame plus a pointer to the next AP_STACK object.
2974  * Entering the next AP_STACK object pushes more onto the stack until we
2975  * reach the last AP_STACK object - at which point the stack should look
2976  * exactly as it did when we killed the TSO and we can continue
2977  * execution by entering the closure on top of the stack.
2978  *
2979  * We can also kill a thread entirely - this happens if either (a) the 
2980  * exception passed to raiseAsync is NULL, or (b) there's no
2981  * CATCH_FRAME on the stack.  In either case, we strip the entire
2982  * stack and replace the thread with a zombie.
2983  *
2984  * Locks: sched_mutex held upon entry nor exit.
2985  *
2986  * -------------------------------------------------------------------------- */
2987  
2988 void 
2989 deleteThread(StgTSO *tso)
2990 {
2991   if (tso->why_blocked != BlockedOnCCall &&
2992       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2993       raiseAsync(tso,NULL);
2994   }
2995 }
2996
2997 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2998 static void 
2999 deleteThreadImmediately(StgTSO *tso)
3000 { // for forkProcess only:
3001   // delete thread without giving it a chance to catch the KillThread exception
3002
3003   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3004       return;
3005   }
3006
3007   if (tso->why_blocked != BlockedOnCCall &&
3008       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3009     unblockThread(tso);
3010   }
3011
3012   tso->what_next = ThreadKilled;
3013 }
3014 #endif
3015
3016 void
3017 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3018 {
3019   /* When raising async exs from contexts where sched_mutex isn't held;
3020      use raiseAsyncWithLock(). */
3021   ACQUIRE_LOCK(&sched_mutex);
3022   raiseAsync(tso,exception);
3023   RELEASE_LOCK(&sched_mutex);
3024 }
3025
3026 void
3027 raiseAsync(StgTSO *tso, StgClosure *exception)
3028 {
3029     raiseAsync_(tso, exception, rtsFalse);
3030 }
3031
3032 static void
3033 raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
3034 {
3035     StgRetInfoTable *info;
3036     StgPtr sp;
3037   
3038     // Thread already dead?
3039     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3040         return;
3041     }
3042
3043     IF_DEBUG(scheduler, 
3044              sched_belch("raising exception in thread %ld.", (long)tso->id));
3045     
3046     // Remove it from any blocking queues
3047     unblockThread(tso);
3048
3049     sp = tso->sp;
3050     
3051     // The stack freezing code assumes there's a closure pointer on
3052     // the top of the stack, so we have to arrange that this is the case...
3053     //
3054     if (sp[0] == (W_)&stg_enter_info) {
3055         sp++;
3056     } else {
3057         sp--;
3058         sp[0] = (W_)&stg_dummy_ret_closure;
3059     }
3060
3061     while (1) {
3062         nat i;
3063
3064         // 1. Let the top of the stack be the "current closure"
3065         //
3066         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3067         // CATCH_FRAME.
3068         //
3069         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3070         // current closure applied to the chunk of stack up to (but not
3071         // including) the update frame.  This closure becomes the "current
3072         // closure".  Go back to step 2.
3073         //
3074         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3075         // top of the stack applied to the exception.
3076         // 
3077         // 5. If it's a STOP_FRAME, then kill the thread.
3078         // 
3079         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3080         // transaction
3081        
3082         
3083         StgPtr frame;
3084         
3085         frame = sp + 1;
3086         info = get_ret_itbl((StgClosure *)frame);
3087         
3088         while (info->i.type != UPDATE_FRAME
3089                && (info->i.type != CATCH_FRAME || exception == NULL)
3090                && info->i.type != STOP_FRAME
3091                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3092         {
3093             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3094               // IF we find an ATOMICALLY_FRAME then we abort the
3095               // current transaction and propagate the exception.  In
3096               // this case (unlike ordinary exceptions) we do not care
3097               // whether the transaction is valid or not because its
3098               // possible validity cannot have caused the exception
3099               // and will not be visible after the abort.
3100               IF_DEBUG(stm,
3101                        debugBelch("Found atomically block delivering async exception\n"));
3102               stmAbortTransaction(tso -> trec);
3103               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3104             }
3105             frame += stack_frame_sizeW((StgClosure *)frame);
3106             info = get_ret_itbl((StgClosure *)frame);
3107         }
3108         
3109         switch (info->i.type) {
3110             
3111         case ATOMICALLY_FRAME:
3112             ASSERT(stop_at_atomically);
3113             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3114             stmCondemnTransaction(tso -> trec);
3115 #ifdef REG_R1
3116             tso->sp = frame;
3117 #else
3118             // R1 is not a register: the return convention for IO in
3119             // this case puts the return value on the stack, so we
3120             // need to set up the stack to return to the atomically
3121             // frame properly...
3122             tso->sp = frame - 2;
3123             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3124             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3125 #endif
3126             tso->what_next = ThreadRunGHC;
3127             return;
3128
3129         case CATCH_FRAME:
3130             // If we find a CATCH_FRAME, and we've got an exception to raise,
3131             // then build the THUNK raise(exception), and leave it on
3132             // top of the CATCH_FRAME ready to enter.
3133             //
3134         {
3135 #ifdef PROFILING
3136             StgCatchFrame *cf = (StgCatchFrame *)frame;
3137 #endif
3138             StgClosure *raise;
3139             
3140             // we've got an exception to raise, so let's pass it to the
3141             // handler in this frame.
3142             //
3143             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3144             TICK_ALLOC_SE_THK(1,0);
3145             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3146             raise->payload[0] = exception;
3147             
3148             // throw away the stack from Sp up to the CATCH_FRAME.
3149             //
3150             sp = frame - 1;
3151             
3152             /* Ensure that async excpetions are blocked now, so we don't get
3153              * a surprise exception before we get around to executing the
3154              * handler.
3155              */
3156             if (tso->blocked_exceptions == NULL) {
3157                 tso->blocked_exceptions = END_TSO_QUEUE;
3158             }
3159             
3160             /* Put the newly-built THUNK on top of the stack, ready to execute
3161              * when the thread restarts.
3162              */
3163             sp[0] = (W_)raise;
3164             sp[-1] = (W_)&stg_enter_info;
3165             tso->sp = sp-1;
3166             tso->what_next = ThreadRunGHC;
3167             IF_DEBUG(sanity, checkTSO(tso));
3168             return;
3169         }
3170         
3171         case UPDATE_FRAME:
3172         {
3173             StgAP_STACK * ap;
3174             nat words;
3175             
3176             // First build an AP_STACK consisting of the stack chunk above the
3177             // current update frame, with the top word on the stack as the
3178             // fun field.
3179             //
3180             words = frame - sp - 1;
3181             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3182             
3183             ap->size = words;
3184             ap->fun  = (StgClosure *)sp[0];
3185             sp++;
3186             for(i=0; i < (nat)words; ++i) {
3187                 ap->payload[i] = (StgClosure *)*sp++;
3188             }
3189             
3190             SET_HDR(ap,&stg_AP_STACK_info,
3191                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3192             TICK_ALLOC_UP_THK(words+1,0);
3193             
3194             IF_DEBUG(scheduler,
3195                      debugBelch("sched: Updating ");
3196                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3197                      debugBelch(" with ");
3198                      printObj((StgClosure *)ap);
3199                 );
3200
3201             // Replace the updatee with an indirection - happily
3202             // this will also wake up any threads currently
3203             // waiting on the result.
3204             //
3205             // Warning: if we're in a loop, more than one update frame on
3206             // the stack may point to the same object.  Be careful not to
3207             // overwrite an IND_OLDGEN in this case, because we'll screw
3208             // up the mutable lists.  To be on the safe side, don't
3209             // overwrite any kind of indirection at all.  See also
3210             // threadSqueezeStack in GC.c, where we have to make a similar
3211             // check.
3212             //
3213             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3214                 // revert the black hole
3215                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3216                                (StgClosure *)ap);
3217             }
3218             sp += sizeofW(StgUpdateFrame) - 1;
3219             sp[0] = (W_)ap; // push onto stack
3220             break;
3221         }
3222         
3223         case STOP_FRAME:
3224             // We've stripped the entire stack, the thread is now dead.
3225             sp += sizeofW(StgStopFrame);
3226             tso->what_next = ThreadKilled;
3227             tso->sp = sp;
3228             return;
3229             
3230         default:
3231             barf("raiseAsync");
3232         }
3233     }
3234     barf("raiseAsync");
3235 }
3236
3237 /* -----------------------------------------------------------------------------
3238    raiseExceptionHelper
3239    
3240    This function is called by the raise# primitve, just so that we can
3241    move some of the tricky bits of raising an exception from C-- into
3242    C.  Who knows, it might be a useful re-useable thing here too.
3243    -------------------------------------------------------------------------- */
3244
3245 StgWord
3246 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3247 {
3248     StgClosure *raise_closure = NULL;
3249     StgPtr p, next;
3250     StgRetInfoTable *info;
3251     //
3252     // This closure represents the expression 'raise# E' where E
3253     // is the exception raise.  It is used to overwrite all the
3254     // thunks which are currently under evaluataion.
3255     //
3256
3257     //    
3258     // LDV profiling: stg_raise_info has THUNK as its closure
3259     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3260     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3261     // 1 does not cause any problem unless profiling is performed.
3262     // However, when LDV profiling goes on, we need to linearly scan
3263     // small object pool, where raise_closure is stored, so we should
3264     // use MIN_UPD_SIZE.
3265     //
3266     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3267     //                                 sizeofW(StgClosure)+1);
3268     //
3269
3270     //
3271     // Walk up the stack, looking for the catch frame.  On the way,
3272     // we update any closures pointed to from update frames with the
3273     // raise closure that we just built.
3274     //
3275     p = tso->sp;
3276     while(1) {
3277         info = get_ret_itbl((StgClosure *)p);
3278         next = p + stack_frame_sizeW((StgClosure *)p);
3279         switch (info->i.type) {
3280             
3281         case UPDATE_FRAME:
3282             // Only create raise_closure if we need to.
3283             if (raise_closure == NULL) {
3284                 raise_closure = 
3285                     (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3286                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3287                 raise_closure->payload[0] = exception;
3288             }
3289             UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3290             p = next;
3291             continue;
3292
3293         case ATOMICALLY_FRAME:
3294             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3295             tso->sp = p;
3296             return ATOMICALLY_FRAME;
3297             
3298         case CATCH_FRAME:
3299             tso->sp = p;
3300             return CATCH_FRAME;
3301
3302         case CATCH_STM_FRAME:
3303             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3304             tso->sp = p;
3305             return CATCH_STM_FRAME;
3306             
3307         case STOP_FRAME:
3308             tso->sp = p;
3309             return STOP_FRAME;
3310
3311         case CATCH_RETRY_FRAME:
3312         default:
3313             p = next; 
3314             continue;
3315         }
3316     }
3317 }
3318
3319
3320 /* -----------------------------------------------------------------------------
3321    findRetryFrameHelper
3322
3323    This function is called by the retry# primitive.  It traverses the stack
3324    leaving tso->sp referring to the frame which should handle the retry.  
3325
3326    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3327    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3328
3329    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3330    despite the similar implementation.
3331
3332    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3333    not be created within memory transactions.
3334    -------------------------------------------------------------------------- */
3335
3336 StgWord
3337 findRetryFrameHelper (StgTSO *tso)
3338 {
3339   StgPtr           p, next;
3340   StgRetInfoTable *info;
3341
3342   p = tso -> sp;
3343   while (1) {
3344     info = get_ret_itbl((StgClosure *)p);
3345     next = p + stack_frame_sizeW((StgClosure *)p);
3346     switch (info->i.type) {
3347       
3348     case ATOMICALLY_FRAME:
3349       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3350       tso->sp = p;
3351       return ATOMICALLY_FRAME;
3352       
3353     case CATCH_RETRY_FRAME:
3354       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3355       tso->sp = p;
3356       return CATCH_RETRY_FRAME;
3357       
3358     case CATCH_STM_FRAME:
3359     default:
3360       ASSERT(info->i.type != CATCH_FRAME);
3361       ASSERT(info->i.type != STOP_FRAME);
3362       p = next; 
3363       continue;
3364     }
3365   }
3366 }
3367
3368 /* -----------------------------------------------------------------------------
3369    resurrectThreads is called after garbage collection on the list of
3370    threads found to be garbage.  Each of these threads will be woken
3371    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3372    on an MVar, or NonTermination if the thread was blocked on a Black
3373    Hole.
3374
3375    Locks: sched_mutex isn't held upon entry nor exit.
3376    -------------------------------------------------------------------------- */
3377
3378 void
3379 resurrectThreads( StgTSO *threads )
3380 {
3381   StgTSO *tso, *next;
3382
3383   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3384     next = tso->global_link;
3385     tso->global_link = all_threads;
3386     all_threads = tso;
3387     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3388
3389     switch (tso->why_blocked) {
3390     case BlockedOnMVar:
3391     case BlockedOnException:
3392       /* Called by GC - sched_mutex lock is currently held. */
3393       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3394       break;
3395     case BlockedOnBlackHole:
3396       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3397       break;
3398     case BlockedOnSTM:
3399       raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
3400       break;
3401     case NotBlocked:
3402       /* This might happen if the thread was blocked on a black hole
3403        * belonging to a thread that we've just woken up (raiseAsync
3404        * can wake up threads, remember...).
3405        */
3406       continue;
3407     default:
3408       barf("resurrectThreads: thread blocked in a strange way");
3409     }
3410   }
3411 }
3412
3413 /* ----------------------------------------------------------------------------
3414  * Debugging: why is a thread blocked
3415  * [Also provides useful information when debugging threaded programs
3416  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3417    ------------------------------------------------------------------------- */
3418
3419 static
3420 void
3421 printThreadBlockage(StgTSO *tso)
3422 {
3423   switch (tso->why_blocked) {
3424   case BlockedOnRead:
3425     debugBelch("is blocked on read from fd %d", tso->block_info.fd);
3426     break;
3427   case BlockedOnWrite:
3428     debugBelch("is blocked on write to fd %d", tso->block_info.fd);
3429     break;
3430 #if defined(mingw32_TARGET_OS)
3431     case BlockedOnDoProc:
3432     debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3433     break;
3434 #endif
3435   case BlockedOnDelay:
3436     debugBelch("is blocked until %d", tso->block_info.target);
3437     break;
3438   case BlockedOnMVar:
3439     debugBelch("is blocked on an MVar");
3440     break;
3441   case BlockedOnException:
3442     debugBelch("is blocked on delivering an exception to thread %d",
3443             tso->block_info.tso->id);
3444     break;
3445   case BlockedOnBlackHole:
3446     debugBelch("is blocked on a black hole");
3447     break;
3448   case NotBlocked:
3449     debugBelch("is not blocked");
3450     break;
3451 #if defined(PAR)
3452   case BlockedOnGA:
3453     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
3454             tso->block_info.closure, info_type(tso->block_info.closure));
3455     break;
3456   case BlockedOnGA_NoSend:
3457     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
3458             tso->block_info.closure, info_type(tso->block_info.closure));
3459     break;
3460 #endif
3461   case BlockedOnCCall:
3462     debugBelch("is blocked on an external call");
3463     break;
3464   case BlockedOnCCall_NoUnblockExc:
3465     debugBelch("is blocked on an external call (exceptions were already blocked)");
3466     break;
3467   case BlockedOnSTM:
3468     debugBelch("is blocked on an STM operation");
3469     break;
3470   default:
3471     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3472          tso->why_blocked, tso->id, tso);
3473   }
3474 }
3475
3476 static
3477 void
3478 printThreadStatus(StgTSO *tso)
3479 {
3480   switch (tso->what_next) {
3481   case ThreadKilled:
3482     debugBelch("has been killed");
3483     break;
3484   case ThreadComplete:
3485     debugBelch("has completed");
3486     break;
3487   default:
3488     printThreadBlockage(tso);
3489   }
3490 }
3491
3492 void
3493 printAllThreads(void)
3494 {
3495   StgTSO *t;
3496
3497 # if defined(GRAN)
3498   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3499   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3500                        time_string, rtsFalse/*no commas!*/);
3501
3502   debugBelch("all threads at [%s]:\n", time_string);
3503 # elif defined(PAR)
3504   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3505   ullong_format_string(CURRENT_TIME,
3506                        time_string, rtsFalse/*no commas!*/);
3507
3508   debugBelch("all threads at [%s]:\n", time_string);
3509 # else
3510   debugBelch("all threads:\n");
3511 # endif
3512
3513   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3514     debugBelch("\tthread %d @ %p ", t->id, (void *)t);
3515 #if defined(DEBUG)
3516     {
3517       void *label = lookupThreadLabel(t->id);
3518       if (label) debugBelch("[\"%s\"] ",(char *)label);
3519     }
3520 #endif
3521     printThreadStatus(t);
3522     debugBelch("\n");
3523   }
3524 }
3525     
3526 #ifdef DEBUG
3527
3528 /* 
3529    Print a whole blocking queue attached to node (debugging only).
3530 */
3531 # if defined(PAR)
3532 void 
3533 print_bq (StgClosure *node)
3534 {
3535   StgBlockingQueueElement *bqe;
3536   StgTSO *tso;
3537   rtsBool end;
3538
3539   debugBelch("## BQ of closure %p (%s): ",
3540           node, info_type(node));
3541
3542   /* should cover all closures that may have a blocking queue */
3543   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3544          get_itbl(node)->type == FETCH_ME_BQ ||
3545          get_itbl(node)->type == RBH ||
3546          get_itbl(node)->type == MVAR);
3547     
3548   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3549
3550   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3551 }
3552
3553 /* 
3554    Print a whole blocking queue starting with the element bqe.
3555 */
3556 void 
3557 print_bqe (StgBlockingQueueElement *bqe)
3558 {
3559   rtsBool end;
3560
3561   /* 
3562      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3563   */
3564   for (end = (bqe==END_BQ_QUEUE);
3565        !end; // iterate until bqe points to a CONSTR
3566        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3567        bqe = end ? END_BQ_QUEUE : bqe->link) {
3568     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3569     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3570     /* types of closures that may appear in a blocking queue */
3571     ASSERT(get_itbl(bqe)->type == TSO ||           
3572            get_itbl(bqe)->type == BLOCKED_FETCH || 
3573            get_itbl(bqe)->type == CONSTR); 
3574     /* only BQs of an RBH end with an RBH_Save closure */
3575     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3576
3577     switch (get_itbl(bqe)->type) {
3578     case TSO:
3579       debugBelch(" TSO %u (%x),",
3580               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3581       break;
3582     case BLOCKED_FETCH:
3583       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
3584               ((StgBlockedFetch *)bqe)->node, 
3585               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3586               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3587               ((StgBlockedFetch *)bqe)->ga.weight);
3588       break;
3589     case CONSTR:
3590       debugBelch(" %s (IP %p),",
3591               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3592                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3593                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3594                "RBH_Save_?"), get_itbl(bqe));
3595       break;
3596     default:
3597       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3598            info_type((StgClosure *)bqe)); // , node, info_type(node));
3599       break;
3600     }
3601   } /* for */
3602   debugBelch("\n");
3603 }
3604 # elif defined(GRAN)
3605 void 
3606 print_bq (StgClosure *node)
3607 {
3608   StgBlockingQueueElement *bqe;
3609   PEs node_loc, tso_loc;
3610   rtsBool end;
3611
3612   /* should cover all closures that may have a blocking queue */
3613   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3614          get_itbl(node)->type == FETCH_ME_BQ ||
3615          get_itbl(node)->type == RBH);
3616     
3617   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3618   node_loc = where_is(node);
3619
3620   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
3621           node, info_type(node), node_loc);
3622
3623   /* 
3624      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3625   */
3626   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3627        !end; // iterate until bqe points to a CONSTR
3628        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3629     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3630     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3631     /* types of closures that may appear in a blocking queue */
3632     ASSERT(get_itbl(bqe)->type == TSO ||           
3633            get_itbl(bqe)->type == CONSTR); 
3634     /* only BQs of an RBH end with an RBH_Save closure */
3635     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3636
3637     tso_loc = where_is((StgClosure *)bqe);
3638     switch (get_itbl(bqe)->type) {
3639     case TSO:
3640       debugBelch(" TSO %d (%p) on [PE %d],",
3641               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3642       break;
3643     case CONSTR:
3644       debugBelch(" %s (IP %p),",
3645               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3646                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3647                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3648                "RBH_Save_?"), get_itbl(bqe));
3649       break;
3650     default:
3651       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3652            info_type((StgClosure *)bqe), node, info_type(node));
3653       break;
3654     }
3655   } /* for */
3656   debugBelch("\n");
3657 }
3658 #else
3659 /* 
3660    Nice and easy: only TSOs on the blocking queue
3661 */
3662 void 
3663 print_bq (StgClosure *node)
3664 {
3665   StgTSO *tso;
3666
3667   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3668   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3669        tso != END_TSO_QUEUE; 
3670        tso=tso->link) {
3671     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3672     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3673     debugBelch(" TSO %d (%p),", tso->id, tso);
3674   }
3675   debugBelch("\n");
3676 }
3677 # endif
3678
3679 #if defined(PAR)
3680 static nat
3681 run_queue_len(void)
3682 {
3683   nat i;
3684   StgTSO *tso;
3685
3686   for (i=0, tso=run_queue_hd; 
3687        tso != END_TSO_QUEUE;
3688        i++, tso=tso->link)
3689     /* nothing */
3690
3691   return i;
3692 }
3693 #endif
3694
3695 void
3696 sched_belch(char *s, ...)
3697 {
3698   va_list ap;
3699   va_start(ap,s);
3700 #ifdef RTS_SUPPORTS_THREADS
3701   debugBelch("sched (task %p): ", osThreadId());
3702 #elif defined(PAR)
3703   debugBelch("== ");
3704 #else
3705   debugBelch("sched: ");
3706 #endif
3707   vdebugBelch(s, ap);
3708   debugBelch("\n");
3709   va_end(ap);
3710 }
3711
3712 #endif /* DEBUG */