[project @ 2005-04-05 09:28:32 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PARALLEL_HASKELL          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "OSThreads.h"
46 #include "Storage.h"
47 #include "StgRun.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "STM.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #include "LdvProfile.h"
63 #include "Updates.h"
64 #ifdef PROFILING
65 #include "Proftimer.h"
66 #include "ProfHeap.h"
67 #endif
68 #if defined(GRAN) || defined(PARALLEL_HASKELL)
69 # include "GranSimRts.h"
70 # include "GranSim.h"
71 # include "ParallelRts.h"
72 # include "Parallel.h"
73 # include "ParallelDebug.h"
74 # include "FetchMe.h"
75 # include "HLC.h"
76 #endif
77 #include "Sparks.h"
78 #include "Capability.h"
79 #include  "Task.h"
80
81 #ifdef HAVE_SYS_TYPES_H
82 #include <sys/types.h>
83 #endif
84 #ifdef HAVE_UNISTD_H
85 #include <unistd.h>
86 #endif
87
88 #include <string.h>
89 #include <stdlib.h>
90 #include <stdarg.h>
91
92 #ifdef HAVE_ERRNO_H
93 #include <errno.h>
94 #endif
95
96 // Turn off inlining when debugging - it obfuscates things
97 #ifdef DEBUG
98 # undef  STATIC_INLINE
99 # define STATIC_INLINE static
100 #endif
101
102 #ifdef THREADED_RTS
103 #define USED_IN_THREADED_RTS
104 #else
105 #define USED_IN_THREADED_RTS STG_UNUSED
106 #endif
107
108 #ifdef RTS_SUPPORTS_THREADS
109 #define USED_WHEN_RTS_SUPPORTS_THREADS
110 #else
111 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
112 #endif
113
114 /* Main thread queue.
115  * Locks required: sched_mutex.
116  */
117 StgMainThread *main_threads = NULL;
118
119 #if defined(GRAN)
120
121 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
122 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
123
124 /* 
125    In GranSim we have a runnable and a blocked queue for each processor.
126    In order to minimise code changes new arrays run_queue_hds/tls
127    are created. run_queue_hd is then a short cut (macro) for
128    run_queue_hds[CurrentProc] (see GranSim.h).
129    -- HWL
130 */
131 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
132 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
133 StgTSO *ccalling_threadss[MAX_PROC];
134 /* We use the same global list of threads (all_threads) in GranSim as in
135    the std RTS (i.e. we are cheating). However, we don't use this list in
136    the GranSim specific code at the moment (so we are only potentially
137    cheating).  */
138
139 #else /* !GRAN */
140
141 /* Thread queues.
142  * Locks required: sched_mutex.
143  */
144 StgTSO *run_queue_hd = NULL;
145 StgTSO *run_queue_tl = NULL;
146 StgTSO *blocked_queue_hd = NULL;
147 StgTSO *blocked_queue_tl = NULL;
148 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
149
150 #endif
151
152 /* Linked list of all threads.
153  * Used for detecting garbage collected threads.
154  */
155 StgTSO *all_threads = NULL;
156
157 /* When a thread performs a safe C call (_ccall_GC, using old
158  * terminology), it gets put on the suspended_ccalling_threads
159  * list. Used by the garbage collector.
160  */
161 static StgTSO *suspended_ccalling_threads;
162
163 /* KH: The following two flags are shared memory locations.  There is no need
164        to lock them, since they are only unset at the end of a scheduler
165        operation.
166 */
167
168 /* flag set by signal handler to precipitate a context switch */
169 int context_switch = 0;
170
171 /* if this flag is set as well, give up execution */
172 rtsBool interrupted = rtsFalse;
173
174 /* If this flag is set, we are running Haskell code.  Used to detect
175  * uses of 'foreign import unsafe' that should be 'safe'.
176  */
177 static rtsBool in_haskell = rtsFalse;
178
179 /* Next thread ID to allocate.
180  * Locks required: thread_id_mutex
181  */
182 static StgThreadID next_thread_id = 1;
183
184 /*
185  * Pointers to the state of the current thread.
186  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
187  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
188  */
189  
190 /* The smallest stack size that makes any sense is:
191  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
192  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
193  *  + 1                       (the closure to enter)
194  *  + 1                       (stg_ap_v_ret)
195  *  + 1                       (spare slot req'd by stg_ap_v_ret)
196  *
197  * A thread with this stack will bomb immediately with a stack
198  * overflow, which will increase its stack size.  
199  */
200
201 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
202
203
204 #if defined(GRAN)
205 StgTSO *CurrentTSO;
206 #endif
207
208 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
209  *  exists - earlier gccs apparently didn't.
210  *  -= chak
211  */
212 StgTSO dummy_tso;
213
214 # if defined(SMP)
215 static Condition gc_pending_cond = INIT_COND_VAR;
216 # endif
217
218 static rtsBool ready_to_gc;
219
220 /*
221  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
222  * in an MT setting, needed to signal that a worker thread shouldn't hang around
223  * in the scheduler when it is out of work.
224  */
225 static rtsBool shutting_down_scheduler = rtsFalse;
226
227 #if defined(RTS_SUPPORTS_THREADS)
228 /* ToDo: carefully document the invariants that go together
229  *       with these synchronisation objects.
230  */
231 Mutex     sched_mutex       = INIT_MUTEX_VAR;
232 Mutex     term_mutex        = INIT_MUTEX_VAR;
233
234 #endif /* RTS_SUPPORTS_THREADS */
235
236 #if defined(PARALLEL_HASKELL)
237 StgTSO *LastTSO;
238 rtsTime TimeOfLastYield;
239 rtsBool emitSchedule = rtsTrue;
240 #endif
241
242 #if DEBUG
243 static char *whatNext_strs[] = {
244   "(unknown)",
245   "ThreadRunGHC",
246   "ThreadInterpret",
247   "ThreadKilled",
248   "ThreadRelocated",
249   "ThreadComplete"
250 };
251 #endif
252
253 /* -----------------------------------------------------------------------------
254  * static function prototypes
255  * -------------------------------------------------------------------------- */
256
257 #if defined(RTS_SUPPORTS_THREADS)
258 static void taskStart(void);
259 #endif
260
261 static void schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
262                       Capability *initialCapability );
263
264 //
265 // These function all encapsulate parts of the scheduler loop, and are
266 // abstracted only to make the structure and control flow of the
267 // scheduler clearer.
268 //
269 static void schedulePreLoop(void);
270 static void scheduleHandleInterrupt(void);
271 static void scheduleStartSignalHandlers(void);
272 static void scheduleCheckBlockedThreads(void);
273 static void scheduleDetectDeadlock(void);
274 #if defined(GRAN)
275 static StgTSO *scheduleProcessEvent(rtsEvent *event);
276 #endif
277 #if defined(PARALLEL_HASKELL)
278 static StgTSO *scheduleSendPendingMessages(void);
279 static void scheduleActivateSpark(void);
280 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
281 #endif
282 #if defined(PAR) || defined(GRAN)
283 static void scheduleGranParReport(void);
284 #endif
285 static void schedulePostRunThread(void);
286 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
287 static void scheduleHandleStackOverflow( StgTSO *t);
288 static rtsBool scheduleHandleYield( StgTSO *t, nat prev_what_next );
289 static void scheduleHandleThreadBlocked( StgTSO *t );
290 static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread, 
291                                              Capability *cap, StgTSO *t );
292 static void scheduleDoHeapProfile(void);
293 static void scheduleDoGC(void);
294
295 static void unblockThread(StgTSO *tso);
296 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
297                                    Capability *initialCapability
298                                    );
299 static void scheduleThread_ (StgTSO* tso);
300 static void AllRoots(evac_fn evac);
301
302 static StgTSO *threadStackOverflow(StgTSO *tso);
303
304 static void raiseAsync_(StgTSO *tso, StgClosure *exception, 
305                         rtsBool stop_at_atomically);
306
307 static void printThreadBlockage(StgTSO *tso);
308 static void printThreadStatus(StgTSO *tso);
309
310 #if defined(PARALLEL_HASKELL)
311 StgTSO * createSparkThread(rtsSpark spark);
312 StgTSO * activateSpark (rtsSpark spark);  
313 #endif
314
315 /* ----------------------------------------------------------------------------
316  * Starting Tasks
317  * ------------------------------------------------------------------------- */
318
319 #if defined(RTS_SUPPORTS_THREADS)
320 static rtsBool startingWorkerThread = rtsFalse;
321
322 static void
323 taskStart(void)
324 {
325   ACQUIRE_LOCK(&sched_mutex);
326   startingWorkerThread = rtsFalse;
327   schedule(NULL,NULL);
328   RELEASE_LOCK(&sched_mutex);
329 }
330
331 void
332 startSchedulerTaskIfNecessary(void)
333 {
334   if(run_queue_hd != END_TSO_QUEUE
335     || blocked_queue_hd != END_TSO_QUEUE
336     || sleeping_queue != END_TSO_QUEUE)
337   {
338     if(!startingWorkerThread)
339     { // we don't want to start another worker thread
340       // just because the last one hasn't yet reached the
341       // "waiting for capability" state
342       startingWorkerThread = rtsTrue;
343       if (!startTask(taskStart)) {
344           startingWorkerThread = rtsFalse;
345       }
346     }
347   }
348 }
349 #endif
350
351 /* -----------------------------------------------------------------------------
352  * Putting a thread on the run queue: different scheduling policies
353  * -------------------------------------------------------------------------- */
354
355 STATIC_INLINE void
356 addToRunQueue( StgTSO *t )
357 {
358 #if defined(PARALLEL_HASKELL)
359     if (RtsFlags.ParFlags.doFairScheduling) { 
360         // this does round-robin scheduling; good for concurrency
361         APPEND_TO_RUN_QUEUE(t);
362     } else {
363         // this does unfair scheduling; good for parallelism
364         PUSH_ON_RUN_QUEUE(t);
365     }
366 #else
367     // this does round-robin scheduling; good for concurrency
368     APPEND_TO_RUN_QUEUE(t);
369 #endif
370 }
371     
372 /* ---------------------------------------------------------------------------
373    Main scheduling loop.
374
375    We use round-robin scheduling, each thread returning to the
376    scheduler loop when one of these conditions is detected:
377
378       * out of heap space
379       * timer expires (thread yields)
380       * thread blocks
381       * thread ends
382       * stack overflow
383
384    Locking notes:  we acquire the scheduler lock once at the beginning
385    of the scheduler loop, and release it when
386     
387       * running a thread, or
388       * waiting for work, or
389       * waiting for a GC to complete.
390
391    GRAN version:
392      In a GranSim setup this loop iterates over the global event queue.
393      This revolves around the global event queue, which determines what 
394      to do next. Therefore, it's more complicated than either the 
395      concurrent or the parallel (GUM) setup.
396
397    GUM version:
398      GUM iterates over incoming messages.
399      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
400      and sends out a fish whenever it has nothing to do; in-between
401      doing the actual reductions (shared code below) it processes the
402      incoming messages and deals with delayed operations 
403      (see PendingFetches).
404      This is not the ugliest code you could imagine, but it's bloody close.
405
406    ------------------------------------------------------------------------ */
407
408 static void
409 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
410           Capability *initialCapability )
411 {
412   StgTSO *t;
413   Capability *cap;
414   StgThreadReturnCode ret;
415 #if defined(GRAN)
416   rtsEvent *event;
417 #elif defined(PARALLEL_HASKELL)
418   StgTSO *tso;
419   GlobalTaskId pe;
420   rtsBool receivedFinish = rtsFalse;
421 # if defined(DEBUG)
422   nat tp_size, sp_size; // stats only
423 # endif
424 #endif
425   nat prev_what_next;
426   
427   // Pre-condition: sched_mutex is held.
428   // We might have a capability, passed in as initialCapability.
429   cap = initialCapability;
430
431 #if !defined(RTS_SUPPORTS_THREADS)
432   // simply initialise it in the non-threaded case
433   grabCapability(&cap);
434 #endif
435
436   IF_DEBUG(scheduler,
437            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
438                        mainThread, initialCapability);
439       );
440
441   schedulePreLoop();
442
443   // -----------------------------------------------------------
444   // Scheduler loop starts here:
445
446 #if defined(PARALLEL_HASKELL)
447 #define TERMINATION_CONDITION        (!receivedFinish)
448 #elif defined(GRAN)
449 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
450 #else
451 #define TERMINATION_CONDITION        rtsTrue
452 #endif
453
454   while (TERMINATION_CONDITION) {
455
456 #if defined(GRAN)
457       /* Choose the processor with the next event */
458       CurrentProc = event->proc;
459       CurrentTSO = event->tso;
460 #endif
461
462       IF_DEBUG(scheduler, printAllThreads());
463
464 #if defined(SMP)
465       // 
466       // Wait until GC has completed, if necessary.
467       //
468       if (ready_to_gc) {
469           if (cap != NULL) {
470               releaseCapability(cap);
471               IF_DEBUG(scheduler,sched_belch("waiting for GC"));
472               waitCondition( &gc_pending_cond, &sched_mutex );
473           }
474       }
475 #endif
476
477 #if defined(RTS_SUPPORTS_THREADS)
478       // Yield the capability to higher-priority tasks if necessary.
479       //
480       if (cap != NULL) {
481           yieldCapability(&cap);
482       }
483
484       // If we do not currently hold a capability, we wait for one
485       //
486       if (cap == NULL) {
487           waitForCapability(&sched_mutex, &cap,
488                             mainThread ? &mainThread->bound_thread_cond : NULL);
489       }
490
491       // We now have a capability...
492 #endif
493
494     // Check whether we have re-entered the RTS from Haskell without
495     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
496     // call).
497     if (in_haskell) {
498           errorBelch("schedule: re-entered unsafely.\n"
499                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
500           stg_exit(1);
501     }
502
503     scheduleHandleInterrupt();
504
505 #if defined(not_yet) && defined(SMP)
506     //
507     // Top up the run queue from our spark pool.  We try to make the
508     // number of threads in the run queue equal to the number of
509     // free capabilities.
510     //
511     {
512         StgClosure *spark;
513         if (EMPTY_RUN_QUEUE()) {
514             spark = findSpark(rtsFalse);
515             if (spark == NULL) {
516                 break; /* no more sparks in the pool */
517             } else {
518                 createSparkThread(spark);         
519                 IF_DEBUG(scheduler,
520                          sched_belch("==^^ turning spark of closure %p into a thread",
521                                      (StgClosure *)spark));
522             }
523         }
524     }
525 #endif // SMP
526
527     scheduleStartSignalHandlers();
528
529     scheduleCheckBlockedThreads();
530
531     scheduleDetectDeadlock();
532
533     // Normally, the only way we can get here with no threads to
534     // run is if a keyboard interrupt received during 
535     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
536     // Additionally, it is not fatal for the
537     // threaded RTS to reach here with no threads to run.
538     //
539     // win32: might be here due to awaitEvent() being abandoned
540     // as a result of a console event having been delivered.
541     if ( EMPTY_RUN_QUEUE() ) {
542 #if !defined(RTS_SUPPORTS_THREADS) && !defined(mingw32_HOST_OS)
543         ASSERT(interrupted);
544 #endif
545         continue; // nothing to do
546     }
547
548 #if defined(PARALLEL_HASKELL)
549     scheduleSendPendingMessages();
550     if (EMPTY_RUN_QUEUE() && scheduleActivateSpark()) 
551         continue;
552
553 #if defined(SPARKS)
554     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
555 #endif
556
557     /* If we still have no work we need to send a FISH to get a spark
558        from another PE */
559     if (EMPTY_RUN_QUEUE()) {
560         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
561         ASSERT(rtsFalse); // should not happen at the moment
562     }
563     // from here: non-empty run queue.
564     //  TODO: merge above case with this, only one call processMessages() !
565     if (PacketsWaiting()) {  /* process incoming messages, if
566                                 any pending...  only in else
567                                 because getRemoteWork waits for
568                                 messages as well */
569         receivedFinish = processMessages();
570     }
571 #endif
572
573 #if defined(GRAN)
574     scheduleProcessEvent(event);
575 #endif
576
577     // 
578     // Get a thread to run
579     //
580     ASSERT(run_queue_hd != END_TSO_QUEUE);
581     POP_RUN_QUEUE(t);
582
583 #if defined(GRAN) || defined(PAR)
584     scheduleGranParReport(); // some kind of debuging output
585 #else
586     // Sanity check the thread we're about to run.  This can be
587     // expensive if there is lots of thread switching going on...
588     IF_DEBUG(sanity,checkTSO(t));
589 #endif
590
591 #if defined(RTS_SUPPORTS_THREADS)
592     // Check whether we can run this thread in the current task.
593     // If not, we have to pass our capability to the right task.
594     {
595       StgMainThread *m = t->main;
596       
597       if(m)
598       {
599         if(m == mainThread)
600         {
601           IF_DEBUG(scheduler,
602             sched_belch("### Running thread %d in bound thread", t->id));
603           // yes, the Haskell thread is bound to the current native thread
604         }
605         else
606         {
607           IF_DEBUG(scheduler,
608             sched_belch("### thread %d bound to another OS thread", t->id));
609           // no, bound to a different Haskell thread: pass to that thread
610           PUSH_ON_RUN_QUEUE(t);
611           passCapability(&m->bound_thread_cond);
612           continue;
613         }
614       }
615       else
616       {
617         if(mainThread != NULL)
618         // The thread we want to run is bound.
619         {
620           IF_DEBUG(scheduler,
621             sched_belch("### this OS thread cannot run thread %d", t->id));
622           // no, the current native thread is bound to a different
623           // Haskell thread, so pass it to any worker thread
624           PUSH_ON_RUN_QUEUE(t);
625           passCapabilityToWorker();
626           continue; 
627         }
628       }
629     }
630 #endif
631
632     cap->r.rCurrentTSO = t;
633     
634     /* context switches are now initiated by the timer signal, unless
635      * the user specified "context switch as often as possible", with
636      * +RTS -C0
637      */
638     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
639          && (run_queue_hd != END_TSO_QUEUE
640              || blocked_queue_hd != END_TSO_QUEUE
641              || sleeping_queue != END_TSO_QUEUE)))
642         context_switch = 1;
643
644 run_thread:
645
646     RELEASE_LOCK(&sched_mutex);
647
648     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
649                               (long)t->id, whatNext_strs[t->what_next]));
650
651 #if defined(PROFILING)
652     startHeapProfTimer();
653 #endif
654
655     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
656     /* Run the current thread 
657      */
658     prev_what_next = t->what_next;
659
660     errno = t->saved_errno;
661     in_haskell = rtsTrue;
662
663     switch (prev_what_next) {
664
665     case ThreadKilled:
666     case ThreadComplete:
667         /* Thread already finished, return to scheduler. */
668         ret = ThreadFinished;
669         break;
670
671     case ThreadRunGHC:
672         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
673         break;
674
675     case ThreadInterpret:
676         ret = interpretBCO(cap);
677         break;
678
679     default:
680       barf("schedule: invalid what_next field");
681     }
682
683     in_haskell = rtsFalse;
684
685     // The TSO might have moved, eg. if it re-entered the RTS and a GC
686     // happened.  So find the new location:
687     t = cap->r.rCurrentTSO;
688
689     // And save the current errno in this thread.
690     t->saved_errno = errno;
691
692     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
693     
694     /* Costs for the scheduler are assigned to CCS_SYSTEM */
695 #if defined(PROFILING)
696     stopHeapProfTimer();
697     CCCS = CCS_SYSTEM;
698 #endif
699     
700     ACQUIRE_LOCK(&sched_mutex);
701     
702 #if defined(RTS_SUPPORTS_THREADS)
703     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
704 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
705     IF_DEBUG(scheduler,debugBelch("sched: "););
706 #endif
707     
708     schedulePostRunThread();
709
710     switch (ret) {
711     case HeapOverflow:
712         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
713         break;
714
715     case StackOverflow:
716         scheduleHandleStackOverflow(t);
717         break;
718
719     case ThreadYielding:
720         if (scheduleHandleYield(t, prev_what_next)) {
721             // shortcut for switching between compiler/interpreter:
722             goto run_thread; 
723         }
724         break;
725
726     case ThreadBlocked:
727         scheduleHandleThreadBlocked(t);
728         threadPaused(t);
729         break;
730
731     case ThreadFinished:
732         if (scheduleHandleThreadFinished(mainThread, cap, t)) return;;
733         break;
734
735     default:
736       barf("schedule: invalid thread return code %d", (int)ret);
737     }
738
739     scheduleDoHeapProfile();
740     scheduleDoGC();
741   } /* end of while() */
742
743   IF_PAR_DEBUG(verbose,
744                debugBelch("== Leaving schedule() after having received Finish\n"));
745 }
746
747 /* ----------------------------------------------------------------------------
748  * Setting up the scheduler loop
749  * ASSUMES: sched_mutex
750  * ------------------------------------------------------------------------- */
751
752 static void
753 schedulePreLoop(void)
754 {
755 #if defined(GRAN) 
756     /* set up first event to get things going */
757     /* ToDo: assign costs for system setup and init MainTSO ! */
758     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
759               ContinueThread, 
760               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
761     
762     IF_DEBUG(gran,
763              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
764                         CurrentTSO);
765              G_TSO(CurrentTSO, 5));
766     
767     if (RtsFlags.GranFlags.Light) {
768         /* Save current time; GranSim Light only */
769         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
770     }      
771 #endif
772 }
773
774 /* ----------------------------------------------------------------------------
775  * Deal with the interrupt flag
776  * ASSUMES: sched_mutex
777  * ------------------------------------------------------------------------- */
778
779 static
780 void scheduleHandleInterrupt(void)
781 {
782     //
783     // Test for interruption.  If interrupted==rtsTrue, then either
784     // we received a keyboard interrupt (^C), or the scheduler is
785     // trying to shut down all the tasks (shutting_down_scheduler) in
786     // the threaded RTS.
787     //
788     if (interrupted) {
789         if (shutting_down_scheduler) {
790             IF_DEBUG(scheduler, sched_belch("shutting down"));
791 #if defined(RTS_SUPPORTS_THREADS)
792             shutdownThread();
793 #endif
794         } else {
795             IF_DEBUG(scheduler, sched_belch("interrupted"));
796             deleteAllThreads();
797         }
798     }
799 }
800
801 /* ----------------------------------------------------------------------------
802  * Start any pending signal handlers
803  * ASSUMES: sched_mutex
804  * ------------------------------------------------------------------------- */
805
806 static void
807 scheduleStartSignalHandlers(void)
808 {
809 #if defined(RTS_USER_SIGNALS)
810     if (signals_pending()) {
811       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
812       startSignalHandlers();
813       ACQUIRE_LOCK(&sched_mutex);
814     }
815 #endif
816 }
817
818 /* ----------------------------------------------------------------------------
819  * Check for blocked threads that can be woken up.
820  * ASSUMES: sched_mutex
821  * ------------------------------------------------------------------------- */
822
823 static void
824 scheduleCheckBlockedThreads(void)
825 {
826     //
827     // Check whether any waiting threads need to be woken up.  If the
828     // run queue is empty, and there are no other tasks running, we
829     // can wait indefinitely for something to happen.
830     //
831     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
832     {
833 #if defined(RTS_SUPPORTS_THREADS)
834         // We shouldn't be here...
835         barf("schedule: awaitEvent() in threaded RTS");
836 #endif
837         awaitEvent( EMPTY_RUN_QUEUE() );
838     }
839 }
840
841 /* ----------------------------------------------------------------------------
842  * Detect deadlock conditions and attempt to resolve them.
843  * ASSUMES: sched_mutex
844  * ------------------------------------------------------------------------- */
845
846 static void
847 scheduleDetectDeadlock(void)
848 {
849     /* 
850      * Detect deadlock: when we have no threads to run, there are no
851      * threads waiting on I/O or sleeping, and all the other tasks are
852      * waiting for work, we must have a deadlock of some description.
853      *
854      * We first try to find threads blocked on themselves (ie. black
855      * holes), and generate NonTermination exceptions where necessary.
856      *
857      * If no threads are black holed, we have a deadlock situation, so
858      * inform all the main threads.
859      */
860 #if !defined(PARALLEL_HASKELL) && !defined(RTS_SUPPORTS_THREADS)
861     if ( EMPTY_THREAD_QUEUES() )
862     {
863         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
864
865         // Garbage collection can release some new threads due to
866         // either (a) finalizers or (b) threads resurrected because
867         // they are unreachable and will therefore be sent an
868         // exception.  Any threads thus released will be immediately
869         // runnable.
870         GarbageCollect(GetRoots,rtsTrue);
871         if ( !EMPTY_RUN_QUEUE() ) return;
872
873 #if defined(RTS_USER_SIGNALS)
874         /* If we have user-installed signal handlers, then wait
875          * for signals to arrive rather then bombing out with a
876          * deadlock.
877          */
878         if ( anyUserHandlers() ) {
879             IF_DEBUG(scheduler, 
880                      sched_belch("still deadlocked, waiting for signals..."));
881
882             awaitUserSignals();
883
884             if (signals_pending()) {
885                 RELEASE_LOCK(&sched_mutex);
886                 startSignalHandlers();
887                 ACQUIRE_LOCK(&sched_mutex);
888             }
889
890             // either we have threads to run, or we were interrupted:
891             ASSERT(!EMPTY_RUN_QUEUE() || interrupted);
892         }
893 #endif
894
895         /* Probably a real deadlock.  Send the current main thread the
896          * Deadlock exception (or in the SMP build, send *all* main
897          * threads the deadlock exception, since none of them can make
898          * progress).
899          */
900         {
901             StgMainThread *m;
902             m = main_threads;
903             switch (m->tso->why_blocked) {
904             case BlockedOnBlackHole:
905             case BlockedOnException:
906             case BlockedOnMVar:
907                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
908                 return;
909             default:
910                 barf("deadlock: main thread blocked in a strange way");
911             }
912         }
913     }
914
915 #elif defined(RTS_SUPPORTS_THREADS)
916     // ToDo: add deadlock detection in threaded RTS
917 #elif defined(PARALLEL_HASKELL)
918     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
919 #endif
920 }
921
922 /* ----------------------------------------------------------------------------
923  * Process an event (GRAN only)
924  * ------------------------------------------------------------------------- */
925
926 #if defined(GRAN)
927 static StgTSO *
928 scheduleProcessEvent(rtsEvent *event)
929 {
930     StgTSO *t;
931
932     if (RtsFlags.GranFlags.Light)
933       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
934
935     /* adjust time based on time-stamp */
936     if (event->time > CurrentTime[CurrentProc] &&
937         event->evttype != ContinueThread)
938       CurrentTime[CurrentProc] = event->time;
939     
940     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
941     if (!RtsFlags.GranFlags.Light)
942       handleIdlePEs();
943
944     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
945
946     /* main event dispatcher in GranSim */
947     switch (event->evttype) {
948       /* Should just be continuing execution */
949     case ContinueThread:
950       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
951       /* ToDo: check assertion
952       ASSERT(run_queue_hd != (StgTSO*)NULL &&
953              run_queue_hd != END_TSO_QUEUE);
954       */
955       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
956       if (!RtsFlags.GranFlags.DoAsyncFetch &&
957           procStatus[CurrentProc]==Fetching) {
958         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
959               CurrentTSO->id, CurrentTSO, CurrentProc);
960         goto next_thread;
961       } 
962       /* Ignore ContinueThreads for completed threads */
963       if (CurrentTSO->what_next == ThreadComplete) {
964         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
965               CurrentTSO->id, CurrentTSO, CurrentProc);
966         goto next_thread;
967       } 
968       /* Ignore ContinueThreads for threads that are being migrated */
969       if (PROCS(CurrentTSO)==Nowhere) { 
970         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
971               CurrentTSO->id, CurrentTSO, CurrentProc);
972         goto next_thread;
973       }
974       /* The thread should be at the beginning of the run queue */
975       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
976         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
977               CurrentTSO->id, CurrentTSO, CurrentProc);
978         break; // run the thread anyway
979       }
980       /*
981       new_event(proc, proc, CurrentTime[proc],
982                 FindWork,
983                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
984       goto next_thread; 
985       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
986       break; // now actually run the thread; DaH Qu'vam yImuHbej 
987
988     case FetchNode:
989       do_the_fetchnode(event);
990       goto next_thread;             /* handle next event in event queue  */
991       
992     case GlobalBlock:
993       do_the_globalblock(event);
994       goto next_thread;             /* handle next event in event queue  */
995       
996     case FetchReply:
997       do_the_fetchreply(event);
998       goto next_thread;             /* handle next event in event queue  */
999       
1000     case UnblockThread:   /* Move from the blocked queue to the tail of */
1001       do_the_unblock(event);
1002       goto next_thread;             /* handle next event in event queue  */
1003       
1004     case ResumeThread:  /* Move from the blocked queue to the tail of */
1005       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1006       event->tso->gran.blocktime += 
1007         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1008       do_the_startthread(event);
1009       goto next_thread;             /* handle next event in event queue  */
1010       
1011     case StartThread:
1012       do_the_startthread(event);
1013       goto next_thread;             /* handle next event in event queue  */
1014       
1015     case MoveThread:
1016       do_the_movethread(event);
1017       goto next_thread;             /* handle next event in event queue  */
1018       
1019     case MoveSpark:
1020       do_the_movespark(event);
1021       goto next_thread;             /* handle next event in event queue  */
1022       
1023     case FindWork:
1024       do_the_findwork(event);
1025       goto next_thread;             /* handle next event in event queue  */
1026       
1027     default:
1028       barf("Illegal event type %u\n", event->evttype);
1029     }  /* switch */
1030     
1031     /* This point was scheduler_loop in the old RTS */
1032
1033     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1034
1035     TimeOfLastEvent = CurrentTime[CurrentProc];
1036     TimeOfNextEvent = get_time_of_next_event();
1037     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1038     // CurrentTSO = ThreadQueueHd;
1039
1040     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1041                          TimeOfNextEvent));
1042
1043     if (RtsFlags.GranFlags.Light) 
1044       GranSimLight_leave_system(event, &ActiveTSO); 
1045
1046     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1047
1048     IF_DEBUG(gran, 
1049              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1050
1051     /* in a GranSim setup the TSO stays on the run queue */
1052     t = CurrentTSO;
1053     /* Take a thread from the run queue. */
1054     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1055
1056     IF_DEBUG(gran, 
1057              debugBelch("GRAN: About to run current thread, which is\n");
1058              G_TSO(t,5));
1059
1060     context_switch = 0; // turned on via GranYield, checking events and time slice
1061
1062     IF_DEBUG(gran, 
1063              DumpGranEvent(GR_SCHEDULE, t));
1064
1065     procStatus[CurrentProc] = Busy;
1066 }
1067 #endif // GRAN
1068
1069 /* ----------------------------------------------------------------------------
1070  * Send pending messages (PARALLEL_HASKELL only)
1071  * ------------------------------------------------------------------------- */
1072
1073 #if defined(PARALLEL_HASKELL)
1074 static StgTSO *
1075 scheduleSendPendingMessages(void)
1076 {
1077     StgSparkPool *pool;
1078     rtsSpark spark;
1079     StgTSO *t;
1080
1081 # if defined(PAR) // global Mem.Mgmt., omit for now
1082     if (PendingFetches != END_BF_QUEUE) {
1083         processFetches();
1084     }
1085 # endif
1086     
1087     if (RtsFlags.ParFlags.BufferTime) {
1088         // if we use message buffering, we must send away all message
1089         // packets which have become too old...
1090         sendOldBuffers(); 
1091     }
1092 }
1093 #endif
1094
1095 /* ----------------------------------------------------------------------------
1096  * Activate spark threads (PARALLEL_HASKELL only)
1097  * ------------------------------------------------------------------------- */
1098
1099 #if defined(PARALLEL_HASKELL)
1100 static void
1101 scheduleActivateSpark(void)
1102 {
1103 #if defined(SPARKS)
1104   ASSERT(EMPTY_RUN_QUEUE());
1105 /* We get here if the run queue is empty and want some work.
1106    We try to turn a spark into a thread, and add it to the run queue,
1107    from where it will be picked up in the next iteration of the scheduler
1108    loop.
1109 */
1110
1111       /* :-[  no local threads => look out for local sparks */
1112       /* the spark pool for the current PE */
1113       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1114       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1115           pool->hd < pool->tl) {
1116         /* 
1117          * ToDo: add GC code check that we really have enough heap afterwards!!
1118          * Old comment:
1119          * If we're here (no runnable threads) and we have pending
1120          * sparks, we must have a space problem.  Get enough space
1121          * to turn one of those pending sparks into a
1122          * thread... 
1123          */
1124
1125         spark = findSpark(rtsFalse);            /* get a spark */
1126         if (spark != (rtsSpark) NULL) {
1127           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1128           IF_PAR_DEBUG(fish, // schedule,
1129                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1130                              tso->id, tso, advisory_thread_count));
1131
1132           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1133             IF_PAR_DEBUG(fish, // schedule,
1134                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1135                             spark));
1136             return rtsFalse; /* failed to generate a thread */
1137           }                  /* otherwise fall through & pick-up new tso */
1138         } else {
1139           IF_PAR_DEBUG(fish, // schedule,
1140                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1141                              spark_queue_len(pool)));
1142           return rtsFalse;  /* failed to generate a thread */
1143         }
1144         return rtsTrue;  /* success in generating a thread */
1145   } else { /* no more threads permitted or pool empty */
1146     return rtsFalse;  /* failed to generateThread */
1147   }
1148 #else
1149   tso = NULL; // avoid compiler warning only
1150   return rtsFalse;  /* dummy in non-PAR setup */
1151 #endif // SPARKS
1152 }
1153 #endif // PARALLEL_HASKELL
1154
1155 /* ----------------------------------------------------------------------------
1156  * Get work from a remote node (PARALLEL_HASKELL only)
1157  * ------------------------------------------------------------------------- */
1158     
1159 #if defined(PARALLEL_HASKELL)
1160 static rtsBool
1161 scheduleGetRemoteWork(rtsBool *receivedFinish)
1162 {
1163   ASSERT(EMPTY_RUN_QUEUE());
1164
1165   if (RtsFlags.ParFlags.BufferTime) {
1166         IF_PAR_DEBUG(verbose, 
1167                 debugBelch("...send all pending data,"));
1168         {
1169           nat i;
1170           for (i=1; i<=nPEs; i++)
1171             sendImmediately(i); // send all messages away immediately
1172         }
1173   }
1174 # ifndef SPARKS
1175         //++EDEN++ idle() , i.e. send all buffers, wait for work
1176         // suppress fishing in EDEN... just look for incoming messages
1177         // (blocking receive)
1178   IF_PAR_DEBUG(verbose, 
1179                debugBelch("...wait for incoming messages...\n"));
1180   *receivedFinish = processMessages(); // blocking receive...
1181
1182         // and reenter scheduling loop after having received something
1183         // (return rtsFalse below)
1184
1185 # else /* activate SPARKS machinery */
1186 /* We get here, if we have no work, tried to activate a local spark, but still
1187    have no work. We try to get a remote spark, by sending a FISH message.
1188    Thread migration should be added here, and triggered when a sequence of 
1189    fishes returns without work. */
1190         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1191
1192       /* =8-[  no local sparks => look for work on other PEs */
1193         /*
1194          * We really have absolutely no work.  Send out a fish
1195          * (there may be some out there already), and wait for
1196          * something to arrive.  We clearly can't run any threads
1197          * until a SCHEDULE or RESUME arrives, and so that's what
1198          * we're hoping to see.  (Of course, we still have to
1199          * respond to other types of messages.)
1200          */
1201         rtsTime now = msTime() /*CURRENT_TIME*/;
1202         IF_PAR_DEBUG(verbose, 
1203                      debugBelch("--  now=%ld\n", now));
1204         IF_PAR_DEBUG(fish, // verbose,
1205              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1206                  (last_fish_arrived_at!=0 &&
1207                   last_fish_arrived_at+delay > now)) {
1208                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1209                      now, last_fish_arrived_at+delay, 
1210                      last_fish_arrived_at,
1211                      delay);
1212              });
1213   
1214         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1215             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1216           if (last_fish_arrived_at==0 ||
1217               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1218             /* outstandingFishes is set in sendFish, processFish;
1219                avoid flooding system with fishes via delay */
1220     next_fish_to_send_at = 0;  
1221   } else {
1222     /* ToDo: this should be done in the main scheduling loop to avoid the
1223              busy wait here; not so bad if fish delay is very small  */
1224     int iq = 0; // DEBUGGING -- HWL
1225     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1226     /* send a fish when ready, but process messages that arrive in the meantime */
1227     do {
1228       if (PacketsWaiting()) {
1229         iq++; // DEBUGGING
1230         *receivedFinish = processMessages();
1231       }
1232       now = msTime();
1233     } while (!*receivedFinish || now<next_fish_to_send_at);
1234     // JB: This means the fish could become obsolete, if we receive
1235     // work. Better check for work again? 
1236     // last line: while (!receivedFinish || !haveWork || now<...)
1237     // next line: if (receivedFinish || haveWork )
1238
1239     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1240       return rtsFalse;  // NB: this will leave scheduler loop
1241                         // immediately after return!
1242                           
1243     IF_PAR_DEBUG(fish, // verbose,
1244                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1245
1246   }
1247
1248     // JB: IMHO, this should all be hidden inside sendFish(...)
1249     /* pe = choosePE(); 
1250        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1251                 NEW_FISH_HUNGER);
1252
1253     // Global statistics: count no. of fishes
1254     if (RtsFlags.ParFlags.ParStats.Global &&
1255          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1256            globalParStats.tot_fish_mess++;
1257            }
1258     */ 
1259
1260   /* delayed fishes must have been sent by now! */
1261   next_fish_to_send_at = 0;  
1262   }
1263       
1264   *receivedFinish = processMessages();
1265 # endif /* SPARKS */
1266
1267  return rtsFalse;
1268  /* NB: this function always returns rtsFalse, meaning the scheduler
1269     loop continues with the next iteration; 
1270     rationale: 
1271       return code means success in finding work; we enter this function
1272       if there is no local work, thus have to send a fish which takes
1273       time until it arrives with work; in the meantime we should process
1274       messages in the main loop;
1275  */
1276 }
1277 #endif // PARALLEL_HASKELL
1278
1279 /* ----------------------------------------------------------------------------
1280  * PAR/GRAN: Report stats & debugging info(?)
1281  * ------------------------------------------------------------------------- */
1282
1283 #if defined(PAR) || defined(GRAN)
1284 static void
1285 scheduleGranParReport(void)
1286 {
1287   ASSERT(run_queue_hd != END_TSO_QUEUE);
1288
1289   /* Take a thread from the run queue, if we have work */
1290   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1291
1292     /* If this TSO has got its outport closed in the meantime, 
1293      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1294      * It has to be marked as TH_DEAD for this purpose.
1295      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1296
1297 JB: TODO: investigate wether state change field could be nuked
1298      entirely and replaced by the normal tso state (whatnext
1299      field). All we want to do is to kill tsos from outside.
1300      */
1301
1302     /* ToDo: write something to the log-file
1303     if (RTSflags.ParFlags.granSimStats && !sameThread)
1304         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1305
1306     CurrentTSO = t;
1307     */
1308     /* the spark pool for the current PE */
1309     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1310
1311     IF_DEBUG(scheduler, 
1312              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1313                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1314
1315     IF_PAR_DEBUG(fish,
1316              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1317                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1318
1319     if (RtsFlags.ParFlags.ParStats.Full && 
1320         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1321         (emitSchedule || // forced emit
1322          (t && LastTSO && t->id != LastTSO->id))) {
1323       /* 
1324          we are running a different TSO, so write a schedule event to log file
1325          NB: If we use fair scheduling we also have to write  a deschedule 
1326              event for LastTSO; with unfair scheduling we know that the
1327              previous tso has blocked whenever we switch to another tso, so
1328              we don't need it in GUM for now
1329       */
1330       IF_PAR_DEBUG(fish, // schedule,
1331                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1332
1333       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1334                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1335       emitSchedule = rtsFalse;
1336     }
1337 }     
1338 #endif
1339
1340 /* ----------------------------------------------------------------------------
1341  * After running a thread...
1342  * ASSUMES: sched_mutex
1343  * ------------------------------------------------------------------------- */
1344
1345 static void
1346 schedulePostRunThread(void)
1347 {
1348 #if defined(PAR)
1349     /* HACK 675: if the last thread didn't yield, make sure to print a 
1350        SCHEDULE event to the log file when StgRunning the next thread, even
1351        if it is the same one as before */
1352     LastTSO = t; 
1353     TimeOfLastYield = CURRENT_TIME;
1354 #endif
1355
1356   /* some statistics gathering in the parallel case */
1357
1358 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1359   switch (ret) {
1360     case HeapOverflow:
1361 # if defined(GRAN)
1362       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1363       globalGranStats.tot_heapover++;
1364 # elif defined(PAR)
1365       globalParStats.tot_heapover++;
1366 # endif
1367       break;
1368
1369      case StackOverflow:
1370 # if defined(GRAN)
1371       IF_DEBUG(gran, 
1372                DumpGranEvent(GR_DESCHEDULE, t));
1373       globalGranStats.tot_stackover++;
1374 # elif defined(PAR)
1375       // IF_DEBUG(par, 
1376       // DumpGranEvent(GR_DESCHEDULE, t);
1377       globalParStats.tot_stackover++;
1378 # endif
1379       break;
1380
1381     case ThreadYielding:
1382 # if defined(GRAN)
1383       IF_DEBUG(gran, 
1384                DumpGranEvent(GR_DESCHEDULE, t));
1385       globalGranStats.tot_yields++;
1386 # elif defined(PAR)
1387       // IF_DEBUG(par, 
1388       // DumpGranEvent(GR_DESCHEDULE, t);
1389       globalParStats.tot_yields++;
1390 # endif
1391       break; 
1392
1393     case ThreadBlocked:
1394 # if defined(GRAN)
1395       IF_DEBUG(scheduler,
1396                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1397                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1398                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1399                if (t->block_info.closure!=(StgClosure*)NULL)
1400                  print_bq(t->block_info.closure);
1401                debugBelch("\n"));
1402
1403       // ??? needed; should emit block before
1404       IF_DEBUG(gran, 
1405                DumpGranEvent(GR_DESCHEDULE, t)); 
1406       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1407       /*
1408         ngoq Dogh!
1409       ASSERT(procStatus[CurrentProc]==Busy || 
1410               ((procStatus[CurrentProc]==Fetching) && 
1411               (t->block_info.closure!=(StgClosure*)NULL)));
1412       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1413           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1414             procStatus[CurrentProc]==Fetching)) 
1415         procStatus[CurrentProc] = Idle;
1416       */
1417 # elif defined(PAR)
1418 //++PAR++  blockThread() writes the event (change?)
1419 # endif
1420     break;
1421
1422   case ThreadFinished:
1423     break;
1424
1425   default:
1426     barf("parGlobalStats: unknown return code");
1427     break;
1428     }
1429 #endif
1430 }
1431
1432 /* -----------------------------------------------------------------------------
1433  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1434  * ASSUMES: sched_mutex
1435  * -------------------------------------------------------------------------- */
1436
1437 static rtsBool
1438 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1439 {
1440     // did the task ask for a large block?
1441     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1442         // if so, get one and push it on the front of the nursery.
1443         bdescr *bd;
1444         lnat blocks;
1445         
1446         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1447         
1448         IF_DEBUG(scheduler,
1449                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n", 
1450                             (long)t->id, whatNext_strs[t->what_next], blocks));
1451         
1452         // don't do this if it would push us over the
1453         // alloc_blocks_lim limit; we'll GC first.
1454         if (alloc_blocks + blocks < alloc_blocks_lim) {
1455             
1456             alloc_blocks += blocks;
1457             bd = allocGroup( blocks );
1458             
1459             // link the new group into the list
1460             bd->link = cap->r.rCurrentNursery;
1461             bd->u.back = cap->r.rCurrentNursery->u.back;
1462             if (cap->r.rCurrentNursery->u.back != NULL) {
1463                 cap->r.rCurrentNursery->u.back->link = bd;
1464             } else {
1465                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1466                        g0s0->blocks == cap->r.rNursery);
1467                 cap->r.rNursery = g0s0->blocks = bd;
1468             }             
1469             cap->r.rCurrentNursery->u.back = bd;
1470             
1471             // initialise it as a nursery block.  We initialise the
1472             // step, gen_no, and flags field of *every* sub-block in
1473             // this large block, because this is easier than making
1474             // sure that we always find the block head of a large
1475             // block whenever we call Bdescr() (eg. evacuate() and
1476             // isAlive() in the GC would both have to do this, at
1477             // least).
1478             { 
1479                 bdescr *x;
1480                 for (x = bd; x < bd + blocks; x++) {
1481                     x->step = g0s0;
1482                     x->gen_no = 0;
1483                     x->flags = 0;
1484                 }
1485             }
1486             
1487             // don't forget to update the block count in g0s0.
1488             g0s0->n_blocks += blocks;
1489             // This assert can be a killer if the app is doing lots
1490             // of large block allocations.
1491             ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1492             
1493             // now update the nursery to point to the new block
1494             cap->r.rCurrentNursery = bd;
1495             
1496             // we might be unlucky and have another thread get on the
1497             // run queue before us and steal the large block, but in that
1498             // case the thread will just end up requesting another large
1499             // block.
1500             PUSH_ON_RUN_QUEUE(t);
1501             return rtsFalse;  /* not actually GC'ing */
1502         }
1503     }
1504     
1505     /* make all the running tasks block on a condition variable,
1506      * maybe set context_switch and wait till they all pile in,
1507      * then have them wait on a GC condition variable.
1508      */
1509     IF_DEBUG(scheduler,
1510              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1511                         (long)t->id, whatNext_strs[t->what_next]));
1512     threadPaused(t);
1513 #if defined(GRAN)
1514     ASSERT(!is_on_queue(t,CurrentProc));
1515 #elif defined(PARALLEL_HASKELL)
1516     /* Currently we emit a DESCHEDULE event before GC in GUM.
1517        ToDo: either add separate event to distinguish SYSTEM time from rest
1518        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1519     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1520         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1521                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1522         emitSchedule = rtsTrue;
1523     }
1524 #endif
1525       
1526     PUSH_ON_RUN_QUEUE(t);
1527     return rtsTrue;
1528     /* actual GC is done at the end of the while loop in schedule() */
1529 }
1530
1531 /* -----------------------------------------------------------------------------
1532  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1533  * ASSUMES: sched_mutex
1534  * -------------------------------------------------------------------------- */
1535
1536 static void
1537 scheduleHandleStackOverflow( StgTSO *t)
1538 {
1539     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1540                                   (long)t->id, whatNext_strs[t->what_next]));
1541     /* just adjust the stack for this thread, then pop it back
1542      * on the run queue.
1543      */
1544     threadPaused(t);
1545     { 
1546         /* enlarge the stack */
1547         StgTSO *new_t = threadStackOverflow(t);
1548         
1549         /* This TSO has moved, so update any pointers to it from the
1550          * main thread stack.  It better not be on any other queues...
1551          * (it shouldn't be).
1552          */
1553         if (t->main != NULL) {
1554             t->main->tso = new_t;
1555         }
1556         PUSH_ON_RUN_QUEUE(new_t);
1557     }
1558 }
1559
1560 /* -----------------------------------------------------------------------------
1561  * Handle a thread that returned to the scheduler with ThreadYielding
1562  * ASSUMES: sched_mutex
1563  * -------------------------------------------------------------------------- */
1564
1565 static rtsBool
1566 scheduleHandleYield( StgTSO *t, nat prev_what_next )
1567 {
1568     // Reset the context switch flag.  We don't do this just before
1569     // running the thread, because that would mean we would lose ticks
1570     // during GC, which can lead to unfair scheduling (a thread hogs
1571     // the CPU because the tick always arrives during GC).  This way
1572     // penalises threads that do a lot of allocation, but that seems
1573     // better than the alternative.
1574     context_switch = 0;
1575     
1576     /* put the thread back on the run queue.  Then, if we're ready to
1577      * GC, check whether this is the last task to stop.  If so, wake
1578      * up the GC thread.  getThread will block during a GC until the
1579      * GC is finished.
1580      */
1581     IF_DEBUG(scheduler,
1582              if (t->what_next != prev_what_next) {
1583                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1584                             (long)t->id, whatNext_strs[t->what_next]);
1585              } else {
1586                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1587                             (long)t->id, whatNext_strs[t->what_next]);
1588              }
1589         );
1590     
1591     IF_DEBUG(sanity,
1592              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1593              checkTSO(t));
1594     ASSERT(t->link == END_TSO_QUEUE);
1595     
1596     // Shortcut if we're just switching evaluators: don't bother
1597     // doing stack squeezing (which can be expensive), just run the
1598     // thread.
1599     if (t->what_next != prev_what_next) {
1600         return rtsTrue;
1601     }
1602     
1603     threadPaused(t);
1604     
1605 #if defined(GRAN)
1606     ASSERT(!is_on_queue(t,CurrentProc));
1607       
1608     IF_DEBUG(sanity,
1609              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1610              checkThreadQsSanity(rtsTrue));
1611
1612 #endif
1613
1614     addToRunQueue(t);
1615
1616 #if defined(GRAN)
1617     /* add a ContinueThread event to actually process the thread */
1618     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1619               ContinueThread,
1620               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1621     IF_GRAN_DEBUG(bq, 
1622                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1623                   G_EVENTQ(0);
1624                   G_CURR_THREADQ(0));
1625 #endif
1626     return rtsFalse;
1627 }
1628
1629 /* -----------------------------------------------------------------------------
1630  * Handle a thread that returned to the scheduler with ThreadBlocked
1631  * ASSUMES: sched_mutex
1632  * -------------------------------------------------------------------------- */
1633
1634 static void
1635 scheduleHandleThreadBlocked( StgTSO *t
1636 #if !defined(GRAN) && !defined(DEBUG)
1637     STG_UNUSED
1638 #endif
1639     )
1640 {
1641 #if defined(GRAN)
1642     IF_DEBUG(scheduler,
1643              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1644                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1645              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1646     
1647     // ??? needed; should emit block before
1648     IF_DEBUG(gran, 
1649              DumpGranEvent(GR_DESCHEDULE, t)); 
1650     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1651     /*
1652       ngoq Dogh!
1653       ASSERT(procStatus[CurrentProc]==Busy || 
1654       ((procStatus[CurrentProc]==Fetching) && 
1655       (t->block_info.closure!=(StgClosure*)NULL)));
1656       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1657       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1658       procStatus[CurrentProc]==Fetching)) 
1659       procStatus[CurrentProc] = Idle;
1660     */
1661 #elif defined(PAR)
1662     IF_DEBUG(scheduler,
1663              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1664                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1665     IF_PAR_DEBUG(bq,
1666                  
1667                  if (t->block_info.closure!=(StgClosure*)NULL) 
1668                  print_bq(t->block_info.closure));
1669     
1670     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1671     blockThread(t);
1672     
1673     /* whatever we schedule next, we must log that schedule */
1674     emitSchedule = rtsTrue;
1675     
1676 #else /* !GRAN */
1677       /* don't need to do anything.  Either the thread is blocked on
1678        * I/O, in which case we'll have called addToBlockedQueue
1679        * previously, or it's blocked on an MVar or Blackhole, in which
1680        * case it'll be on the relevant queue already.
1681        */
1682     ASSERT(t->why_blocked != NotBlocked);
1683     IF_DEBUG(scheduler,
1684              debugBelch("--<< thread %d (%s) stopped: ", 
1685                         t->id, whatNext_strs[t->what_next]);
1686              printThreadBlockage(t);
1687              debugBelch("\n"));
1688     
1689     /* Only for dumping event to log file 
1690        ToDo: do I need this in GranSim, too?
1691        blockThread(t);
1692     */
1693 #endif
1694 }
1695
1696 /* -----------------------------------------------------------------------------
1697  * Handle a thread that returned to the scheduler with ThreadFinished
1698  * ASSUMES: sched_mutex
1699  * -------------------------------------------------------------------------- */
1700
1701 static rtsBool
1702 scheduleHandleThreadFinished( StgMainThread *mainThread
1703                               USED_WHEN_RTS_SUPPORTS_THREADS,
1704                               Capability *cap,
1705                               StgTSO *t )
1706 {
1707     /* Need to check whether this was a main thread, and if so,
1708      * return with the return value.
1709      *
1710      * We also end up here if the thread kills itself with an
1711      * uncaught exception, see Exception.cmm.
1712      */
1713     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1714                                   t->id, whatNext_strs[t->what_next]));
1715
1716 #if defined(GRAN)
1717       endThread(t, CurrentProc); // clean-up the thread
1718 #elif defined(PARALLEL_HASKELL)
1719       /* For now all are advisory -- HWL */
1720       //if(t->priority==AdvisoryPriority) ??
1721       advisory_thread_count--; // JB: Caution with this counter, buggy!
1722       
1723 # if defined(DIST)
1724       if(t->dist.priority==RevalPriority)
1725         FinishReval(t);
1726 # endif
1727     
1728 # if defined(EDENOLD)
1729       // the thread could still have an outport... (BUG)
1730       if (t->eden.outport != -1) {
1731       // delete the outport for the tso which has finished...
1732         IF_PAR_DEBUG(eden_ports,
1733                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1734                               t->eden.outport, t->id));
1735         deleteOPT(t);
1736       }
1737       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1738       if (t->eden.epid != -1) {
1739         IF_PAR_DEBUG(eden_ports,
1740                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1741                            t->id, t->eden.epid));
1742         removeTSOfromProcess(t);
1743       }
1744 # endif 
1745
1746 # if defined(PAR)
1747       if (RtsFlags.ParFlags.ParStats.Full &&
1748           !RtsFlags.ParFlags.ParStats.Suppressed) 
1749         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1750
1751       //  t->par only contains statistics: left out for now...
1752       IF_PAR_DEBUG(fish,
1753                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1754                               t->id,t,t->par.sparkname));
1755 # endif
1756 #endif // PARALLEL_HASKELL
1757
1758       //
1759       // Check whether the thread that just completed was a main
1760       // thread, and if so return with the result.  
1761       //
1762       // There is an assumption here that all thread completion goes
1763       // through this point; we need to make sure that if a thread
1764       // ends up in the ThreadKilled state, that it stays on the run
1765       // queue so it can be dealt with here.
1766       //
1767       if (
1768 #if defined(RTS_SUPPORTS_THREADS)
1769           mainThread != NULL
1770 #else
1771           mainThread->tso == t
1772 #endif
1773           )
1774       {
1775           // We are a bound thread: this must be our thread that just
1776           // completed.
1777           ASSERT(mainThread->tso == t);
1778
1779           if (t->what_next == ThreadComplete) {
1780               if (mainThread->ret) {
1781                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1782                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1783               }
1784               mainThread->stat = Success;
1785           } else {
1786               if (mainThread->ret) {
1787                   *(mainThread->ret) = NULL;
1788               }
1789               if (interrupted) {
1790                   mainThread->stat = Interrupted;
1791               } else {
1792                   mainThread->stat = Killed;
1793               }
1794           }
1795 #ifdef DEBUG
1796           removeThreadLabel((StgWord)mainThread->tso->id);
1797 #endif
1798           if (mainThread->prev == NULL) {
1799               main_threads = mainThread->link;
1800           } else {
1801               mainThread->prev->link = mainThread->link;
1802           }
1803           if (mainThread->link != NULL) {
1804               mainThread->link->prev = NULL;
1805           }
1806           releaseCapability(cap);
1807           return rtsTrue; // tells schedule() to return
1808       }
1809
1810 #ifdef RTS_SUPPORTS_THREADS
1811       ASSERT(t->main == NULL);
1812 #else
1813       if (t->main != NULL) {
1814           // Must be a main thread that is not the topmost one.  Leave
1815           // it on the run queue until the stack has unwound to the
1816           // point where we can deal with this.  Leaving it on the run
1817           // queue also ensures that the garbage collector knows about
1818           // this thread and its return value (it gets dropped from the
1819           // all_threads list so there's no other way to find it).
1820           APPEND_TO_RUN_QUEUE(t);
1821       }
1822 #endif
1823       return rtsFalse;
1824 }
1825
1826 /* -----------------------------------------------------------------------------
1827  * Perform a heap census, if PROFILING
1828  * -------------------------------------------------------------------------- */
1829
1830 static void
1831 scheduleDoHeapProfile(void)
1832 {
1833 #ifdef PROFILING
1834     // When we have +RTS -i0 and we're heap profiling, do a census at
1835     // every GC.  This lets us get repeatable runs for debugging.
1836     if (performHeapProfile ||
1837         (RtsFlags.ProfFlags.profileInterval==0 &&
1838          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1839         GarbageCollect(GetRoots, rtsTrue);
1840         heapCensus();
1841         performHeapProfile = rtsFalse;
1842         ready_to_gc = rtsFalse; // we already GC'd
1843     }
1844 #endif
1845 }
1846
1847 /* -----------------------------------------------------------------------------
1848  * Perform a garbage collection if necessary
1849  * ASSUMES: sched_mutex
1850  * -------------------------------------------------------------------------- */
1851
1852 static void
1853 scheduleDoGC(void)
1854 {
1855     StgTSO *t;
1856
1857 #ifdef SMP
1858     // The last task to stop actually gets to do the GC.  The rest
1859     // of the tasks release their capabilities and wait gc_pending_cond.
1860     if (ready_to_gc && allFreeCapabilities())
1861 #else
1862     if (ready_to_gc)
1863 #endif
1864     {
1865         /* Kick any transactions which are invalid back to their
1866          * atomically frames.  When next scheduled they will try to
1867          * commit, this commit will fail and they will retry.
1868          */
1869         for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
1870             if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1871                 if (!stmValidateTransaction (t -> trec)) {
1872                     IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1873                     
1874                     // strip the stack back to the ATOMICALLY_FRAME, aborting
1875                     // the (nested) transaction, and saving the stack of any
1876                     // partially-evaluated thunks on the heap.
1877                     raiseAsync_(t, NULL, rtsTrue);
1878                     
1879 #ifdef REG_R1
1880                     ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1881 #endif
1882                 }
1883             }
1884         }
1885
1886         /* everybody back, start the GC.
1887          * Could do it in this thread, or signal a condition var
1888          * to do it in another thread.  Either way, we need to
1889          * broadcast on gc_pending_cond afterward.
1890          */
1891 #if defined(RTS_SUPPORTS_THREADS)
1892         IF_DEBUG(scheduler,sched_belch("doing GC"));
1893 #endif
1894         GarbageCollect(GetRoots,rtsFalse);
1895         ready_to_gc = rtsFalse;
1896 #if defined(SMP)
1897         broadcastCondition(&gc_pending_cond);
1898 #endif
1899 #if defined(GRAN)
1900         /* add a ContinueThread event to continue execution of current thread */
1901         new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1902                   ContinueThread,
1903                   t, (StgClosure*)NULL, (rtsSpark*)NULL);
1904         IF_GRAN_DEBUG(bq, 
1905                       debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1906                       G_EVENTQ(0);
1907                       G_CURR_THREADQ(0));
1908 #endif /* GRAN */
1909     }
1910 }
1911
1912 /* ---------------------------------------------------------------------------
1913  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1914  * used by Control.Concurrent for error checking.
1915  * ------------------------------------------------------------------------- */
1916  
1917 StgBool
1918 rtsSupportsBoundThreads(void)
1919 {
1920 #ifdef THREADED_RTS
1921   return rtsTrue;
1922 #else
1923   return rtsFalse;
1924 #endif
1925 }
1926
1927 /* ---------------------------------------------------------------------------
1928  * isThreadBound(tso): check whether tso is bound to an OS thread.
1929  * ------------------------------------------------------------------------- */
1930  
1931 StgBool
1932 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1933 {
1934 #ifdef THREADED_RTS
1935   return (tso->main != NULL);
1936 #endif
1937   return rtsFalse;
1938 }
1939
1940 /* ---------------------------------------------------------------------------
1941  * Singleton fork(). Do not copy any running threads.
1942  * ------------------------------------------------------------------------- */
1943
1944 #ifndef mingw32_HOST_OS
1945 #define FORKPROCESS_PRIMOP_SUPPORTED
1946 #endif
1947
1948 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1949 static void 
1950 deleteThreadImmediately(StgTSO *tso);
1951 #endif
1952 StgInt
1953 forkProcess(HsStablePtr *entry
1954 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1955             STG_UNUSED
1956 #endif
1957            )
1958 {
1959 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1960   pid_t pid;
1961   StgTSO* t,*next;
1962   StgMainThread *m;
1963   SchedulerStatus rc;
1964
1965   IF_DEBUG(scheduler,sched_belch("forking!"));
1966   rts_lock(); // This not only acquires sched_mutex, it also
1967               // makes sure that no other threads are running
1968
1969   pid = fork();
1970
1971   if (pid) { /* parent */
1972
1973   /* just return the pid */
1974     rts_unlock();
1975     return pid;
1976     
1977   } else { /* child */
1978     
1979     
1980       // delete all threads
1981     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1982     
1983     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1984       next = t->link;
1985
1986         // don't allow threads to catch the ThreadKilled exception
1987       deleteThreadImmediately(t);
1988     }
1989     
1990       // wipe the main thread list
1991     while((m = main_threads) != NULL) {
1992       main_threads = m->link;
1993 # ifdef THREADED_RTS
1994       closeCondition(&m->bound_thread_cond);
1995 # endif
1996       stgFree(m);
1997     }
1998     
1999     rc = rts_evalStableIO(entry, NULL);  // run the action
2000     rts_checkSchedStatus("forkProcess",rc);
2001     
2002     rts_unlock();
2003     
2004     hs_exit();                      // clean up and exit
2005     stg_exit(0);
2006   }
2007 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2008   barf("forkProcess#: primop not supported, sorry!\n");
2009   return -1;
2010 #endif
2011 }
2012
2013 /* ---------------------------------------------------------------------------
2014  * deleteAllThreads():  kill all the live threads.
2015  *
2016  * This is used when we catch a user interrupt (^C), before performing
2017  * any necessary cleanups and running finalizers.
2018  *
2019  * Locks: sched_mutex held.
2020  * ------------------------------------------------------------------------- */
2021    
2022 void
2023 deleteAllThreads ( void )
2024 {
2025   StgTSO* t, *next;
2026   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
2027   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2028       next = t->global_link;
2029       deleteThread(t);
2030   }      
2031
2032   // The run queue now contains a bunch of ThreadKilled threads.  We
2033   // must not throw these away: the main thread(s) will be in there
2034   // somewhere, and the main scheduler loop has to deal with it.
2035   // Also, the run queue is the only thing keeping these threads from
2036   // being GC'd, and we don't want the "main thread has been GC'd" panic.
2037
2038   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2039   ASSERT(sleeping_queue == END_TSO_QUEUE);
2040 }
2041
2042 /* startThread and  insertThread are now in GranSim.c -- HWL */
2043
2044
2045 /* ---------------------------------------------------------------------------
2046  * Suspending & resuming Haskell threads.
2047  * 
2048  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2049  * its capability before calling the C function.  This allows another
2050  * task to pick up the capability and carry on running Haskell
2051  * threads.  It also means that if the C call blocks, it won't lock
2052  * the whole system.
2053  *
2054  * The Haskell thread making the C call is put to sleep for the
2055  * duration of the call, on the susepended_ccalling_threads queue.  We
2056  * give out a token to the task, which it can use to resume the thread
2057  * on return from the C function.
2058  * ------------------------------------------------------------------------- */
2059    
2060 StgInt
2061 suspendThread( StgRegTable *reg )
2062 {
2063   nat tok;
2064   Capability *cap;
2065   int saved_errno = errno;
2066
2067   /* assume that *reg is a pointer to the StgRegTable part
2068    * of a Capability.
2069    */
2070   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
2071
2072   ACQUIRE_LOCK(&sched_mutex);
2073
2074   IF_DEBUG(scheduler,
2075            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
2076
2077   // XXX this might not be necessary --SDM
2078   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
2079
2080   threadPaused(cap->r.rCurrentTSO);
2081   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
2082   suspended_ccalling_threads = cap->r.rCurrentTSO;
2083
2084   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
2085       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
2086       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
2087   } else {
2088       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
2089   }
2090
2091   /* Use the thread ID as the token; it should be unique */
2092   tok = cap->r.rCurrentTSO->id;
2093
2094   /* Hand back capability */
2095   releaseCapability(cap);
2096   
2097 #if defined(RTS_SUPPORTS_THREADS)
2098   /* Preparing to leave the RTS, so ensure there's a native thread/task
2099      waiting to take over.
2100   */
2101   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
2102 #endif
2103
2104   in_haskell = rtsFalse;
2105   RELEASE_LOCK(&sched_mutex);
2106   
2107   errno = saved_errno;
2108   return tok; 
2109 }
2110
2111 StgRegTable *
2112 resumeThread( StgInt tok )
2113 {
2114   StgTSO *tso, **prev;
2115   Capability *cap;
2116   int saved_errno = errno;
2117
2118 #if defined(RTS_SUPPORTS_THREADS)
2119   /* Wait for permission to re-enter the RTS with the result. */
2120   ACQUIRE_LOCK(&sched_mutex);
2121   waitForReturnCapability(&sched_mutex, &cap);
2122
2123   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
2124 #else
2125   grabCapability(&cap);
2126 #endif
2127
2128   /* Remove the thread off of the suspended list */
2129   prev = &suspended_ccalling_threads;
2130   for (tso = suspended_ccalling_threads; 
2131        tso != END_TSO_QUEUE; 
2132        prev = &tso->link, tso = tso->link) {
2133     if (tso->id == (StgThreadID)tok) {
2134       *prev = tso->link;
2135       break;
2136     }
2137   }
2138   if (tso == END_TSO_QUEUE) {
2139     barf("resumeThread: thread not found");
2140   }
2141   tso->link = END_TSO_QUEUE;
2142   
2143   if(tso->why_blocked == BlockedOnCCall) {
2144       awakenBlockedQueueNoLock(tso->blocked_exceptions);
2145       tso->blocked_exceptions = NULL;
2146   }
2147   
2148   /* Reset blocking status */
2149   tso->why_blocked  = NotBlocked;
2150
2151   cap->r.rCurrentTSO = tso;
2152   in_haskell = rtsTrue;
2153   RELEASE_LOCK(&sched_mutex);
2154   errno = saved_errno;
2155   return &cap->r;
2156 }
2157
2158 /* ---------------------------------------------------------------------------
2159  * Comparing Thread ids.
2160  *
2161  * This is used from STG land in the implementation of the
2162  * instances of Eq/Ord for ThreadIds.
2163  * ------------------------------------------------------------------------ */
2164
2165 int
2166 cmp_thread(StgPtr tso1, StgPtr tso2) 
2167
2168   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2169   StgThreadID id2 = ((StgTSO *)tso2)->id;
2170  
2171   if (id1 < id2) return (-1);
2172   if (id1 > id2) return 1;
2173   return 0;
2174 }
2175
2176 /* ---------------------------------------------------------------------------
2177  * Fetching the ThreadID from an StgTSO.
2178  *
2179  * This is used in the implementation of Show for ThreadIds.
2180  * ------------------------------------------------------------------------ */
2181 int
2182 rts_getThreadId(StgPtr tso) 
2183 {
2184   return ((StgTSO *)tso)->id;
2185 }
2186
2187 #ifdef DEBUG
2188 void
2189 labelThread(StgPtr tso, char *label)
2190 {
2191   int len;
2192   void *buf;
2193
2194   /* Caveat: Once set, you can only set the thread name to "" */
2195   len = strlen(label)+1;
2196   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2197   strncpy(buf,label,len);
2198   /* Update will free the old memory for us */
2199   updateThreadLabel(((StgTSO *)tso)->id,buf);
2200 }
2201 #endif /* DEBUG */
2202
2203 /* ---------------------------------------------------------------------------
2204    Create a new thread.
2205
2206    The new thread starts with the given stack size.  Before the
2207    scheduler can run, however, this thread needs to have a closure
2208    (and possibly some arguments) pushed on its stack.  See
2209    pushClosure() in Schedule.h.
2210
2211    createGenThread() and createIOThread() (in SchedAPI.h) are
2212    convenient packaged versions of this function.
2213
2214    currently pri (priority) is only used in a GRAN setup -- HWL
2215    ------------------------------------------------------------------------ */
2216 #if defined(GRAN)
2217 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2218 StgTSO *
2219 createThread(nat size, StgInt pri)
2220 #else
2221 StgTSO *
2222 createThread(nat size)
2223 #endif
2224 {
2225
2226     StgTSO *tso;
2227     nat stack_size;
2228
2229     /* First check whether we should create a thread at all */
2230 #if defined(PARALLEL_HASKELL)
2231   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2232   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2233     threadsIgnored++;
2234     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2235           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2236     return END_TSO_QUEUE;
2237   }
2238   threadsCreated++;
2239 #endif
2240
2241 #if defined(GRAN)
2242   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2243 #endif
2244
2245   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2246
2247   /* catch ridiculously small stack sizes */
2248   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2249     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2250   }
2251
2252   stack_size = size - TSO_STRUCT_SIZEW;
2253
2254   tso = (StgTSO *)allocate(size);
2255   TICK_ALLOC_TSO(stack_size, 0);
2256
2257   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2258 #if defined(GRAN)
2259   SET_GRAN_HDR(tso, ThisPE);
2260 #endif
2261
2262   // Always start with the compiled code evaluator
2263   tso->what_next = ThreadRunGHC;
2264
2265   tso->id = next_thread_id++; 
2266   tso->why_blocked  = NotBlocked;
2267   tso->blocked_exceptions = NULL;
2268
2269   tso->saved_errno = 0;
2270   tso->main = NULL;
2271   
2272   tso->stack_size   = stack_size;
2273   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2274                               - TSO_STRUCT_SIZEW;
2275   tso->sp           = (P_)&(tso->stack) + stack_size;
2276
2277   tso->trec = NO_TREC;
2278
2279 #ifdef PROFILING
2280   tso->prof.CCCS = CCS_MAIN;
2281 #endif
2282
2283   /* put a stop frame on the stack */
2284   tso->sp -= sizeofW(StgStopFrame);
2285   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2286   tso->link = END_TSO_QUEUE;
2287
2288   // ToDo: check this
2289 #if defined(GRAN)
2290   /* uses more flexible routine in GranSim */
2291   insertThread(tso, CurrentProc);
2292 #else
2293   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2294    * from its creation
2295    */
2296 #endif
2297
2298 #if defined(GRAN) 
2299   if (RtsFlags.GranFlags.GranSimStats.Full) 
2300     DumpGranEvent(GR_START,tso);
2301 #elif defined(PARALLEL_HASKELL)
2302   if (RtsFlags.ParFlags.ParStats.Full) 
2303     DumpGranEvent(GR_STARTQ,tso);
2304   /* HACk to avoid SCHEDULE 
2305      LastTSO = tso; */
2306 #endif
2307
2308   /* Link the new thread on the global thread list.
2309    */
2310   tso->global_link = all_threads;
2311   all_threads = tso;
2312
2313 #if defined(DIST)
2314   tso->dist.priority = MandatoryPriority; //by default that is...
2315 #endif
2316
2317 #if defined(GRAN)
2318   tso->gran.pri = pri;
2319 # if defined(DEBUG)
2320   tso->gran.magic = TSO_MAGIC; // debugging only
2321 # endif
2322   tso->gran.sparkname   = 0;
2323   tso->gran.startedat   = CURRENT_TIME; 
2324   tso->gran.exported    = 0;
2325   tso->gran.basicblocks = 0;
2326   tso->gran.allocs      = 0;
2327   tso->gran.exectime    = 0;
2328   tso->gran.fetchtime   = 0;
2329   tso->gran.fetchcount  = 0;
2330   tso->gran.blocktime   = 0;
2331   tso->gran.blockcount  = 0;
2332   tso->gran.blockedat   = 0;
2333   tso->gran.globalsparks = 0;
2334   tso->gran.localsparks  = 0;
2335   if (RtsFlags.GranFlags.Light)
2336     tso->gran.clock  = Now; /* local clock */
2337   else
2338     tso->gran.clock  = 0;
2339
2340   IF_DEBUG(gran,printTSO(tso));
2341 #elif defined(PARALLEL_HASKELL)
2342 # if defined(DEBUG)
2343   tso->par.magic = TSO_MAGIC; // debugging only
2344 # endif
2345   tso->par.sparkname   = 0;
2346   tso->par.startedat   = CURRENT_TIME; 
2347   tso->par.exported    = 0;
2348   tso->par.basicblocks = 0;
2349   tso->par.allocs      = 0;
2350   tso->par.exectime    = 0;
2351   tso->par.fetchtime   = 0;
2352   tso->par.fetchcount  = 0;
2353   tso->par.blocktime   = 0;
2354   tso->par.blockcount  = 0;
2355   tso->par.blockedat   = 0;
2356   tso->par.globalsparks = 0;
2357   tso->par.localsparks  = 0;
2358 #endif
2359
2360 #if defined(GRAN)
2361   globalGranStats.tot_threads_created++;
2362   globalGranStats.threads_created_on_PE[CurrentProc]++;
2363   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2364   globalGranStats.tot_sq_probes++;
2365 #elif defined(PARALLEL_HASKELL)
2366   // collect parallel global statistics (currently done together with GC stats)
2367   if (RtsFlags.ParFlags.ParStats.Global &&
2368       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2369     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2370     globalParStats.tot_threads_created++;
2371   }
2372 #endif 
2373
2374 #if defined(GRAN)
2375   IF_GRAN_DEBUG(pri,
2376                 sched_belch("==__ schedule: Created TSO %d (%p);",
2377                       CurrentProc, tso, tso->id));
2378 #elif defined(PARALLEL_HASKELL)
2379   IF_PAR_DEBUG(verbose,
2380                sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2381                            (long)tso->id, tso, advisory_thread_count));
2382 #else
2383   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2384                                  (long)tso->id, (long)tso->stack_size));
2385 #endif    
2386   return tso;
2387 }
2388
2389 #if defined(PAR)
2390 /* RFP:
2391    all parallel thread creation calls should fall through the following routine.
2392 */
2393 StgTSO *
2394 createThreadFromSpark(rtsSpark spark) 
2395 { StgTSO *tso;
2396   ASSERT(spark != (rtsSpark)NULL);
2397 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2398   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2399   { threadsIgnored++;
2400     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2401           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2402     return END_TSO_QUEUE;
2403   }
2404   else
2405   { threadsCreated++;
2406     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2407     if (tso==END_TSO_QUEUE)     
2408       barf("createSparkThread: Cannot create TSO");
2409 #if defined(DIST)
2410     tso->priority = AdvisoryPriority;
2411 #endif
2412     pushClosure(tso,spark);
2413     addToRunQueue(tso);
2414     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2415   }
2416   return tso;
2417 }
2418 #endif
2419
2420 /*
2421   Turn a spark into a thread.
2422   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2423 */
2424 #if 0
2425 StgTSO *
2426 activateSpark (rtsSpark spark) 
2427 {
2428   StgTSO *tso;
2429
2430   tso = createSparkThread(spark);
2431   if (RtsFlags.ParFlags.ParStats.Full) {   
2432     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2433       IF_PAR_DEBUG(verbose,
2434                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2435                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2436   }
2437   // ToDo: fwd info on local/global spark to thread -- HWL
2438   // tso->gran.exported =  spark->exported;
2439   // tso->gran.locked =   !spark->global;
2440   // tso->gran.sparkname = spark->name;
2441
2442   return tso;
2443 }
2444 #endif
2445
2446 /* ---------------------------------------------------------------------------
2447  * scheduleThread()
2448  *
2449  * scheduleThread puts a thread on the head of the runnable queue.
2450  * This will usually be done immediately after a thread is created.
2451  * The caller of scheduleThread must create the thread using e.g.
2452  * createThread and push an appropriate closure
2453  * on this thread's stack before the scheduler is invoked.
2454  * ------------------------------------------------------------------------ */
2455
2456 static void
2457 scheduleThread_(StgTSO *tso)
2458 {
2459   // The thread goes at the *end* of the run-queue, to avoid possible
2460   // starvation of any threads already on the queue.
2461   APPEND_TO_RUN_QUEUE(tso);
2462   threadRunnable();
2463 }
2464
2465 void
2466 scheduleThread(StgTSO* tso)
2467 {
2468   ACQUIRE_LOCK(&sched_mutex);
2469   scheduleThread_(tso);
2470   RELEASE_LOCK(&sched_mutex);
2471 }
2472
2473 #if defined(RTS_SUPPORTS_THREADS)
2474 static Condition bound_cond_cache;
2475 static int bound_cond_cache_full = 0;
2476 #endif
2477
2478
2479 SchedulerStatus
2480 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
2481                    Capability *initialCapability)
2482 {
2483     // Precondition: sched_mutex must be held
2484     StgMainThread *m;
2485
2486     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2487     m->tso = tso;
2488     tso->main = m;
2489     m->ret = ret;
2490     m->stat = NoStatus;
2491     m->link = main_threads;
2492     m->prev = NULL;
2493     if (main_threads != NULL) {
2494         main_threads->prev = m;
2495     }
2496     main_threads = m;
2497
2498 #if defined(RTS_SUPPORTS_THREADS)
2499     // Allocating a new condition for each thread is expensive, so we
2500     // cache one.  This is a pretty feeble hack, but it helps speed up
2501     // consecutive call-ins quite a bit.
2502     if (bound_cond_cache_full) {
2503         m->bound_thread_cond = bound_cond_cache;
2504         bound_cond_cache_full = 0;
2505     } else {
2506         initCondition(&m->bound_thread_cond);
2507     }
2508 #endif
2509
2510     /* Put the thread on the main-threads list prior to scheduling the TSO.
2511        Failure to do so introduces a race condition in the MT case (as
2512        identified by Wolfgang Thaller), whereby the new task/OS thread 
2513        created by scheduleThread_() would complete prior to the thread
2514        that spawned it managed to put 'itself' on the main-threads list.
2515        The upshot of it all being that the worker thread wouldn't get to
2516        signal the completion of the its work item for the main thread to
2517        see (==> it got stuck waiting.)    -- sof 6/02.
2518     */
2519     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2520     
2521     APPEND_TO_RUN_QUEUE(tso);
2522     // NB. Don't call threadRunnable() here, because the thread is
2523     // bound and only runnable by *this* OS thread, so waking up other
2524     // workers will just slow things down.
2525
2526     return waitThread_(m, initialCapability);
2527 }
2528
2529 /* ---------------------------------------------------------------------------
2530  * initScheduler()
2531  *
2532  * Initialise the scheduler.  This resets all the queues - if the
2533  * queues contained any threads, they'll be garbage collected at the
2534  * next pass.
2535  *
2536  * ------------------------------------------------------------------------ */
2537
2538 void 
2539 initScheduler(void)
2540 {
2541 #if defined(GRAN)
2542   nat i;
2543
2544   for (i=0; i<=MAX_PROC; i++) {
2545     run_queue_hds[i]      = END_TSO_QUEUE;
2546     run_queue_tls[i]      = END_TSO_QUEUE;
2547     blocked_queue_hds[i]  = END_TSO_QUEUE;
2548     blocked_queue_tls[i]  = END_TSO_QUEUE;
2549     ccalling_threadss[i]  = END_TSO_QUEUE;
2550     sleeping_queue        = END_TSO_QUEUE;
2551   }
2552 #else
2553   run_queue_hd      = END_TSO_QUEUE;
2554   run_queue_tl      = END_TSO_QUEUE;
2555   blocked_queue_hd  = END_TSO_QUEUE;
2556   blocked_queue_tl  = END_TSO_QUEUE;
2557   sleeping_queue    = END_TSO_QUEUE;
2558 #endif 
2559
2560   suspended_ccalling_threads  = END_TSO_QUEUE;
2561
2562   main_threads = NULL;
2563   all_threads  = END_TSO_QUEUE;
2564
2565   context_switch = 0;
2566   interrupted    = 0;
2567
2568   RtsFlags.ConcFlags.ctxtSwitchTicks =
2569       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2570       
2571 #if defined(RTS_SUPPORTS_THREADS)
2572   /* Initialise the mutex and condition variables used by
2573    * the scheduler. */
2574   initMutex(&sched_mutex);
2575   initMutex(&term_mutex);
2576 #endif
2577   
2578   ACQUIRE_LOCK(&sched_mutex);
2579
2580   /* A capability holds the state a native thread needs in
2581    * order to execute STG code. At least one capability is
2582    * floating around (only SMP builds have more than one).
2583    */
2584   initCapabilities();
2585   
2586 #if defined(RTS_SUPPORTS_THREADS)
2587     /* start our haskell execution tasks */
2588     startTaskManager(0,taskStart);
2589 #endif
2590
2591 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2592   initSparkPools();
2593 #endif
2594
2595   RELEASE_LOCK(&sched_mutex);
2596 }
2597
2598 void
2599 exitScheduler( void )
2600 {
2601 #if defined(RTS_SUPPORTS_THREADS)
2602   stopTaskManager();
2603 #endif
2604   interrupted = rtsTrue;
2605   shutting_down_scheduler = rtsTrue;
2606 }
2607
2608 /* ----------------------------------------------------------------------------
2609    Managing the per-task allocation areas.
2610    
2611    Each capability comes with an allocation area.  These are
2612    fixed-length block lists into which allocation can be done.
2613
2614    ToDo: no support for two-space collection at the moment???
2615    ------------------------------------------------------------------------- */
2616
2617 static SchedulerStatus
2618 waitThread_(StgMainThread* m, Capability *initialCapability)
2619 {
2620   SchedulerStatus stat;
2621
2622   // Precondition: sched_mutex must be held.
2623   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2624
2625 #if defined(GRAN)
2626   /* GranSim specific init */
2627   CurrentTSO = m->tso;                // the TSO to run
2628   procStatus[MainProc] = Busy;        // status of main PE
2629   CurrentProc = MainProc;             // PE to run it on
2630   schedule(m,initialCapability);
2631 #else
2632   schedule(m,initialCapability);
2633   ASSERT(m->stat != NoStatus);
2634 #endif
2635
2636   stat = m->stat;
2637
2638 #if defined(RTS_SUPPORTS_THREADS)
2639   // Free the condition variable, returning it to the cache if possible.
2640   if (!bound_cond_cache_full) {
2641       bound_cond_cache = m->bound_thread_cond;
2642       bound_cond_cache_full = 1;
2643   } else {
2644       closeCondition(&m->bound_thread_cond);
2645   }
2646 #endif
2647
2648   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2649   stgFree(m);
2650
2651   // Postcondition: sched_mutex still held
2652   return stat;
2653 }
2654
2655 /* ---------------------------------------------------------------------------
2656    Where are the roots that we know about?
2657
2658         - all the threads on the runnable queue
2659         - all the threads on the blocked queue
2660         - all the threads on the sleeping queue
2661         - all the thread currently executing a _ccall_GC
2662         - all the "main threads"
2663      
2664    ------------------------------------------------------------------------ */
2665
2666 /* This has to be protected either by the scheduler monitor, or by the
2667         garbage collection monitor (probably the latter).
2668         KH @ 25/10/99
2669 */
2670
2671 void
2672 GetRoots( evac_fn evac )
2673 {
2674 #if defined(GRAN)
2675   {
2676     nat i;
2677     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2678       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2679           evac((StgClosure **)&run_queue_hds[i]);
2680       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2681           evac((StgClosure **)&run_queue_tls[i]);
2682       
2683       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2684           evac((StgClosure **)&blocked_queue_hds[i]);
2685       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2686           evac((StgClosure **)&blocked_queue_tls[i]);
2687       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2688           evac((StgClosure **)&ccalling_threads[i]);
2689     }
2690   }
2691
2692   markEventQueue();
2693
2694 #else /* !GRAN */
2695   if (run_queue_hd != END_TSO_QUEUE) {
2696       ASSERT(run_queue_tl != END_TSO_QUEUE);
2697       evac((StgClosure **)&run_queue_hd);
2698       evac((StgClosure **)&run_queue_tl);
2699   }
2700   
2701   if (blocked_queue_hd != END_TSO_QUEUE) {
2702       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2703       evac((StgClosure **)&blocked_queue_hd);
2704       evac((StgClosure **)&blocked_queue_tl);
2705   }
2706   
2707   if (sleeping_queue != END_TSO_QUEUE) {
2708       evac((StgClosure **)&sleeping_queue);
2709   }
2710 #endif 
2711
2712   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2713       evac((StgClosure **)&suspended_ccalling_threads);
2714   }
2715
2716 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2717   markSparkQueue(evac);
2718 #endif
2719
2720 #if defined(RTS_USER_SIGNALS)
2721   // mark the signal handlers (signals should be already blocked)
2722   markSignalHandlers(evac);
2723 #endif
2724 }
2725
2726 /* -----------------------------------------------------------------------------
2727    performGC
2728
2729    This is the interface to the garbage collector from Haskell land.
2730    We provide this so that external C code can allocate and garbage
2731    collect when called from Haskell via _ccall_GC.
2732
2733    It might be useful to provide an interface whereby the programmer
2734    can specify more roots (ToDo).
2735    
2736    This needs to be protected by the GC condition variable above.  KH.
2737    -------------------------------------------------------------------------- */
2738
2739 static void (*extra_roots)(evac_fn);
2740
2741 void
2742 performGC(void)
2743 {
2744   /* Obligated to hold this lock upon entry */
2745   ACQUIRE_LOCK(&sched_mutex);
2746   GarbageCollect(GetRoots,rtsFalse);
2747   RELEASE_LOCK(&sched_mutex);
2748 }
2749
2750 void
2751 performMajorGC(void)
2752 {
2753   ACQUIRE_LOCK(&sched_mutex);
2754   GarbageCollect(GetRoots,rtsTrue);
2755   RELEASE_LOCK(&sched_mutex);
2756 }
2757
2758 static void
2759 AllRoots(evac_fn evac)
2760 {
2761     GetRoots(evac);             // the scheduler's roots
2762     extra_roots(evac);          // the user's roots
2763 }
2764
2765 void
2766 performGCWithRoots(void (*get_roots)(evac_fn))
2767 {
2768   ACQUIRE_LOCK(&sched_mutex);
2769   extra_roots = get_roots;
2770   GarbageCollect(AllRoots,rtsFalse);
2771   RELEASE_LOCK(&sched_mutex);
2772 }
2773
2774 /* -----------------------------------------------------------------------------
2775    Stack overflow
2776
2777    If the thread has reached its maximum stack size, then raise the
2778    StackOverflow exception in the offending thread.  Otherwise
2779    relocate the TSO into a larger chunk of memory and adjust its stack
2780    size appropriately.
2781    -------------------------------------------------------------------------- */
2782
2783 static StgTSO *
2784 threadStackOverflow(StgTSO *tso)
2785 {
2786   nat new_stack_size, stack_words;
2787   lnat new_tso_size;
2788   StgPtr new_sp;
2789   StgTSO *dest;
2790
2791   IF_DEBUG(sanity,checkTSO(tso));
2792   if (tso->stack_size >= tso->max_stack_size) {
2793
2794     IF_DEBUG(gc,
2795              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2796                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2797              /* If we're debugging, just print out the top of the stack */
2798              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2799                                               tso->sp+64)));
2800
2801     /* Send this thread the StackOverflow exception */
2802     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2803     return tso;
2804   }
2805
2806   /* Try to double the current stack size.  If that takes us over the
2807    * maximum stack size for this thread, then use the maximum instead.
2808    * Finally round up so the TSO ends up as a whole number of blocks.
2809    */
2810   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2811   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2812                                        TSO_STRUCT_SIZE)/sizeof(W_);
2813   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2814   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2815
2816   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2817
2818   dest = (StgTSO *)allocate(new_tso_size);
2819   TICK_ALLOC_TSO(new_stack_size,0);
2820
2821   /* copy the TSO block and the old stack into the new area */
2822   memcpy(dest,tso,TSO_STRUCT_SIZE);
2823   stack_words = tso->stack + tso->stack_size - tso->sp;
2824   new_sp = (P_)dest + new_tso_size - stack_words;
2825   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2826
2827   /* relocate the stack pointers... */
2828   dest->sp         = new_sp;
2829   dest->stack_size = new_stack_size;
2830         
2831   /* Mark the old TSO as relocated.  We have to check for relocated
2832    * TSOs in the garbage collector and any primops that deal with TSOs.
2833    *
2834    * It's important to set the sp value to just beyond the end
2835    * of the stack, so we don't attempt to scavenge any part of the
2836    * dead TSO's stack.
2837    */
2838   tso->what_next = ThreadRelocated;
2839   tso->link = dest;
2840   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2841   tso->why_blocked = NotBlocked;
2842
2843   IF_PAR_DEBUG(verbose,
2844                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2845                      tso->id, tso, tso->stack_size);
2846                /* If we're debugging, just print out the top of the stack */
2847                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2848                                                 tso->sp+64)));
2849   
2850   IF_DEBUG(sanity,checkTSO(tso));
2851 #if 0
2852   IF_DEBUG(scheduler,printTSO(dest));
2853 #endif
2854
2855   return dest;
2856 }
2857
2858 /* ---------------------------------------------------------------------------
2859    Wake up a queue that was blocked on some resource.
2860    ------------------------------------------------------------------------ */
2861
2862 #if defined(GRAN)
2863 STATIC_INLINE void
2864 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2865 {
2866 }
2867 #elif defined(PARALLEL_HASKELL)
2868 STATIC_INLINE void
2869 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2870 {
2871   /* write RESUME events to log file and
2872      update blocked and fetch time (depending on type of the orig closure) */
2873   if (RtsFlags.ParFlags.ParStats.Full) {
2874     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2875                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2876                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2877     if (EMPTY_RUN_QUEUE())
2878       emitSchedule = rtsTrue;
2879
2880     switch (get_itbl(node)->type) {
2881         case FETCH_ME_BQ:
2882           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2883           break;
2884         case RBH:
2885         case FETCH_ME:
2886         case BLACKHOLE_BQ:
2887           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2888           break;
2889 #ifdef DIST
2890         case MVAR:
2891           break;
2892 #endif    
2893         default:
2894           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2895         }
2896       }
2897 }
2898 #endif
2899
2900 #if defined(GRAN)
2901 static StgBlockingQueueElement *
2902 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2903 {
2904     StgTSO *tso;
2905     PEs node_loc, tso_loc;
2906
2907     node_loc = where_is(node); // should be lifted out of loop
2908     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2909     tso_loc = where_is((StgClosure *)tso);
2910     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2911       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2912       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2913       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2914       // insertThread(tso, node_loc);
2915       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2916                 ResumeThread,
2917                 tso, node, (rtsSpark*)NULL);
2918       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2919       // len_local++;
2920       // len++;
2921     } else { // TSO is remote (actually should be FMBQ)
2922       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2923                                   RtsFlags.GranFlags.Costs.gunblocktime +
2924                                   RtsFlags.GranFlags.Costs.latency;
2925       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2926                 UnblockThread,
2927                 tso, node, (rtsSpark*)NULL);
2928       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2929       // len++;
2930     }
2931     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2932     IF_GRAN_DEBUG(bq,
2933                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2934                           (node_loc==tso_loc ? "Local" : "Global"), 
2935                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2936     tso->block_info.closure = NULL;
2937     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2938                              tso->id, tso));
2939 }
2940 #elif defined(PARALLEL_HASKELL)
2941 static StgBlockingQueueElement *
2942 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2943 {
2944     StgBlockingQueueElement *next;
2945
2946     switch (get_itbl(bqe)->type) {
2947     case TSO:
2948       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2949       /* if it's a TSO just push it onto the run_queue */
2950       next = bqe->link;
2951       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2952       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2953       threadRunnable();
2954       unblockCount(bqe, node);
2955       /* reset blocking status after dumping event */
2956       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2957       break;
2958
2959     case BLOCKED_FETCH:
2960       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2961       next = bqe->link;
2962       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2963       PendingFetches = (StgBlockedFetch *)bqe;
2964       break;
2965
2966 # if defined(DEBUG)
2967       /* can ignore this case in a non-debugging setup; 
2968          see comments on RBHSave closures above */
2969     case CONSTR:
2970       /* check that the closure is an RBHSave closure */
2971       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2972              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2973              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2974       break;
2975
2976     default:
2977       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2978            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2979            (StgClosure *)bqe);
2980 # endif
2981     }
2982   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
2983   return next;
2984 }
2985
2986 #else /* !GRAN && !PARALLEL_HASKELL */
2987 static StgTSO *
2988 unblockOneLocked(StgTSO *tso)
2989 {
2990   StgTSO *next;
2991
2992   ASSERT(get_itbl(tso)->type == TSO);
2993   ASSERT(tso->why_blocked != NotBlocked);
2994   tso->why_blocked = NotBlocked;
2995   next = tso->link;
2996   tso->link = END_TSO_QUEUE;
2997   APPEND_TO_RUN_QUEUE(tso);
2998   threadRunnable();
2999   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3000   return next;
3001 }
3002 #endif
3003
3004 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3005 INLINE_ME StgBlockingQueueElement *
3006 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3007 {
3008   ACQUIRE_LOCK(&sched_mutex);
3009   bqe = unblockOneLocked(bqe, node);
3010   RELEASE_LOCK(&sched_mutex);
3011   return bqe;
3012 }
3013 #else
3014 INLINE_ME StgTSO *
3015 unblockOne(StgTSO *tso)
3016 {
3017   ACQUIRE_LOCK(&sched_mutex);
3018   tso = unblockOneLocked(tso);
3019   RELEASE_LOCK(&sched_mutex);
3020   return tso;
3021 }
3022 #endif
3023
3024 #if defined(GRAN)
3025 void 
3026 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3027 {
3028   StgBlockingQueueElement *bqe;
3029   PEs node_loc;
3030   nat len = 0; 
3031
3032   IF_GRAN_DEBUG(bq, 
3033                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3034                       node, CurrentProc, CurrentTime[CurrentProc], 
3035                       CurrentTSO->id, CurrentTSO));
3036
3037   node_loc = where_is(node);
3038
3039   ASSERT(q == END_BQ_QUEUE ||
3040          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3041          get_itbl(q)->type == CONSTR); // closure (type constructor)
3042   ASSERT(is_unique(node));
3043
3044   /* FAKE FETCH: magically copy the node to the tso's proc;
3045      no Fetch necessary because in reality the node should not have been 
3046      moved to the other PE in the first place
3047   */
3048   if (CurrentProc!=node_loc) {
3049     IF_GRAN_DEBUG(bq, 
3050                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3051                         node, node_loc, CurrentProc, CurrentTSO->id, 
3052                         // CurrentTSO, where_is(CurrentTSO),
3053                         node->header.gran.procs));
3054     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3055     IF_GRAN_DEBUG(bq, 
3056                   debugBelch("## new bitmask of node %p is %#x\n",
3057                         node, node->header.gran.procs));
3058     if (RtsFlags.GranFlags.GranSimStats.Global) {
3059       globalGranStats.tot_fake_fetches++;
3060     }
3061   }
3062
3063   bqe = q;
3064   // ToDo: check: ASSERT(CurrentProc==node_loc);
3065   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3066     //next = bqe->link;
3067     /* 
3068        bqe points to the current element in the queue
3069        next points to the next element in the queue
3070     */
3071     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3072     //tso_loc = where_is(tso);
3073     len++;
3074     bqe = unblockOneLocked(bqe, node);
3075   }
3076
3077   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3078      the closure to make room for the anchor of the BQ */
3079   if (bqe!=END_BQ_QUEUE) {
3080     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3081     /*
3082     ASSERT((info_ptr==&RBH_Save_0_info) ||
3083            (info_ptr==&RBH_Save_1_info) ||
3084            (info_ptr==&RBH_Save_2_info));
3085     */
3086     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3087     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3088     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3089
3090     IF_GRAN_DEBUG(bq,
3091                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3092                         node, info_type(node)));
3093   }
3094
3095   /* statistics gathering */
3096   if (RtsFlags.GranFlags.GranSimStats.Global) {
3097     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3098     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3099     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3100     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3101   }
3102   IF_GRAN_DEBUG(bq,
3103                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3104                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3105 }
3106 #elif defined(PARALLEL_HASKELL)
3107 void 
3108 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3109 {
3110   StgBlockingQueueElement *bqe;
3111
3112   ACQUIRE_LOCK(&sched_mutex);
3113
3114   IF_PAR_DEBUG(verbose, 
3115                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3116                      node, mytid));
3117 #ifdef DIST  
3118   //RFP
3119   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3120     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3121     return;
3122   }
3123 #endif
3124   
3125   ASSERT(q == END_BQ_QUEUE ||
3126          get_itbl(q)->type == TSO ||           
3127          get_itbl(q)->type == BLOCKED_FETCH || 
3128          get_itbl(q)->type == CONSTR); 
3129
3130   bqe = q;
3131   while (get_itbl(bqe)->type==TSO || 
3132          get_itbl(bqe)->type==BLOCKED_FETCH) {
3133     bqe = unblockOneLocked(bqe, node);
3134   }
3135   RELEASE_LOCK(&sched_mutex);
3136 }
3137
3138 #else   /* !GRAN && !PARALLEL_HASKELL */
3139
3140 void
3141 awakenBlockedQueueNoLock(StgTSO *tso)
3142 {
3143   while (tso != END_TSO_QUEUE) {
3144     tso = unblockOneLocked(tso);
3145   }
3146 }
3147
3148 void
3149 awakenBlockedQueue(StgTSO *tso)
3150 {
3151   ACQUIRE_LOCK(&sched_mutex);
3152   while (tso != END_TSO_QUEUE) {
3153     tso = unblockOneLocked(tso);
3154   }
3155   RELEASE_LOCK(&sched_mutex);
3156 }
3157 #endif
3158
3159 /* ---------------------------------------------------------------------------
3160    Interrupt execution
3161    - usually called inside a signal handler so it mustn't do anything fancy.   
3162    ------------------------------------------------------------------------ */
3163
3164 void
3165 interruptStgRts(void)
3166 {
3167     interrupted    = 1;
3168     context_switch = 1;
3169 }
3170
3171 /* -----------------------------------------------------------------------------
3172    Unblock a thread
3173
3174    This is for use when we raise an exception in another thread, which
3175    may be blocked.
3176    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3177    -------------------------------------------------------------------------- */
3178
3179 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3180 /*
3181   NB: only the type of the blocking queue is different in GranSim and GUM
3182       the operations on the queue-elements are the same
3183       long live polymorphism!
3184
3185   Locks: sched_mutex is held upon entry and exit.
3186
3187 */
3188 static void
3189 unblockThread(StgTSO *tso)
3190 {
3191   StgBlockingQueueElement *t, **last;
3192
3193   switch (tso->why_blocked) {
3194
3195   case NotBlocked:
3196     return;  /* not blocked */
3197
3198   case BlockedOnSTM:
3199     // Be careful: nothing to do here!  We tell the scheduler that the thread
3200     // is runnable and we leave it to the stack-walking code to abort the 
3201     // transaction while unwinding the stack.  We should perhaps have a debugging
3202     // test to make sure that this really happens and that the 'zombie' transaction
3203     // does not get committed.
3204     goto done;
3205
3206   case BlockedOnMVar:
3207     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3208     {
3209       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3210       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3211
3212       last = (StgBlockingQueueElement **)&mvar->head;
3213       for (t = (StgBlockingQueueElement *)mvar->head; 
3214            t != END_BQ_QUEUE; 
3215            last = &t->link, last_tso = t, t = t->link) {
3216         if (t == (StgBlockingQueueElement *)tso) {
3217           *last = (StgBlockingQueueElement *)tso->link;
3218           if (mvar->tail == tso) {
3219             mvar->tail = (StgTSO *)last_tso;
3220           }
3221           goto done;
3222         }
3223       }
3224       barf("unblockThread (MVAR): TSO not found");
3225     }
3226
3227   case BlockedOnBlackHole:
3228     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3229     {
3230       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3231
3232       last = &bq->blocking_queue;
3233       for (t = bq->blocking_queue; 
3234            t != END_BQ_QUEUE; 
3235            last = &t->link, t = t->link) {
3236         if (t == (StgBlockingQueueElement *)tso) {
3237           *last = (StgBlockingQueueElement *)tso->link;
3238           goto done;
3239         }
3240       }
3241       barf("unblockThread (BLACKHOLE): TSO not found");
3242     }
3243
3244   case BlockedOnException:
3245     {
3246       StgTSO *target  = tso->block_info.tso;
3247
3248       ASSERT(get_itbl(target)->type == TSO);
3249
3250       if (target->what_next == ThreadRelocated) {
3251           target = target->link;
3252           ASSERT(get_itbl(target)->type == TSO);
3253       }
3254
3255       ASSERT(target->blocked_exceptions != NULL);
3256
3257       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3258       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3259            t != END_BQ_QUEUE; 
3260            last = &t->link, t = t->link) {
3261         ASSERT(get_itbl(t)->type == TSO);
3262         if (t == (StgBlockingQueueElement *)tso) {
3263           *last = (StgBlockingQueueElement *)tso->link;
3264           goto done;
3265         }
3266       }
3267       barf("unblockThread (Exception): TSO not found");
3268     }
3269
3270   case BlockedOnRead:
3271   case BlockedOnWrite:
3272 #if defined(mingw32_HOST_OS)
3273   case BlockedOnDoProc:
3274 #endif
3275     {
3276       /* take TSO off blocked_queue */
3277       StgBlockingQueueElement *prev = NULL;
3278       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3279            prev = t, t = t->link) {
3280         if (t == (StgBlockingQueueElement *)tso) {
3281           if (prev == NULL) {
3282             blocked_queue_hd = (StgTSO *)t->link;
3283             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3284               blocked_queue_tl = END_TSO_QUEUE;
3285             }
3286           } else {
3287             prev->link = t->link;
3288             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3289               blocked_queue_tl = (StgTSO *)prev;
3290             }
3291           }
3292           goto done;
3293         }
3294       }
3295       barf("unblockThread (I/O): TSO not found");
3296     }
3297
3298   case BlockedOnDelay:
3299     {
3300       /* take TSO off sleeping_queue */
3301       StgBlockingQueueElement *prev = NULL;
3302       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3303            prev = t, t = t->link) {
3304         if (t == (StgBlockingQueueElement *)tso) {
3305           if (prev == NULL) {
3306             sleeping_queue = (StgTSO *)t->link;
3307           } else {
3308             prev->link = t->link;
3309           }
3310           goto done;
3311         }
3312       }
3313       barf("unblockThread (delay): TSO not found");
3314     }
3315
3316   default:
3317     barf("unblockThread");
3318   }
3319
3320  done:
3321   tso->link = END_TSO_QUEUE;
3322   tso->why_blocked = NotBlocked;
3323   tso->block_info.closure = NULL;
3324   PUSH_ON_RUN_QUEUE(tso);
3325 }
3326 #else
3327 static void
3328 unblockThread(StgTSO *tso)
3329 {
3330   StgTSO *t, **last;
3331   
3332   /* To avoid locking unnecessarily. */
3333   if (tso->why_blocked == NotBlocked) {
3334     return;
3335   }
3336
3337   switch (tso->why_blocked) {
3338
3339   case BlockedOnSTM:
3340     // Be careful: nothing to do here!  We tell the scheduler that the thread
3341     // is runnable and we leave it to the stack-walking code to abort the 
3342     // transaction while unwinding the stack.  We should perhaps have a debugging
3343     // test to make sure that this really happens and that the 'zombie' transaction
3344     // does not get committed.
3345     goto done;
3346
3347   case BlockedOnMVar:
3348     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3349     {
3350       StgTSO *last_tso = END_TSO_QUEUE;
3351       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3352
3353       last = &mvar->head;
3354       for (t = mvar->head; t != END_TSO_QUEUE; 
3355            last = &t->link, last_tso = t, t = t->link) {
3356         if (t == tso) {
3357           *last = tso->link;
3358           if (mvar->tail == tso) {
3359             mvar->tail = last_tso;
3360           }
3361           goto done;
3362         }
3363       }
3364       barf("unblockThread (MVAR): TSO not found");
3365     }
3366
3367   case BlockedOnBlackHole:
3368     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3369     {
3370       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3371
3372       last = &bq->blocking_queue;
3373       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3374            last = &t->link, t = t->link) {
3375         if (t == tso) {
3376           *last = tso->link;
3377           goto done;
3378         }
3379       }
3380       barf("unblockThread (BLACKHOLE): TSO not found");
3381     }
3382
3383   case BlockedOnException:
3384     {
3385       StgTSO *target  = tso->block_info.tso;
3386
3387       ASSERT(get_itbl(target)->type == TSO);
3388
3389       while (target->what_next == ThreadRelocated) {
3390           target = target->link;
3391           ASSERT(get_itbl(target)->type == TSO);
3392       }
3393       
3394       ASSERT(target->blocked_exceptions != NULL);
3395
3396       last = &target->blocked_exceptions;
3397       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3398            last = &t->link, t = t->link) {
3399         ASSERT(get_itbl(t)->type == TSO);
3400         if (t == tso) {
3401           *last = tso->link;
3402           goto done;
3403         }
3404       }
3405       barf("unblockThread (Exception): TSO not found");
3406     }
3407
3408   case BlockedOnRead:
3409   case BlockedOnWrite:
3410 #if defined(mingw32_HOST_OS)
3411   case BlockedOnDoProc:
3412 #endif
3413     {
3414       StgTSO *prev = NULL;
3415       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3416            prev = t, t = t->link) {
3417         if (t == tso) {
3418           if (prev == NULL) {
3419             blocked_queue_hd = t->link;
3420             if (blocked_queue_tl == t) {
3421               blocked_queue_tl = END_TSO_QUEUE;
3422             }
3423           } else {
3424             prev->link = t->link;
3425             if (blocked_queue_tl == t) {
3426               blocked_queue_tl = prev;
3427             }
3428           }
3429           goto done;
3430         }
3431       }
3432       barf("unblockThread (I/O): TSO not found");
3433     }
3434
3435   case BlockedOnDelay:
3436     {
3437       StgTSO *prev = NULL;
3438       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3439            prev = t, t = t->link) {
3440         if (t == tso) {
3441           if (prev == NULL) {
3442             sleeping_queue = t->link;
3443           } else {
3444             prev->link = t->link;
3445           }
3446           goto done;
3447         }
3448       }
3449       barf("unblockThread (delay): TSO not found");
3450     }
3451
3452   default:
3453     barf("unblockThread");
3454   }
3455
3456  done:
3457   tso->link = END_TSO_QUEUE;
3458   tso->why_blocked = NotBlocked;
3459   tso->block_info.closure = NULL;
3460   APPEND_TO_RUN_QUEUE(tso);
3461 }
3462 #endif
3463
3464 /* -----------------------------------------------------------------------------
3465  * raiseAsync()
3466  *
3467  * The following function implements the magic for raising an
3468  * asynchronous exception in an existing thread.
3469  *
3470  * We first remove the thread from any queue on which it might be
3471  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3472  *
3473  * We strip the stack down to the innermost CATCH_FRAME, building
3474  * thunks in the heap for all the active computations, so they can 
3475  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3476  * an application of the handler to the exception, and push it on
3477  * the top of the stack.
3478  * 
3479  * How exactly do we save all the active computations?  We create an
3480  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3481  * AP_STACKs pushes everything from the corresponding update frame
3482  * upwards onto the stack.  (Actually, it pushes everything up to the
3483  * next update frame plus a pointer to the next AP_STACK object.
3484  * Entering the next AP_STACK object pushes more onto the stack until we
3485  * reach the last AP_STACK object - at which point the stack should look
3486  * exactly as it did when we killed the TSO and we can continue
3487  * execution by entering the closure on top of the stack.
3488  *
3489  * We can also kill a thread entirely - this happens if either (a) the 
3490  * exception passed to raiseAsync is NULL, or (b) there's no
3491  * CATCH_FRAME on the stack.  In either case, we strip the entire
3492  * stack and replace the thread with a zombie.
3493  *
3494  * Locks: sched_mutex held upon entry nor exit.
3495  *
3496  * -------------------------------------------------------------------------- */
3497  
3498 void 
3499 deleteThread(StgTSO *tso)
3500 {
3501   if (tso->why_blocked != BlockedOnCCall &&
3502       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3503       raiseAsync(tso,NULL);
3504   }
3505 }
3506
3507 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3508 static void 
3509 deleteThreadImmediately(StgTSO *tso)
3510 { // for forkProcess only:
3511   // delete thread without giving it a chance to catch the KillThread exception
3512
3513   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3514       return;
3515   }
3516
3517   if (tso->why_blocked != BlockedOnCCall &&
3518       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3519     unblockThread(tso);
3520   }
3521
3522   tso->what_next = ThreadKilled;
3523 }
3524 #endif
3525
3526 void
3527 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3528 {
3529   /* When raising async exs from contexts where sched_mutex isn't held;
3530      use raiseAsyncWithLock(). */
3531   ACQUIRE_LOCK(&sched_mutex);
3532   raiseAsync(tso,exception);
3533   RELEASE_LOCK(&sched_mutex);
3534 }
3535
3536 void
3537 raiseAsync(StgTSO *tso, StgClosure *exception)
3538 {
3539     raiseAsync_(tso, exception, rtsFalse);
3540 }
3541
3542 static void
3543 raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
3544 {
3545     StgRetInfoTable *info;
3546     StgPtr sp;
3547   
3548     // Thread already dead?
3549     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3550         return;
3551     }
3552
3553     IF_DEBUG(scheduler, 
3554              sched_belch("raising exception in thread %ld.", (long)tso->id));
3555     
3556     // Remove it from any blocking queues
3557     unblockThread(tso);
3558
3559     sp = tso->sp;
3560     
3561     // The stack freezing code assumes there's a closure pointer on
3562     // the top of the stack, so we have to arrange that this is the case...
3563     //
3564     if (sp[0] == (W_)&stg_enter_info) {
3565         sp++;
3566     } else {
3567         sp--;
3568         sp[0] = (W_)&stg_dummy_ret_closure;
3569     }
3570
3571     while (1) {
3572         nat i;
3573
3574         // 1. Let the top of the stack be the "current closure"
3575         //
3576         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3577         // CATCH_FRAME.
3578         //
3579         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3580         // current closure applied to the chunk of stack up to (but not
3581         // including) the update frame.  This closure becomes the "current
3582         // closure".  Go back to step 2.
3583         //
3584         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3585         // top of the stack applied to the exception.
3586         // 
3587         // 5. If it's a STOP_FRAME, then kill the thread.
3588         // 
3589         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3590         // transaction
3591        
3592         
3593         StgPtr frame;
3594         
3595         frame = sp + 1;
3596         info = get_ret_itbl((StgClosure *)frame);
3597         
3598         while (info->i.type != UPDATE_FRAME
3599                && (info->i.type != CATCH_FRAME || exception == NULL)
3600                && info->i.type != STOP_FRAME
3601                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3602         {
3603             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3604               // IF we find an ATOMICALLY_FRAME then we abort the
3605               // current transaction and propagate the exception.  In
3606               // this case (unlike ordinary exceptions) we do not care
3607               // whether the transaction is valid or not because its
3608               // possible validity cannot have caused the exception
3609               // and will not be visible after the abort.
3610               IF_DEBUG(stm,
3611                        debugBelch("Found atomically block delivering async exception\n"));
3612               stmAbortTransaction(tso -> trec);
3613               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3614             }
3615             frame += stack_frame_sizeW((StgClosure *)frame);
3616             info = get_ret_itbl((StgClosure *)frame);
3617         }
3618         
3619         switch (info->i.type) {
3620             
3621         case ATOMICALLY_FRAME:
3622             ASSERT(stop_at_atomically);
3623             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3624             stmCondemnTransaction(tso -> trec);
3625 #ifdef REG_R1
3626             tso->sp = frame;
3627 #else
3628             // R1 is not a register: the return convention for IO in
3629             // this case puts the return value on the stack, so we
3630             // need to set up the stack to return to the atomically
3631             // frame properly...
3632             tso->sp = frame - 2;
3633             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3634             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3635 #endif
3636             tso->what_next = ThreadRunGHC;
3637             return;
3638
3639         case CATCH_FRAME:
3640             // If we find a CATCH_FRAME, and we've got an exception to raise,
3641             // then build the THUNK raise(exception), and leave it on
3642             // top of the CATCH_FRAME ready to enter.
3643             //
3644         {
3645 #ifdef PROFILING
3646             StgCatchFrame *cf = (StgCatchFrame *)frame;
3647 #endif
3648             StgClosure *raise;
3649             
3650             // we've got an exception to raise, so let's pass it to the
3651             // handler in this frame.
3652             //
3653             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3654             TICK_ALLOC_SE_THK(1,0);
3655             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3656             raise->payload[0] = exception;
3657             
3658             // throw away the stack from Sp up to the CATCH_FRAME.
3659             //
3660             sp = frame - 1;
3661             
3662             /* Ensure that async excpetions are blocked now, so we don't get
3663              * a surprise exception before we get around to executing the
3664              * handler.
3665              */
3666             if (tso->blocked_exceptions == NULL) {
3667                 tso->blocked_exceptions = END_TSO_QUEUE;
3668             }
3669             
3670             /* Put the newly-built THUNK on top of the stack, ready to execute
3671              * when the thread restarts.
3672              */
3673             sp[0] = (W_)raise;
3674             sp[-1] = (W_)&stg_enter_info;
3675             tso->sp = sp-1;
3676             tso->what_next = ThreadRunGHC;
3677             IF_DEBUG(sanity, checkTSO(tso));
3678             return;
3679         }
3680         
3681         case UPDATE_FRAME:
3682         {
3683             StgAP_STACK * ap;
3684             nat words;
3685             
3686             // First build an AP_STACK consisting of the stack chunk above the
3687             // current update frame, with the top word on the stack as the
3688             // fun field.
3689             //
3690             words = frame - sp - 1;
3691             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3692             
3693             ap->size = words;
3694             ap->fun  = (StgClosure *)sp[0];
3695             sp++;
3696             for(i=0; i < (nat)words; ++i) {
3697                 ap->payload[i] = (StgClosure *)*sp++;
3698             }
3699             
3700             SET_HDR(ap,&stg_AP_STACK_info,
3701                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3702             TICK_ALLOC_UP_THK(words+1,0);
3703             
3704             IF_DEBUG(scheduler,
3705                      debugBelch("sched: Updating ");
3706                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3707                      debugBelch(" with ");
3708                      printObj((StgClosure *)ap);
3709                 );
3710
3711             // Replace the updatee with an indirection - happily
3712             // this will also wake up any threads currently
3713             // waiting on the result.
3714             //
3715             // Warning: if we're in a loop, more than one update frame on
3716             // the stack may point to the same object.  Be careful not to
3717             // overwrite an IND_OLDGEN in this case, because we'll screw
3718             // up the mutable lists.  To be on the safe side, don't
3719             // overwrite any kind of indirection at all.  See also
3720             // threadSqueezeStack in GC.c, where we have to make a similar
3721             // check.
3722             //
3723             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3724                 // revert the black hole
3725                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3726                                (StgClosure *)ap);
3727             }
3728             sp += sizeofW(StgUpdateFrame) - 1;
3729             sp[0] = (W_)ap; // push onto stack
3730             break;
3731         }
3732         
3733         case STOP_FRAME:
3734             // We've stripped the entire stack, the thread is now dead.
3735             sp += sizeofW(StgStopFrame);
3736             tso->what_next = ThreadKilled;
3737             tso->sp = sp;
3738             return;
3739             
3740         default:
3741             barf("raiseAsync");
3742         }
3743     }
3744     barf("raiseAsync");
3745 }
3746
3747 /* -----------------------------------------------------------------------------
3748    raiseExceptionHelper
3749    
3750    This function is called by the raise# primitve, just so that we can
3751    move some of the tricky bits of raising an exception from C-- into
3752    C.  Who knows, it might be a useful re-useable thing here too.
3753    -------------------------------------------------------------------------- */
3754
3755 StgWord
3756 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3757 {
3758     StgClosure *raise_closure = NULL;
3759     StgPtr p, next;
3760     StgRetInfoTable *info;
3761     //
3762     // This closure represents the expression 'raise# E' where E
3763     // is the exception raise.  It is used to overwrite all the
3764     // thunks which are currently under evaluataion.
3765     //
3766
3767     //    
3768     // LDV profiling: stg_raise_info has THUNK as its closure
3769     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3770     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3771     // 1 does not cause any problem unless profiling is performed.
3772     // However, when LDV profiling goes on, we need to linearly scan
3773     // small object pool, where raise_closure is stored, so we should
3774     // use MIN_UPD_SIZE.
3775     //
3776     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3777     //                                 sizeofW(StgClosure)+1);
3778     //
3779
3780     //
3781     // Walk up the stack, looking for the catch frame.  On the way,
3782     // we update any closures pointed to from update frames with the
3783     // raise closure that we just built.
3784     //
3785     p = tso->sp;
3786     while(1) {
3787         info = get_ret_itbl((StgClosure *)p);
3788         next = p + stack_frame_sizeW((StgClosure *)p);
3789         switch (info->i.type) {
3790             
3791         case UPDATE_FRAME:
3792             // Only create raise_closure if we need to.
3793             if (raise_closure == NULL) {
3794                 raise_closure = 
3795                     (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3796                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3797                 raise_closure->payload[0] = exception;
3798             }
3799             UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3800             p = next;
3801             continue;
3802
3803         case ATOMICALLY_FRAME:
3804             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3805             tso->sp = p;
3806             return ATOMICALLY_FRAME;
3807             
3808         case CATCH_FRAME:
3809             tso->sp = p;
3810             return CATCH_FRAME;
3811
3812         case CATCH_STM_FRAME:
3813             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3814             tso->sp = p;
3815             return CATCH_STM_FRAME;
3816             
3817         case STOP_FRAME:
3818             tso->sp = p;
3819             return STOP_FRAME;
3820
3821         case CATCH_RETRY_FRAME:
3822         default:
3823             p = next; 
3824             continue;
3825         }
3826     }
3827 }
3828
3829
3830 /* -----------------------------------------------------------------------------
3831    findRetryFrameHelper
3832
3833    This function is called by the retry# primitive.  It traverses the stack
3834    leaving tso->sp referring to the frame which should handle the retry.  
3835
3836    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3837    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3838
3839    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3840    despite the similar implementation.
3841
3842    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3843    not be created within memory transactions.
3844    -------------------------------------------------------------------------- */
3845
3846 StgWord
3847 findRetryFrameHelper (StgTSO *tso)
3848 {
3849   StgPtr           p, next;
3850   StgRetInfoTable *info;
3851
3852   p = tso -> sp;
3853   while (1) {
3854     info = get_ret_itbl((StgClosure *)p);
3855     next = p + stack_frame_sizeW((StgClosure *)p);
3856     switch (info->i.type) {
3857       
3858     case ATOMICALLY_FRAME:
3859       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3860       tso->sp = p;
3861       return ATOMICALLY_FRAME;
3862       
3863     case CATCH_RETRY_FRAME:
3864       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3865       tso->sp = p;
3866       return CATCH_RETRY_FRAME;
3867       
3868     case CATCH_STM_FRAME:
3869     default:
3870       ASSERT(info->i.type != CATCH_FRAME);
3871       ASSERT(info->i.type != STOP_FRAME);
3872       p = next; 
3873       continue;
3874     }
3875   }
3876 }
3877
3878 /* -----------------------------------------------------------------------------
3879    resurrectThreads is called after garbage collection on the list of
3880    threads found to be garbage.  Each of these threads will be woken
3881    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3882    on an MVar, or NonTermination if the thread was blocked on a Black
3883    Hole.
3884
3885    Locks: sched_mutex isn't held upon entry nor exit.
3886    -------------------------------------------------------------------------- */
3887
3888 void
3889 resurrectThreads( StgTSO *threads )
3890 {
3891   StgTSO *tso, *next;
3892
3893   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3894     next = tso->global_link;
3895     tso->global_link = all_threads;
3896     all_threads = tso;
3897     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3898
3899     switch (tso->why_blocked) {
3900     case BlockedOnMVar:
3901     case BlockedOnException:
3902       /* Called by GC - sched_mutex lock is currently held. */
3903       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3904       break;
3905     case BlockedOnBlackHole:
3906       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3907       break;
3908     case BlockedOnSTM:
3909       raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
3910       break;
3911     case NotBlocked:
3912       /* This might happen if the thread was blocked on a black hole
3913        * belonging to a thread that we've just woken up (raiseAsync
3914        * can wake up threads, remember...).
3915        */
3916       continue;
3917     default:
3918       barf("resurrectThreads: thread blocked in a strange way");
3919     }
3920   }
3921 }
3922
3923 /* ----------------------------------------------------------------------------
3924  * Debugging: why is a thread blocked
3925  * [Also provides useful information when debugging threaded programs
3926  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3927    ------------------------------------------------------------------------- */
3928
3929 static void
3930 printThreadBlockage(StgTSO *tso)
3931 {
3932   switch (tso->why_blocked) {
3933   case BlockedOnRead:
3934     debugBelch("is blocked on read from fd %ld", tso->block_info.fd);
3935     break;
3936   case BlockedOnWrite:
3937     debugBelch("is blocked on write to fd %ld", tso->block_info.fd);
3938     break;
3939 #if defined(mingw32_HOST_OS)
3940     case BlockedOnDoProc:
3941     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
3942     break;
3943 #endif
3944   case BlockedOnDelay:
3945     debugBelch("is blocked until %ld", tso->block_info.target);
3946     break;
3947   case BlockedOnMVar:
3948     debugBelch("is blocked on an MVar");
3949     break;
3950   case BlockedOnException:
3951     debugBelch("is blocked on delivering an exception to thread %d",
3952             tso->block_info.tso->id);
3953     break;
3954   case BlockedOnBlackHole:
3955     debugBelch("is blocked on a black hole");
3956     break;
3957   case NotBlocked:
3958     debugBelch("is not blocked");
3959     break;
3960 #if defined(PARALLEL_HASKELL)
3961   case BlockedOnGA:
3962     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
3963             tso->block_info.closure, info_type(tso->block_info.closure));
3964     break;
3965   case BlockedOnGA_NoSend:
3966     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
3967             tso->block_info.closure, info_type(tso->block_info.closure));
3968     break;
3969 #endif
3970   case BlockedOnCCall:
3971     debugBelch("is blocked on an external call");
3972     break;
3973   case BlockedOnCCall_NoUnblockExc:
3974     debugBelch("is blocked on an external call (exceptions were already blocked)");
3975     break;
3976   case BlockedOnSTM:
3977     debugBelch("is blocked on an STM operation");
3978     break;
3979   default:
3980     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3981          tso->why_blocked, tso->id, tso);
3982   }
3983 }
3984
3985 static void
3986 printThreadStatus(StgTSO *tso)
3987 {
3988   switch (tso->what_next) {
3989   case ThreadKilled:
3990     debugBelch("has been killed");
3991     break;
3992   case ThreadComplete:
3993     debugBelch("has completed");
3994     break;
3995   default:
3996     printThreadBlockage(tso);
3997   }
3998 }
3999
4000 void
4001 printAllThreads(void)
4002 {
4003   StgTSO *t;
4004
4005 # if defined(GRAN)
4006   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4007   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4008                        time_string, rtsFalse/*no commas!*/);
4009
4010   debugBelch("all threads at [%s]:\n", time_string);
4011 # elif defined(PARALLEL_HASKELL)
4012   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4013   ullong_format_string(CURRENT_TIME,
4014                        time_string, rtsFalse/*no commas!*/);
4015
4016   debugBelch("all threads at [%s]:\n", time_string);
4017 # else
4018   debugBelch("all threads:\n");
4019 # endif
4020
4021   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
4022     debugBelch("\tthread %d @ %p ", t->id, (void *)t);
4023 #if defined(DEBUG)
4024     {
4025       void *label = lookupThreadLabel(t->id);
4026       if (label) debugBelch("[\"%s\"] ",(char *)label);
4027     }
4028 #endif
4029     printThreadStatus(t);
4030     debugBelch("\n");
4031   }
4032 }
4033     
4034 #ifdef DEBUG
4035
4036 /* 
4037    Print a whole blocking queue attached to node (debugging only).
4038 */
4039 # if defined(PARALLEL_HASKELL)
4040 void 
4041 print_bq (StgClosure *node)
4042 {
4043   StgBlockingQueueElement *bqe;
4044   StgTSO *tso;
4045   rtsBool end;
4046
4047   debugBelch("## BQ of closure %p (%s): ",
4048           node, info_type(node));
4049
4050   /* should cover all closures that may have a blocking queue */
4051   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4052          get_itbl(node)->type == FETCH_ME_BQ ||
4053          get_itbl(node)->type == RBH ||
4054          get_itbl(node)->type == MVAR);
4055     
4056   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4057
4058   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4059 }
4060
4061 /* 
4062    Print a whole blocking queue starting with the element bqe.
4063 */
4064 void 
4065 print_bqe (StgBlockingQueueElement *bqe)
4066 {
4067   rtsBool end;
4068
4069   /* 
4070      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4071   */
4072   for (end = (bqe==END_BQ_QUEUE);
4073        !end; // iterate until bqe points to a CONSTR
4074        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4075        bqe = end ? END_BQ_QUEUE : bqe->link) {
4076     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4077     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4078     /* types of closures that may appear in a blocking queue */
4079     ASSERT(get_itbl(bqe)->type == TSO ||           
4080            get_itbl(bqe)->type == BLOCKED_FETCH || 
4081            get_itbl(bqe)->type == CONSTR); 
4082     /* only BQs of an RBH end with an RBH_Save closure */
4083     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4084
4085     switch (get_itbl(bqe)->type) {
4086     case TSO:
4087       debugBelch(" TSO %u (%x),",
4088               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4089       break;
4090     case BLOCKED_FETCH:
4091       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4092               ((StgBlockedFetch *)bqe)->node, 
4093               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4094               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4095               ((StgBlockedFetch *)bqe)->ga.weight);
4096       break;
4097     case CONSTR:
4098       debugBelch(" %s (IP %p),",
4099               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4100                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4101                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4102                "RBH_Save_?"), get_itbl(bqe));
4103       break;
4104     default:
4105       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4106            info_type((StgClosure *)bqe)); // , node, info_type(node));
4107       break;
4108     }
4109   } /* for */
4110   debugBelch("\n");
4111 }
4112 # elif defined(GRAN)
4113 void 
4114 print_bq (StgClosure *node)
4115 {
4116   StgBlockingQueueElement *bqe;
4117   PEs node_loc, tso_loc;
4118   rtsBool end;
4119
4120   /* should cover all closures that may have a blocking queue */
4121   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4122          get_itbl(node)->type == FETCH_ME_BQ ||
4123          get_itbl(node)->type == RBH);
4124     
4125   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4126   node_loc = where_is(node);
4127
4128   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4129           node, info_type(node), node_loc);
4130
4131   /* 
4132      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4133   */
4134   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4135        !end; // iterate until bqe points to a CONSTR
4136        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4137     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4138     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4139     /* types of closures that may appear in a blocking queue */
4140     ASSERT(get_itbl(bqe)->type == TSO ||           
4141            get_itbl(bqe)->type == CONSTR); 
4142     /* only BQs of an RBH end with an RBH_Save closure */
4143     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4144
4145     tso_loc = where_is((StgClosure *)bqe);
4146     switch (get_itbl(bqe)->type) {
4147     case TSO:
4148       debugBelch(" TSO %d (%p) on [PE %d],",
4149               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4150       break;
4151     case CONSTR:
4152       debugBelch(" %s (IP %p),",
4153               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4154                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4155                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4156                "RBH_Save_?"), get_itbl(bqe));
4157       break;
4158     default:
4159       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4160            info_type((StgClosure *)bqe), node, info_type(node));
4161       break;
4162     }
4163   } /* for */
4164   debugBelch("\n");
4165 }
4166 #else
4167 /* 
4168    Nice and easy: only TSOs on the blocking queue
4169 */
4170 void 
4171 print_bq (StgClosure *node)
4172 {
4173   StgTSO *tso;
4174
4175   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4176   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
4177        tso != END_TSO_QUEUE; 
4178        tso=tso->link) {
4179     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
4180     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
4181     debugBelch(" TSO %d (%p),", tso->id, tso);
4182   }
4183   debugBelch("\n");
4184 }
4185 # endif
4186
4187 #if defined(PARALLEL_HASKELL)
4188 static nat
4189 run_queue_len(void)
4190 {
4191   nat i;
4192   StgTSO *tso;
4193
4194   for (i=0, tso=run_queue_hd; 
4195        tso != END_TSO_QUEUE;
4196        i++, tso=tso->link)
4197     /* nothing */
4198
4199   return i;
4200 }
4201 #endif
4202
4203 void
4204 sched_belch(char *s, ...)
4205 {
4206   va_list ap;
4207   va_start(ap,s);
4208 #ifdef RTS_SUPPORTS_THREADS
4209   debugBelch("sched (task %p): ", osThreadId());
4210 #elif defined(PARALLEL_HASKELL)
4211   debugBelch("== ");
4212 #else
4213   debugBelch("sched: ");
4214 #endif
4215   vdebugBelch(s, ap);
4216   debugBelch("\n");
4217   va_end(ap);
4218 }
4219
4220 #endif /* DEBUG */