[project @ 2005-04-05 12:19:54 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PARALLEL_HASKELL          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "OSThreads.h"
46 #include "Storage.h"
47 #include "StgRun.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "STM.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #include "LdvProfile.h"
63 #include "Updates.h"
64 #ifdef PROFILING
65 #include "Proftimer.h"
66 #include "ProfHeap.h"
67 #endif
68 #if defined(GRAN) || defined(PARALLEL_HASKELL)
69 # include "GranSimRts.h"
70 # include "GranSim.h"
71 # include "ParallelRts.h"
72 # include "Parallel.h"
73 # include "ParallelDebug.h"
74 # include "FetchMe.h"
75 # include "HLC.h"
76 #endif
77 #include "Sparks.h"
78 #include "Capability.h"
79 #include  "Task.h"
80
81 #ifdef HAVE_SYS_TYPES_H
82 #include <sys/types.h>
83 #endif
84 #ifdef HAVE_UNISTD_H
85 #include <unistd.h>
86 #endif
87
88 #include <string.h>
89 #include <stdlib.h>
90 #include <stdarg.h>
91
92 #ifdef HAVE_ERRNO_H
93 #include <errno.h>
94 #endif
95
96 // Turn off inlining when debugging - it obfuscates things
97 #ifdef DEBUG
98 # undef  STATIC_INLINE
99 # define STATIC_INLINE static
100 #endif
101
102 #ifdef THREADED_RTS
103 #define USED_IN_THREADED_RTS
104 #else
105 #define USED_IN_THREADED_RTS STG_UNUSED
106 #endif
107
108 #ifdef RTS_SUPPORTS_THREADS
109 #define USED_WHEN_RTS_SUPPORTS_THREADS
110 #else
111 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
112 #endif
113
114 /* Main thread queue.
115  * Locks required: sched_mutex.
116  */
117 StgMainThread *main_threads = NULL;
118
119 #if defined(GRAN)
120
121 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
122 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
123
124 /* 
125    In GranSim we have a runnable and a blocked queue for each processor.
126    In order to minimise code changes new arrays run_queue_hds/tls
127    are created. run_queue_hd is then a short cut (macro) for
128    run_queue_hds[CurrentProc] (see GranSim.h).
129    -- HWL
130 */
131 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
132 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
133 StgTSO *ccalling_threadss[MAX_PROC];
134 /* We use the same global list of threads (all_threads) in GranSim as in
135    the std RTS (i.e. we are cheating). However, we don't use this list in
136    the GranSim specific code at the moment (so we are only potentially
137    cheating).  */
138
139 #else /* !GRAN */
140
141 /* Thread queues.
142  * Locks required: sched_mutex.
143  */
144 StgTSO *run_queue_hd = NULL;
145 StgTSO *run_queue_tl = NULL;
146 StgTSO *blocked_queue_hd = NULL;
147 StgTSO *blocked_queue_tl = NULL;
148 StgTSO *blackhole_queue = NULL;
149 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
150
151 #endif
152
153 /* The blackhole_queue should be checked for threads to wake up.  See
154  * Schedule.h for more thorough comment.
155  */
156 rtsBool blackholes_need_checking = rtsFalse;
157
158 /* Linked list of all threads.
159  * Used for detecting garbage collected threads.
160  */
161 StgTSO *all_threads = NULL;
162
163 /* When a thread performs a safe C call (_ccall_GC, using old
164  * terminology), it gets put on the suspended_ccalling_threads
165  * list. Used by the garbage collector.
166  */
167 static StgTSO *suspended_ccalling_threads;
168
169 /* KH: The following two flags are shared memory locations.  There is no need
170        to lock them, since they are only unset at the end of a scheduler
171        operation.
172 */
173
174 /* flag set by signal handler to precipitate a context switch */
175 int context_switch = 0;
176
177 /* if this flag is set as well, give up execution */
178 rtsBool interrupted = rtsFalse;
179
180 /* If this flag is set, we are running Haskell code.  Used to detect
181  * uses of 'foreign import unsafe' that should be 'safe'.
182  */
183 static rtsBool in_haskell = rtsFalse;
184
185 /* Next thread ID to allocate.
186  * Locks required: thread_id_mutex
187  */
188 static StgThreadID next_thread_id = 1;
189
190 /*
191  * Pointers to the state of the current thread.
192  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
193  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
194  */
195  
196 /* The smallest stack size that makes any sense is:
197  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
198  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
199  *  + 1                       (the closure to enter)
200  *  + 1                       (stg_ap_v_ret)
201  *  + 1                       (spare slot req'd by stg_ap_v_ret)
202  *
203  * A thread with this stack will bomb immediately with a stack
204  * overflow, which will increase its stack size.  
205  */
206
207 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
208
209
210 #if defined(GRAN)
211 StgTSO *CurrentTSO;
212 #endif
213
214 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
215  *  exists - earlier gccs apparently didn't.
216  *  -= chak
217  */
218 StgTSO dummy_tso;
219
220 # if defined(SMP)
221 static Condition gc_pending_cond = INIT_COND_VAR;
222 # endif
223
224 static rtsBool ready_to_gc;
225
226 /*
227  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
228  * in an MT setting, needed to signal that a worker thread shouldn't hang around
229  * in the scheduler when it is out of work.
230  */
231 static rtsBool shutting_down_scheduler = rtsFalse;
232
233 #if defined(RTS_SUPPORTS_THREADS)
234 /* ToDo: carefully document the invariants that go together
235  *       with these synchronisation objects.
236  */
237 Mutex     sched_mutex       = INIT_MUTEX_VAR;
238 Mutex     term_mutex        = INIT_MUTEX_VAR;
239
240 #endif /* RTS_SUPPORTS_THREADS */
241
242 #if defined(PARALLEL_HASKELL)
243 StgTSO *LastTSO;
244 rtsTime TimeOfLastYield;
245 rtsBool emitSchedule = rtsTrue;
246 #endif
247
248 #if DEBUG
249 static char *whatNext_strs[] = {
250   "(unknown)",
251   "ThreadRunGHC",
252   "ThreadInterpret",
253   "ThreadKilled",
254   "ThreadRelocated",
255   "ThreadComplete"
256 };
257 #endif
258
259 /* -----------------------------------------------------------------------------
260  * static function prototypes
261  * -------------------------------------------------------------------------- */
262
263 #if defined(RTS_SUPPORTS_THREADS)
264 static void taskStart(void);
265 #endif
266
267 static void schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
268                       Capability *initialCapability );
269
270 //
271 // These function all encapsulate parts of the scheduler loop, and are
272 // abstracted only to make the structure and control flow of the
273 // scheduler clearer.
274 //
275 static void schedulePreLoop(void);
276 static void scheduleHandleInterrupt(void);
277 static void scheduleStartSignalHandlers(void);
278 static void scheduleCheckBlockedThreads(void);
279 static void scheduleCheckBlackHoles(void);
280 static void scheduleDetectDeadlock(void);
281 #if defined(GRAN)
282 static StgTSO *scheduleProcessEvent(rtsEvent *event);
283 #endif
284 #if defined(PARALLEL_HASKELL)
285 static StgTSO *scheduleSendPendingMessages(void);
286 static void scheduleActivateSpark(void);
287 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
288 #endif
289 #if defined(PAR) || defined(GRAN)
290 static void scheduleGranParReport(void);
291 #endif
292 static void schedulePostRunThread(void);
293 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
294 static void scheduleHandleStackOverflow( StgTSO *t);
295 static rtsBool scheduleHandleYield( StgTSO *t, nat prev_what_next );
296 static void scheduleHandleThreadBlocked( StgTSO *t );
297 static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread, 
298                                              Capability *cap, StgTSO *t );
299 static void scheduleDoHeapProfile(void);
300 static void scheduleDoGC(void);
301
302 static void unblockThread(StgTSO *tso);
303 static rtsBool checkBlackHoles(void);
304 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
305                                    Capability *initialCapability
306                                    );
307 static void scheduleThread_ (StgTSO* tso);
308 static void AllRoots(evac_fn evac);
309
310 static StgTSO *threadStackOverflow(StgTSO *tso);
311
312 static void raiseAsync_(StgTSO *tso, StgClosure *exception, 
313                         rtsBool stop_at_atomically);
314
315 static void printThreadBlockage(StgTSO *tso);
316 static void printThreadStatus(StgTSO *tso);
317
318 #if defined(PARALLEL_HASKELL)
319 StgTSO * createSparkThread(rtsSpark spark);
320 StgTSO * activateSpark (rtsSpark spark);  
321 #endif
322
323 /* ----------------------------------------------------------------------------
324  * Starting Tasks
325  * ------------------------------------------------------------------------- */
326
327 #if defined(RTS_SUPPORTS_THREADS)
328 static rtsBool startingWorkerThread = rtsFalse;
329
330 static void
331 taskStart(void)
332 {
333   ACQUIRE_LOCK(&sched_mutex);
334   startingWorkerThread = rtsFalse;
335   schedule(NULL,NULL);
336   RELEASE_LOCK(&sched_mutex);
337 }
338
339 void
340 startSchedulerTaskIfNecessary(void)
341 {
342   if(run_queue_hd != END_TSO_QUEUE
343     || blocked_queue_hd != END_TSO_QUEUE
344     || sleeping_queue != END_TSO_QUEUE)
345   {
346     if(!startingWorkerThread)
347     { // we don't want to start another worker thread
348       // just because the last one hasn't yet reached the
349       // "waiting for capability" state
350       startingWorkerThread = rtsTrue;
351       if (!startTask(taskStart)) {
352           startingWorkerThread = rtsFalse;
353       }
354     }
355   }
356 }
357 #endif
358
359 /* -----------------------------------------------------------------------------
360  * Putting a thread on the run queue: different scheduling policies
361  * -------------------------------------------------------------------------- */
362
363 STATIC_INLINE void
364 addToRunQueue( StgTSO *t )
365 {
366 #if defined(PARALLEL_HASKELL)
367     if (RtsFlags.ParFlags.doFairScheduling) { 
368         // this does round-robin scheduling; good for concurrency
369         APPEND_TO_RUN_QUEUE(t);
370     } else {
371         // this does unfair scheduling; good for parallelism
372         PUSH_ON_RUN_QUEUE(t);
373     }
374 #else
375     // this does round-robin scheduling; good for concurrency
376     APPEND_TO_RUN_QUEUE(t);
377 #endif
378 }
379     
380 /* ---------------------------------------------------------------------------
381    Main scheduling loop.
382
383    We use round-robin scheduling, each thread returning to the
384    scheduler loop when one of these conditions is detected:
385
386       * out of heap space
387       * timer expires (thread yields)
388       * thread blocks
389       * thread ends
390       * stack overflow
391
392    Locking notes:  we acquire the scheduler lock once at the beginning
393    of the scheduler loop, and release it when
394     
395       * running a thread, or
396       * waiting for work, or
397       * waiting for a GC to complete.
398
399    GRAN version:
400      In a GranSim setup this loop iterates over the global event queue.
401      This revolves around the global event queue, which determines what 
402      to do next. Therefore, it's more complicated than either the 
403      concurrent or the parallel (GUM) setup.
404
405    GUM version:
406      GUM iterates over incoming messages.
407      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
408      and sends out a fish whenever it has nothing to do; in-between
409      doing the actual reductions (shared code below) it processes the
410      incoming messages and deals with delayed operations 
411      (see PendingFetches).
412      This is not the ugliest code you could imagine, but it's bloody close.
413
414    ------------------------------------------------------------------------ */
415
416 static void
417 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
418           Capability *initialCapability )
419 {
420   StgTSO *t;
421   Capability *cap;
422   StgThreadReturnCode ret;
423 #if defined(GRAN)
424   rtsEvent *event;
425 #elif defined(PARALLEL_HASKELL)
426   StgTSO *tso;
427   GlobalTaskId pe;
428   rtsBool receivedFinish = rtsFalse;
429 # if defined(DEBUG)
430   nat tp_size, sp_size; // stats only
431 # endif
432 #endif
433   nat prev_what_next;
434   
435   // Pre-condition: sched_mutex is held.
436   // We might have a capability, passed in as initialCapability.
437   cap = initialCapability;
438
439 #if !defined(RTS_SUPPORTS_THREADS)
440   // simply initialise it in the non-threaded case
441   grabCapability(&cap);
442 #endif
443
444   IF_DEBUG(scheduler,
445            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
446                        mainThread, initialCapability);
447       );
448
449   schedulePreLoop();
450
451   // -----------------------------------------------------------
452   // Scheduler loop starts here:
453
454 #if defined(PARALLEL_HASKELL)
455 #define TERMINATION_CONDITION        (!receivedFinish)
456 #elif defined(GRAN)
457 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
458 #else
459 #define TERMINATION_CONDITION        rtsTrue
460 #endif
461
462   while (TERMINATION_CONDITION) {
463
464 #if defined(GRAN)
465       /* Choose the processor with the next event */
466       CurrentProc = event->proc;
467       CurrentTSO = event->tso;
468 #endif
469
470       IF_DEBUG(scheduler, printAllThreads());
471
472 #if defined(SMP)
473       // 
474       // Wait until GC has completed, if necessary.
475       //
476       if (ready_to_gc) {
477           if (cap != NULL) {
478               releaseCapability(cap);
479               IF_DEBUG(scheduler,sched_belch("waiting for GC"));
480               waitCondition( &gc_pending_cond, &sched_mutex );
481           }
482       }
483 #endif
484
485 #if defined(RTS_SUPPORTS_THREADS)
486       // Yield the capability to higher-priority tasks if necessary.
487       //
488       if (cap != NULL) {
489           yieldCapability(&cap);
490       }
491
492       // If we do not currently hold a capability, we wait for one
493       //
494       if (cap == NULL) {
495           waitForCapability(&sched_mutex, &cap,
496                             mainThread ? &mainThread->bound_thread_cond : NULL);
497       }
498
499       // We now have a capability...
500 #endif
501
502     // Check whether we have re-entered the RTS from Haskell without
503     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
504     // call).
505     if (in_haskell) {
506           errorBelch("schedule: re-entered unsafely.\n"
507                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
508           stg_exit(1);
509     }
510
511     scheduleHandleInterrupt();
512
513 #if defined(not_yet) && defined(SMP)
514     //
515     // Top up the run queue from our spark pool.  We try to make the
516     // number of threads in the run queue equal to the number of
517     // free capabilities.
518     //
519     {
520         StgClosure *spark;
521         if (EMPTY_RUN_QUEUE()) {
522             spark = findSpark(rtsFalse);
523             if (spark == NULL) {
524                 break; /* no more sparks in the pool */
525             } else {
526                 createSparkThread(spark);         
527                 IF_DEBUG(scheduler,
528                          sched_belch("==^^ turning spark of closure %p into a thread",
529                                      (StgClosure *)spark));
530             }
531         }
532     }
533 #endif // SMP
534
535     scheduleStartSignalHandlers();
536
537     // Only check the black holes here if we've nothing else to do.
538     // During normal execution, the black hole list only gets checked
539     // at GC time, to avoid repeatedly traversing this possibly long
540     // list each time around the scheduler.
541     if (EMPTY_RUN_QUEUE()) { scheduleCheckBlackHoles(); }
542
543     scheduleCheckBlockedThreads();
544
545     scheduleDetectDeadlock();
546
547     // Normally, the only way we can get here with no threads to
548     // run is if a keyboard interrupt received during 
549     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
550     // Additionally, it is not fatal for the
551     // threaded RTS to reach here with no threads to run.
552     //
553     // win32: might be here due to awaitEvent() being abandoned
554     // as a result of a console event having been delivered.
555     if ( EMPTY_RUN_QUEUE() ) {
556 #if !defined(RTS_SUPPORTS_THREADS) && !defined(mingw32_HOST_OS)
557         ASSERT(interrupted);
558 #endif
559         continue; // nothing to do
560     }
561
562 #if defined(PARALLEL_HASKELL)
563     scheduleSendPendingMessages();
564     if (EMPTY_RUN_QUEUE() && scheduleActivateSpark()) 
565         continue;
566
567 #if defined(SPARKS)
568     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
569 #endif
570
571     /* If we still have no work we need to send a FISH to get a spark
572        from another PE */
573     if (EMPTY_RUN_QUEUE()) {
574         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
575         ASSERT(rtsFalse); // should not happen at the moment
576     }
577     // from here: non-empty run queue.
578     //  TODO: merge above case with this, only one call processMessages() !
579     if (PacketsWaiting()) {  /* process incoming messages, if
580                                 any pending...  only in else
581                                 because getRemoteWork waits for
582                                 messages as well */
583         receivedFinish = processMessages();
584     }
585 #endif
586
587 #if defined(GRAN)
588     scheduleProcessEvent(event);
589 #endif
590
591     // 
592     // Get a thread to run
593     //
594     ASSERT(run_queue_hd != END_TSO_QUEUE);
595     POP_RUN_QUEUE(t);
596
597 #if defined(GRAN) || defined(PAR)
598     scheduleGranParReport(); // some kind of debuging output
599 #else
600     // Sanity check the thread we're about to run.  This can be
601     // expensive if there is lots of thread switching going on...
602     IF_DEBUG(sanity,checkTSO(t));
603 #endif
604
605 #if defined(RTS_SUPPORTS_THREADS)
606     // Check whether we can run this thread in the current task.
607     // If not, we have to pass our capability to the right task.
608     {
609       StgMainThread *m = t->main;
610       
611       if(m)
612       {
613         if(m == mainThread)
614         {
615           IF_DEBUG(scheduler,
616             sched_belch("### Running thread %d in bound thread", t->id));
617           // yes, the Haskell thread is bound to the current native thread
618         }
619         else
620         {
621           IF_DEBUG(scheduler,
622             sched_belch("### thread %d bound to another OS thread", t->id));
623           // no, bound to a different Haskell thread: pass to that thread
624           PUSH_ON_RUN_QUEUE(t);
625           passCapability(&m->bound_thread_cond);
626           continue;
627         }
628       }
629       else
630       {
631         if(mainThread != NULL)
632         // The thread we want to run is bound.
633         {
634           IF_DEBUG(scheduler,
635             sched_belch("### this OS thread cannot run thread %d", t->id));
636           // no, the current native thread is bound to a different
637           // Haskell thread, so pass it to any worker thread
638           PUSH_ON_RUN_QUEUE(t);
639           passCapabilityToWorker();
640           continue; 
641         }
642       }
643     }
644 #endif
645
646     cap->r.rCurrentTSO = t;
647     
648     /* context switches are now initiated by the timer signal, unless
649      * the user specified "context switch as often as possible", with
650      * +RTS -C0
651      */
652     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
653          && (run_queue_hd != END_TSO_QUEUE
654              || blocked_queue_hd != END_TSO_QUEUE
655              || sleeping_queue != END_TSO_QUEUE)))
656         context_switch = 1;
657
658 run_thread:
659
660     RELEASE_LOCK(&sched_mutex);
661
662     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
663                               (long)t->id, whatNext_strs[t->what_next]));
664
665 #if defined(PROFILING)
666     startHeapProfTimer();
667 #endif
668
669     // ----------------------------------------------------------------------
670     // Run the current thread 
671
672     prev_what_next = t->what_next;
673
674     errno = t->saved_errno;
675     in_haskell = rtsTrue;
676
677     switch (prev_what_next) {
678
679     case ThreadKilled:
680     case ThreadComplete:
681         /* Thread already finished, return to scheduler. */
682         ret = ThreadFinished;
683         break;
684
685     case ThreadRunGHC:
686         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
687         break;
688
689     case ThreadInterpret:
690         ret = interpretBCO(cap);
691         break;
692
693     default:
694       barf("schedule: invalid what_next field");
695     }
696
697     // We have run some Haskell code: there might be blackhole-blocked
698     // threads to wake up now.
699     if ( blackhole_queue != END_TSO_QUEUE ) {
700         blackholes_need_checking = rtsTrue;
701     }
702
703     in_haskell = rtsFalse;
704
705     // The TSO might have moved, eg. if it re-entered the RTS and a GC
706     // happened.  So find the new location:
707     t = cap->r.rCurrentTSO;
708
709     // And save the current errno in this thread.
710     t->saved_errno = errno;
711
712     // ----------------------------------------------------------------------
713     
714     /* Costs for the scheduler are assigned to CCS_SYSTEM */
715 #if defined(PROFILING)
716     stopHeapProfTimer();
717     CCCS = CCS_SYSTEM;
718 #endif
719     
720     ACQUIRE_LOCK(&sched_mutex);
721     
722 #if defined(RTS_SUPPORTS_THREADS)
723     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
724 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
725     IF_DEBUG(scheduler,debugBelch("sched: "););
726 #endif
727     
728     schedulePostRunThread();
729
730     switch (ret) {
731     case HeapOverflow:
732         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
733         break;
734
735     case StackOverflow:
736         scheduleHandleStackOverflow(t);
737         break;
738
739     case ThreadYielding:
740         if (scheduleHandleYield(t, prev_what_next)) {
741             // shortcut for switching between compiler/interpreter:
742             goto run_thread; 
743         }
744         break;
745
746     case ThreadBlocked:
747         scheduleHandleThreadBlocked(t);
748         threadPaused(t);
749         break;
750
751     case ThreadFinished:
752         if (scheduleHandleThreadFinished(mainThread, cap, t)) return;;
753         break;
754
755     default:
756       barf("schedule: invalid thread return code %d", (int)ret);
757     }
758
759     scheduleDoHeapProfile();
760     scheduleDoGC();
761   } /* end of while() */
762
763   IF_PAR_DEBUG(verbose,
764                debugBelch("== Leaving schedule() after having received Finish\n"));
765 }
766
767 /* ----------------------------------------------------------------------------
768  * Setting up the scheduler loop
769  * ASSUMES: sched_mutex
770  * ------------------------------------------------------------------------- */
771
772 static void
773 schedulePreLoop(void)
774 {
775 #if defined(GRAN) 
776     /* set up first event to get things going */
777     /* ToDo: assign costs for system setup and init MainTSO ! */
778     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
779               ContinueThread, 
780               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
781     
782     IF_DEBUG(gran,
783              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
784                         CurrentTSO);
785              G_TSO(CurrentTSO, 5));
786     
787     if (RtsFlags.GranFlags.Light) {
788         /* Save current time; GranSim Light only */
789         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
790     }      
791 #endif
792 }
793
794 /* ----------------------------------------------------------------------------
795  * Deal with the interrupt flag
796  * ASSUMES: sched_mutex
797  * ------------------------------------------------------------------------- */
798
799 static
800 void scheduleHandleInterrupt(void)
801 {
802     //
803     // Test for interruption.  If interrupted==rtsTrue, then either
804     // we received a keyboard interrupt (^C), or the scheduler is
805     // trying to shut down all the tasks (shutting_down_scheduler) in
806     // the threaded RTS.
807     //
808     if (interrupted) {
809         if (shutting_down_scheduler) {
810             IF_DEBUG(scheduler, sched_belch("shutting down"));
811 #if defined(RTS_SUPPORTS_THREADS)
812             shutdownThread();
813 #endif
814         } else {
815             IF_DEBUG(scheduler, sched_belch("interrupted"));
816             deleteAllThreads();
817         }
818     }
819 }
820
821 /* ----------------------------------------------------------------------------
822  * Start any pending signal handlers
823  * ASSUMES: sched_mutex
824  * ------------------------------------------------------------------------- */
825
826 static void
827 scheduleStartSignalHandlers(void)
828 {
829 #if defined(RTS_USER_SIGNALS)
830     if (signals_pending()) {
831       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
832       startSignalHandlers();
833       ACQUIRE_LOCK(&sched_mutex);
834     }
835 #endif
836 }
837
838 /* ----------------------------------------------------------------------------
839  * Check for blocked threads that can be woken up.
840  * ASSUMES: sched_mutex
841  * ------------------------------------------------------------------------- */
842
843 static void
844 scheduleCheckBlockedThreads(void)
845 {
846     //
847     // Check whether any waiting threads need to be woken up.  If the
848     // run queue is empty, and there are no other tasks running, we
849     // can wait indefinitely for something to happen.
850     //
851     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
852     {
853 #if defined(RTS_SUPPORTS_THREADS)
854         // We shouldn't be here...
855         barf("schedule: awaitEvent() in threaded RTS");
856 #endif
857         awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
858     }
859 }
860
861
862 /* ----------------------------------------------------------------------------
863  * Check for threads blocked on BLACKHOLEs that can be woken up
864  * ASSUMES: sched_mutex
865  * ------------------------------------------------------------------------- */
866 static void
867 scheduleCheckBlackHoles( void )
868 {
869     if ( blackholes_need_checking )
870     {
871         checkBlackHoles();
872         blackholes_need_checking = rtsFalse;
873     }
874 }
875
876 /* ----------------------------------------------------------------------------
877  * Detect deadlock conditions and attempt to resolve them.
878  * ASSUMES: sched_mutex
879  * ------------------------------------------------------------------------- */
880
881 static void
882 scheduleDetectDeadlock(void)
883 {
884     /* 
885      * Detect deadlock: when we have no threads to run, there are no
886      * threads blocked, waiting for I/O, or sleeping, and all the
887      * other tasks are waiting for work, we must have a deadlock of
888      * some description.
889      */
890     if ( EMPTY_THREAD_QUEUES() )
891     {
892 #if !defined(PARALLEL_HASKELL) && !defined(RTS_SUPPORTS_THREADS)
893         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
894
895         // Garbage collection can release some new threads due to
896         // either (a) finalizers or (b) threads resurrected because
897         // they are unreachable and will therefore be sent an
898         // exception.  Any threads thus released will be immediately
899         // runnable.
900         GarbageCollect(GetRoots,rtsTrue);
901         if ( !EMPTY_RUN_QUEUE() ) return;
902
903 #if defined(RTS_USER_SIGNALS)
904         /* If we have user-installed signal handlers, then wait
905          * for signals to arrive rather then bombing out with a
906          * deadlock.
907          */
908         if ( anyUserHandlers() ) {
909             IF_DEBUG(scheduler, 
910                      sched_belch("still deadlocked, waiting for signals..."));
911
912             awaitUserSignals();
913
914             if (signals_pending()) {
915                 RELEASE_LOCK(&sched_mutex);
916                 startSignalHandlers();
917                 ACQUIRE_LOCK(&sched_mutex);
918             }
919
920             // either we have threads to run, or we were interrupted:
921             ASSERT(!EMPTY_RUN_QUEUE() || interrupted);
922         }
923 #endif
924
925         /* Probably a real deadlock.  Send the current main thread the
926          * Deadlock exception (or in the SMP build, send *all* main
927          * threads the deadlock exception, since none of them can make
928          * progress).
929          */
930         {
931             StgMainThread *m;
932             m = main_threads;
933             switch (m->tso->why_blocked) {
934             case BlockedOnBlackHole:
935             case BlockedOnException:
936             case BlockedOnMVar:
937                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
938                 return;
939             default:
940                 barf("deadlock: main thread blocked in a strange way");
941             }
942         }
943
944 #elif defined(RTS_SUPPORTS_THREADS)
945     // ToDo: add deadlock detection in threaded RTS
946 #elif defined(PARALLEL_HASKELL)
947     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
948 #endif
949     }
950 }
951
952 /* ----------------------------------------------------------------------------
953  * Process an event (GRAN only)
954  * ------------------------------------------------------------------------- */
955
956 #if defined(GRAN)
957 static StgTSO *
958 scheduleProcessEvent(rtsEvent *event)
959 {
960     StgTSO *t;
961
962     if (RtsFlags.GranFlags.Light)
963       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
964
965     /* adjust time based on time-stamp */
966     if (event->time > CurrentTime[CurrentProc] &&
967         event->evttype != ContinueThread)
968       CurrentTime[CurrentProc] = event->time;
969     
970     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
971     if (!RtsFlags.GranFlags.Light)
972       handleIdlePEs();
973
974     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
975
976     /* main event dispatcher in GranSim */
977     switch (event->evttype) {
978       /* Should just be continuing execution */
979     case ContinueThread:
980       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
981       /* ToDo: check assertion
982       ASSERT(run_queue_hd != (StgTSO*)NULL &&
983              run_queue_hd != END_TSO_QUEUE);
984       */
985       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
986       if (!RtsFlags.GranFlags.DoAsyncFetch &&
987           procStatus[CurrentProc]==Fetching) {
988         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
989               CurrentTSO->id, CurrentTSO, CurrentProc);
990         goto next_thread;
991       } 
992       /* Ignore ContinueThreads for completed threads */
993       if (CurrentTSO->what_next == ThreadComplete) {
994         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
995               CurrentTSO->id, CurrentTSO, CurrentProc);
996         goto next_thread;
997       } 
998       /* Ignore ContinueThreads for threads that are being migrated */
999       if (PROCS(CurrentTSO)==Nowhere) { 
1000         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1001               CurrentTSO->id, CurrentTSO, CurrentProc);
1002         goto next_thread;
1003       }
1004       /* The thread should be at the beginning of the run queue */
1005       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1006         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1007               CurrentTSO->id, CurrentTSO, CurrentProc);
1008         break; // run the thread anyway
1009       }
1010       /*
1011       new_event(proc, proc, CurrentTime[proc],
1012                 FindWork,
1013                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1014       goto next_thread; 
1015       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1016       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1017
1018     case FetchNode:
1019       do_the_fetchnode(event);
1020       goto next_thread;             /* handle next event in event queue  */
1021       
1022     case GlobalBlock:
1023       do_the_globalblock(event);
1024       goto next_thread;             /* handle next event in event queue  */
1025       
1026     case FetchReply:
1027       do_the_fetchreply(event);
1028       goto next_thread;             /* handle next event in event queue  */
1029       
1030     case UnblockThread:   /* Move from the blocked queue to the tail of */
1031       do_the_unblock(event);
1032       goto next_thread;             /* handle next event in event queue  */
1033       
1034     case ResumeThread:  /* Move from the blocked queue to the tail of */
1035       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1036       event->tso->gran.blocktime += 
1037         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1038       do_the_startthread(event);
1039       goto next_thread;             /* handle next event in event queue  */
1040       
1041     case StartThread:
1042       do_the_startthread(event);
1043       goto next_thread;             /* handle next event in event queue  */
1044       
1045     case MoveThread:
1046       do_the_movethread(event);
1047       goto next_thread;             /* handle next event in event queue  */
1048       
1049     case MoveSpark:
1050       do_the_movespark(event);
1051       goto next_thread;             /* handle next event in event queue  */
1052       
1053     case FindWork:
1054       do_the_findwork(event);
1055       goto next_thread;             /* handle next event in event queue  */
1056       
1057     default:
1058       barf("Illegal event type %u\n", event->evttype);
1059     }  /* switch */
1060     
1061     /* This point was scheduler_loop in the old RTS */
1062
1063     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1064
1065     TimeOfLastEvent = CurrentTime[CurrentProc];
1066     TimeOfNextEvent = get_time_of_next_event();
1067     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1068     // CurrentTSO = ThreadQueueHd;
1069
1070     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1071                          TimeOfNextEvent));
1072
1073     if (RtsFlags.GranFlags.Light) 
1074       GranSimLight_leave_system(event, &ActiveTSO); 
1075
1076     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1077
1078     IF_DEBUG(gran, 
1079              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1080
1081     /* in a GranSim setup the TSO stays on the run queue */
1082     t = CurrentTSO;
1083     /* Take a thread from the run queue. */
1084     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1085
1086     IF_DEBUG(gran, 
1087              debugBelch("GRAN: About to run current thread, which is\n");
1088              G_TSO(t,5));
1089
1090     context_switch = 0; // turned on via GranYield, checking events and time slice
1091
1092     IF_DEBUG(gran, 
1093              DumpGranEvent(GR_SCHEDULE, t));
1094
1095     procStatus[CurrentProc] = Busy;
1096 }
1097 #endif // GRAN
1098
1099 /* ----------------------------------------------------------------------------
1100  * Send pending messages (PARALLEL_HASKELL only)
1101  * ------------------------------------------------------------------------- */
1102
1103 #if defined(PARALLEL_HASKELL)
1104 static StgTSO *
1105 scheduleSendPendingMessages(void)
1106 {
1107     StgSparkPool *pool;
1108     rtsSpark spark;
1109     StgTSO *t;
1110
1111 # if defined(PAR) // global Mem.Mgmt., omit for now
1112     if (PendingFetches != END_BF_QUEUE) {
1113         processFetches();
1114     }
1115 # endif
1116     
1117     if (RtsFlags.ParFlags.BufferTime) {
1118         // if we use message buffering, we must send away all message
1119         // packets which have become too old...
1120         sendOldBuffers(); 
1121     }
1122 }
1123 #endif
1124
1125 /* ----------------------------------------------------------------------------
1126  * Activate spark threads (PARALLEL_HASKELL only)
1127  * ------------------------------------------------------------------------- */
1128
1129 #if defined(PARALLEL_HASKELL)
1130 static void
1131 scheduleActivateSpark(void)
1132 {
1133 #if defined(SPARKS)
1134   ASSERT(EMPTY_RUN_QUEUE());
1135 /* We get here if the run queue is empty and want some work.
1136    We try to turn a spark into a thread, and add it to the run queue,
1137    from where it will be picked up in the next iteration of the scheduler
1138    loop.
1139 */
1140
1141       /* :-[  no local threads => look out for local sparks */
1142       /* the spark pool for the current PE */
1143       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1144       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1145           pool->hd < pool->tl) {
1146         /* 
1147          * ToDo: add GC code check that we really have enough heap afterwards!!
1148          * Old comment:
1149          * If we're here (no runnable threads) and we have pending
1150          * sparks, we must have a space problem.  Get enough space
1151          * to turn one of those pending sparks into a
1152          * thread... 
1153          */
1154
1155         spark = findSpark(rtsFalse);            /* get a spark */
1156         if (spark != (rtsSpark) NULL) {
1157           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1158           IF_PAR_DEBUG(fish, // schedule,
1159                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1160                              tso->id, tso, advisory_thread_count));
1161
1162           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1163             IF_PAR_DEBUG(fish, // schedule,
1164                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1165                             spark));
1166             return rtsFalse; /* failed to generate a thread */
1167           }                  /* otherwise fall through & pick-up new tso */
1168         } else {
1169           IF_PAR_DEBUG(fish, // schedule,
1170                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1171                              spark_queue_len(pool)));
1172           return rtsFalse;  /* failed to generate a thread */
1173         }
1174         return rtsTrue;  /* success in generating a thread */
1175   } else { /* no more threads permitted or pool empty */
1176     return rtsFalse;  /* failed to generateThread */
1177   }
1178 #else
1179   tso = NULL; // avoid compiler warning only
1180   return rtsFalse;  /* dummy in non-PAR setup */
1181 #endif // SPARKS
1182 }
1183 #endif // PARALLEL_HASKELL
1184
1185 /* ----------------------------------------------------------------------------
1186  * Get work from a remote node (PARALLEL_HASKELL only)
1187  * ------------------------------------------------------------------------- */
1188     
1189 #if defined(PARALLEL_HASKELL)
1190 static rtsBool
1191 scheduleGetRemoteWork(rtsBool *receivedFinish)
1192 {
1193   ASSERT(EMPTY_RUN_QUEUE());
1194
1195   if (RtsFlags.ParFlags.BufferTime) {
1196         IF_PAR_DEBUG(verbose, 
1197                 debugBelch("...send all pending data,"));
1198         {
1199           nat i;
1200           for (i=1; i<=nPEs; i++)
1201             sendImmediately(i); // send all messages away immediately
1202         }
1203   }
1204 # ifndef SPARKS
1205         //++EDEN++ idle() , i.e. send all buffers, wait for work
1206         // suppress fishing in EDEN... just look for incoming messages
1207         // (blocking receive)
1208   IF_PAR_DEBUG(verbose, 
1209                debugBelch("...wait for incoming messages...\n"));
1210   *receivedFinish = processMessages(); // blocking receive...
1211
1212         // and reenter scheduling loop after having received something
1213         // (return rtsFalse below)
1214
1215 # else /* activate SPARKS machinery */
1216 /* We get here, if we have no work, tried to activate a local spark, but still
1217    have no work. We try to get a remote spark, by sending a FISH message.
1218    Thread migration should be added here, and triggered when a sequence of 
1219    fishes returns without work. */
1220         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1221
1222       /* =8-[  no local sparks => look for work on other PEs */
1223         /*
1224          * We really have absolutely no work.  Send out a fish
1225          * (there may be some out there already), and wait for
1226          * something to arrive.  We clearly can't run any threads
1227          * until a SCHEDULE or RESUME arrives, and so that's what
1228          * we're hoping to see.  (Of course, we still have to
1229          * respond to other types of messages.)
1230          */
1231         rtsTime now = msTime() /*CURRENT_TIME*/;
1232         IF_PAR_DEBUG(verbose, 
1233                      debugBelch("--  now=%ld\n", now));
1234         IF_PAR_DEBUG(fish, // verbose,
1235              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1236                  (last_fish_arrived_at!=0 &&
1237                   last_fish_arrived_at+delay > now)) {
1238                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1239                      now, last_fish_arrived_at+delay, 
1240                      last_fish_arrived_at,
1241                      delay);
1242              });
1243   
1244         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1245             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1246           if (last_fish_arrived_at==0 ||
1247               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1248             /* outstandingFishes is set in sendFish, processFish;
1249                avoid flooding system with fishes via delay */
1250     next_fish_to_send_at = 0;  
1251   } else {
1252     /* ToDo: this should be done in the main scheduling loop to avoid the
1253              busy wait here; not so bad if fish delay is very small  */
1254     int iq = 0; // DEBUGGING -- HWL
1255     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1256     /* send a fish when ready, but process messages that arrive in the meantime */
1257     do {
1258       if (PacketsWaiting()) {
1259         iq++; // DEBUGGING
1260         *receivedFinish = processMessages();
1261       }
1262       now = msTime();
1263     } while (!*receivedFinish || now<next_fish_to_send_at);
1264     // JB: This means the fish could become obsolete, if we receive
1265     // work. Better check for work again? 
1266     // last line: while (!receivedFinish || !haveWork || now<...)
1267     // next line: if (receivedFinish || haveWork )
1268
1269     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1270       return rtsFalse;  // NB: this will leave scheduler loop
1271                         // immediately after return!
1272                           
1273     IF_PAR_DEBUG(fish, // verbose,
1274                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1275
1276   }
1277
1278     // JB: IMHO, this should all be hidden inside sendFish(...)
1279     /* pe = choosePE(); 
1280        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1281                 NEW_FISH_HUNGER);
1282
1283     // Global statistics: count no. of fishes
1284     if (RtsFlags.ParFlags.ParStats.Global &&
1285          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1286            globalParStats.tot_fish_mess++;
1287            }
1288     */ 
1289
1290   /* delayed fishes must have been sent by now! */
1291   next_fish_to_send_at = 0;  
1292   }
1293       
1294   *receivedFinish = processMessages();
1295 # endif /* SPARKS */
1296
1297  return rtsFalse;
1298  /* NB: this function always returns rtsFalse, meaning the scheduler
1299     loop continues with the next iteration; 
1300     rationale: 
1301       return code means success in finding work; we enter this function
1302       if there is no local work, thus have to send a fish which takes
1303       time until it arrives with work; in the meantime we should process
1304       messages in the main loop;
1305  */
1306 }
1307 #endif // PARALLEL_HASKELL
1308
1309 /* ----------------------------------------------------------------------------
1310  * PAR/GRAN: Report stats & debugging info(?)
1311  * ------------------------------------------------------------------------- */
1312
1313 #if defined(PAR) || defined(GRAN)
1314 static void
1315 scheduleGranParReport(void)
1316 {
1317   ASSERT(run_queue_hd != END_TSO_QUEUE);
1318
1319   /* Take a thread from the run queue, if we have work */
1320   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1321
1322     /* If this TSO has got its outport closed in the meantime, 
1323      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1324      * It has to be marked as TH_DEAD for this purpose.
1325      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1326
1327 JB: TODO: investigate wether state change field could be nuked
1328      entirely and replaced by the normal tso state (whatnext
1329      field). All we want to do is to kill tsos from outside.
1330      */
1331
1332     /* ToDo: write something to the log-file
1333     if (RTSflags.ParFlags.granSimStats && !sameThread)
1334         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1335
1336     CurrentTSO = t;
1337     */
1338     /* the spark pool for the current PE */
1339     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1340
1341     IF_DEBUG(scheduler, 
1342              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1343                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1344
1345     IF_PAR_DEBUG(fish,
1346              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1347                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1348
1349     if (RtsFlags.ParFlags.ParStats.Full && 
1350         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1351         (emitSchedule || // forced emit
1352          (t && LastTSO && t->id != LastTSO->id))) {
1353       /* 
1354          we are running a different TSO, so write a schedule event to log file
1355          NB: If we use fair scheduling we also have to write  a deschedule 
1356              event for LastTSO; with unfair scheduling we know that the
1357              previous tso has blocked whenever we switch to another tso, so
1358              we don't need it in GUM for now
1359       */
1360       IF_PAR_DEBUG(fish, // schedule,
1361                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1362
1363       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1364                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1365       emitSchedule = rtsFalse;
1366     }
1367 }     
1368 #endif
1369
1370 /* ----------------------------------------------------------------------------
1371  * After running a thread...
1372  * ASSUMES: sched_mutex
1373  * ------------------------------------------------------------------------- */
1374
1375 static void
1376 schedulePostRunThread(void)
1377 {
1378 #if defined(PAR)
1379     /* HACK 675: if the last thread didn't yield, make sure to print a 
1380        SCHEDULE event to the log file when StgRunning the next thread, even
1381        if it is the same one as before */
1382     LastTSO = t; 
1383     TimeOfLastYield = CURRENT_TIME;
1384 #endif
1385
1386   /* some statistics gathering in the parallel case */
1387
1388 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1389   switch (ret) {
1390     case HeapOverflow:
1391 # if defined(GRAN)
1392       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1393       globalGranStats.tot_heapover++;
1394 # elif defined(PAR)
1395       globalParStats.tot_heapover++;
1396 # endif
1397       break;
1398
1399      case StackOverflow:
1400 # if defined(GRAN)
1401       IF_DEBUG(gran, 
1402                DumpGranEvent(GR_DESCHEDULE, t));
1403       globalGranStats.tot_stackover++;
1404 # elif defined(PAR)
1405       // IF_DEBUG(par, 
1406       // DumpGranEvent(GR_DESCHEDULE, t);
1407       globalParStats.tot_stackover++;
1408 # endif
1409       break;
1410
1411     case ThreadYielding:
1412 # if defined(GRAN)
1413       IF_DEBUG(gran, 
1414                DumpGranEvent(GR_DESCHEDULE, t));
1415       globalGranStats.tot_yields++;
1416 # elif defined(PAR)
1417       // IF_DEBUG(par, 
1418       // DumpGranEvent(GR_DESCHEDULE, t);
1419       globalParStats.tot_yields++;
1420 # endif
1421       break; 
1422
1423     case ThreadBlocked:
1424 # if defined(GRAN)
1425       IF_DEBUG(scheduler,
1426                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1427                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1428                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1429                if (t->block_info.closure!=(StgClosure*)NULL)
1430                  print_bq(t->block_info.closure);
1431                debugBelch("\n"));
1432
1433       // ??? needed; should emit block before
1434       IF_DEBUG(gran, 
1435                DumpGranEvent(GR_DESCHEDULE, t)); 
1436       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1437       /*
1438         ngoq Dogh!
1439       ASSERT(procStatus[CurrentProc]==Busy || 
1440               ((procStatus[CurrentProc]==Fetching) && 
1441               (t->block_info.closure!=(StgClosure*)NULL)));
1442       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1443           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1444             procStatus[CurrentProc]==Fetching)) 
1445         procStatus[CurrentProc] = Idle;
1446       */
1447 # elif defined(PAR)
1448 //++PAR++  blockThread() writes the event (change?)
1449 # endif
1450     break;
1451
1452   case ThreadFinished:
1453     break;
1454
1455   default:
1456     barf("parGlobalStats: unknown return code");
1457     break;
1458     }
1459 #endif
1460 }
1461
1462 /* -----------------------------------------------------------------------------
1463  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1464  * ASSUMES: sched_mutex
1465  * -------------------------------------------------------------------------- */
1466
1467 static rtsBool
1468 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1469 {
1470     // did the task ask for a large block?
1471     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1472         // if so, get one and push it on the front of the nursery.
1473         bdescr *bd;
1474         lnat blocks;
1475         
1476         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1477         
1478         IF_DEBUG(scheduler,
1479                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n", 
1480                             (long)t->id, whatNext_strs[t->what_next], blocks));
1481         
1482         // don't do this if it would push us over the
1483         // alloc_blocks_lim limit; we'll GC first.
1484         if (alloc_blocks + blocks < alloc_blocks_lim) {
1485             
1486             alloc_blocks += blocks;
1487             bd = allocGroup( blocks );
1488             
1489             // link the new group into the list
1490             bd->link = cap->r.rCurrentNursery;
1491             bd->u.back = cap->r.rCurrentNursery->u.back;
1492             if (cap->r.rCurrentNursery->u.back != NULL) {
1493                 cap->r.rCurrentNursery->u.back->link = bd;
1494             } else {
1495                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1496                        g0s0->blocks == cap->r.rNursery);
1497                 cap->r.rNursery = g0s0->blocks = bd;
1498             }             
1499             cap->r.rCurrentNursery->u.back = bd;
1500             
1501             // initialise it as a nursery block.  We initialise the
1502             // step, gen_no, and flags field of *every* sub-block in
1503             // this large block, because this is easier than making
1504             // sure that we always find the block head of a large
1505             // block whenever we call Bdescr() (eg. evacuate() and
1506             // isAlive() in the GC would both have to do this, at
1507             // least).
1508             { 
1509                 bdescr *x;
1510                 for (x = bd; x < bd + blocks; x++) {
1511                     x->step = g0s0;
1512                     x->gen_no = 0;
1513                     x->flags = 0;
1514                 }
1515             }
1516             
1517             // don't forget to update the block count in g0s0.
1518             g0s0->n_blocks += blocks;
1519             // This assert can be a killer if the app is doing lots
1520             // of large block allocations.
1521             ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1522             
1523             // now update the nursery to point to the new block
1524             cap->r.rCurrentNursery = bd;
1525             
1526             // we might be unlucky and have another thread get on the
1527             // run queue before us and steal the large block, but in that
1528             // case the thread will just end up requesting another large
1529             // block.
1530             PUSH_ON_RUN_QUEUE(t);
1531             return rtsFalse;  /* not actually GC'ing */
1532         }
1533     }
1534     
1535     /* make all the running tasks block on a condition variable,
1536      * maybe set context_switch and wait till they all pile in,
1537      * then have them wait on a GC condition variable.
1538      */
1539     IF_DEBUG(scheduler,
1540              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1541                         (long)t->id, whatNext_strs[t->what_next]));
1542     threadPaused(t);
1543 #if defined(GRAN)
1544     ASSERT(!is_on_queue(t,CurrentProc));
1545 #elif defined(PARALLEL_HASKELL)
1546     /* Currently we emit a DESCHEDULE event before GC in GUM.
1547        ToDo: either add separate event to distinguish SYSTEM time from rest
1548        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1549     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1550         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1551                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1552         emitSchedule = rtsTrue;
1553     }
1554 #endif
1555       
1556     PUSH_ON_RUN_QUEUE(t);
1557     return rtsTrue;
1558     /* actual GC is done at the end of the while loop in schedule() */
1559 }
1560
1561 /* -----------------------------------------------------------------------------
1562  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1563  * ASSUMES: sched_mutex
1564  * -------------------------------------------------------------------------- */
1565
1566 static void
1567 scheduleHandleStackOverflow( StgTSO *t)
1568 {
1569     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1570                                   (long)t->id, whatNext_strs[t->what_next]));
1571     /* just adjust the stack for this thread, then pop it back
1572      * on the run queue.
1573      */
1574     threadPaused(t);
1575     { 
1576         /* enlarge the stack */
1577         StgTSO *new_t = threadStackOverflow(t);
1578         
1579         /* This TSO has moved, so update any pointers to it from the
1580          * main thread stack.  It better not be on any other queues...
1581          * (it shouldn't be).
1582          */
1583         if (t->main != NULL) {
1584             t->main->tso = new_t;
1585         }
1586         PUSH_ON_RUN_QUEUE(new_t);
1587     }
1588 }
1589
1590 /* -----------------------------------------------------------------------------
1591  * Handle a thread that returned to the scheduler with ThreadYielding
1592  * ASSUMES: sched_mutex
1593  * -------------------------------------------------------------------------- */
1594
1595 static rtsBool
1596 scheduleHandleYield( StgTSO *t, nat prev_what_next )
1597 {
1598     // Reset the context switch flag.  We don't do this just before
1599     // running the thread, because that would mean we would lose ticks
1600     // during GC, which can lead to unfair scheduling (a thread hogs
1601     // the CPU because the tick always arrives during GC).  This way
1602     // penalises threads that do a lot of allocation, but that seems
1603     // better than the alternative.
1604     context_switch = 0;
1605     
1606     /* put the thread back on the run queue.  Then, if we're ready to
1607      * GC, check whether this is the last task to stop.  If so, wake
1608      * up the GC thread.  getThread will block during a GC until the
1609      * GC is finished.
1610      */
1611     IF_DEBUG(scheduler,
1612              if (t->what_next != prev_what_next) {
1613                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1614                             (long)t->id, whatNext_strs[t->what_next]);
1615              } else {
1616                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1617                             (long)t->id, whatNext_strs[t->what_next]);
1618              }
1619         );
1620     
1621     IF_DEBUG(sanity,
1622              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1623              checkTSO(t));
1624     ASSERT(t->link == END_TSO_QUEUE);
1625     
1626     // Shortcut if we're just switching evaluators: don't bother
1627     // doing stack squeezing (which can be expensive), just run the
1628     // thread.
1629     if (t->what_next != prev_what_next) {
1630         return rtsTrue;
1631     }
1632     
1633     threadPaused(t);
1634     
1635 #if defined(GRAN)
1636     ASSERT(!is_on_queue(t,CurrentProc));
1637       
1638     IF_DEBUG(sanity,
1639              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1640              checkThreadQsSanity(rtsTrue));
1641
1642 #endif
1643
1644     addToRunQueue(t);
1645
1646 #if defined(GRAN)
1647     /* add a ContinueThread event to actually process the thread */
1648     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1649               ContinueThread,
1650               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1651     IF_GRAN_DEBUG(bq, 
1652                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1653                   G_EVENTQ(0);
1654                   G_CURR_THREADQ(0));
1655 #endif
1656     return rtsFalse;
1657 }
1658
1659 /* -----------------------------------------------------------------------------
1660  * Handle a thread that returned to the scheduler with ThreadBlocked
1661  * ASSUMES: sched_mutex
1662  * -------------------------------------------------------------------------- */
1663
1664 static void
1665 scheduleHandleThreadBlocked( StgTSO *t
1666 #if !defined(GRAN) && !defined(DEBUG)
1667     STG_UNUSED
1668 #endif
1669     )
1670 {
1671 #if defined(GRAN)
1672     IF_DEBUG(scheduler,
1673              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1674                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1675              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1676     
1677     // ??? needed; should emit block before
1678     IF_DEBUG(gran, 
1679              DumpGranEvent(GR_DESCHEDULE, t)); 
1680     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1681     /*
1682       ngoq Dogh!
1683       ASSERT(procStatus[CurrentProc]==Busy || 
1684       ((procStatus[CurrentProc]==Fetching) && 
1685       (t->block_info.closure!=(StgClosure*)NULL)));
1686       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1687       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1688       procStatus[CurrentProc]==Fetching)) 
1689       procStatus[CurrentProc] = Idle;
1690     */
1691 #elif defined(PAR)
1692     IF_DEBUG(scheduler,
1693              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1694                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1695     IF_PAR_DEBUG(bq,
1696                  
1697                  if (t->block_info.closure!=(StgClosure*)NULL) 
1698                  print_bq(t->block_info.closure));
1699     
1700     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1701     blockThread(t);
1702     
1703     /* whatever we schedule next, we must log that schedule */
1704     emitSchedule = rtsTrue;
1705     
1706 #else /* !GRAN */
1707       /* don't need to do anything.  Either the thread is blocked on
1708        * I/O, in which case we'll have called addToBlockedQueue
1709        * previously, or it's blocked on an MVar or Blackhole, in which
1710        * case it'll be on the relevant queue already.
1711        */
1712     ASSERT(t->why_blocked != NotBlocked);
1713     IF_DEBUG(scheduler,
1714              debugBelch("--<< thread %d (%s) stopped: ", 
1715                         t->id, whatNext_strs[t->what_next]);
1716              printThreadBlockage(t);
1717              debugBelch("\n"));
1718     
1719     /* Only for dumping event to log file 
1720        ToDo: do I need this in GranSim, too?
1721        blockThread(t);
1722     */
1723 #endif
1724 }
1725
1726 /* -----------------------------------------------------------------------------
1727  * Handle a thread that returned to the scheduler with ThreadFinished
1728  * ASSUMES: sched_mutex
1729  * -------------------------------------------------------------------------- */
1730
1731 static rtsBool
1732 scheduleHandleThreadFinished( StgMainThread *mainThread
1733                               USED_WHEN_RTS_SUPPORTS_THREADS,
1734                               Capability *cap,
1735                               StgTSO *t )
1736 {
1737     /* Need to check whether this was a main thread, and if so,
1738      * return with the return value.
1739      *
1740      * We also end up here if the thread kills itself with an
1741      * uncaught exception, see Exception.cmm.
1742      */
1743     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1744                                   t->id, whatNext_strs[t->what_next]));
1745
1746 #if defined(GRAN)
1747       endThread(t, CurrentProc); // clean-up the thread
1748 #elif defined(PARALLEL_HASKELL)
1749       /* For now all are advisory -- HWL */
1750       //if(t->priority==AdvisoryPriority) ??
1751       advisory_thread_count--; // JB: Caution with this counter, buggy!
1752       
1753 # if defined(DIST)
1754       if(t->dist.priority==RevalPriority)
1755         FinishReval(t);
1756 # endif
1757     
1758 # if defined(EDENOLD)
1759       // the thread could still have an outport... (BUG)
1760       if (t->eden.outport != -1) {
1761       // delete the outport for the tso which has finished...
1762         IF_PAR_DEBUG(eden_ports,
1763                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1764                               t->eden.outport, t->id));
1765         deleteOPT(t);
1766       }
1767       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1768       if (t->eden.epid != -1) {
1769         IF_PAR_DEBUG(eden_ports,
1770                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1771                            t->id, t->eden.epid));
1772         removeTSOfromProcess(t);
1773       }
1774 # endif 
1775
1776 # if defined(PAR)
1777       if (RtsFlags.ParFlags.ParStats.Full &&
1778           !RtsFlags.ParFlags.ParStats.Suppressed) 
1779         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1780
1781       //  t->par only contains statistics: left out for now...
1782       IF_PAR_DEBUG(fish,
1783                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1784                               t->id,t,t->par.sparkname));
1785 # endif
1786 #endif // PARALLEL_HASKELL
1787
1788       //
1789       // Check whether the thread that just completed was a main
1790       // thread, and if so return with the result.  
1791       //
1792       // There is an assumption here that all thread completion goes
1793       // through this point; we need to make sure that if a thread
1794       // ends up in the ThreadKilled state, that it stays on the run
1795       // queue so it can be dealt with here.
1796       //
1797       if (
1798 #if defined(RTS_SUPPORTS_THREADS)
1799           mainThread != NULL
1800 #else
1801           mainThread->tso == t
1802 #endif
1803           )
1804       {
1805           // We are a bound thread: this must be our thread that just
1806           // completed.
1807           ASSERT(mainThread->tso == t);
1808
1809           if (t->what_next == ThreadComplete) {
1810               if (mainThread->ret) {
1811                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1812                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1813               }
1814               mainThread->stat = Success;
1815           } else {
1816               if (mainThread->ret) {
1817                   *(mainThread->ret) = NULL;
1818               }
1819               if (interrupted) {
1820                   mainThread->stat = Interrupted;
1821               } else {
1822                   mainThread->stat = Killed;
1823               }
1824           }
1825 #ifdef DEBUG
1826           removeThreadLabel((StgWord)mainThread->tso->id);
1827 #endif
1828           if (mainThread->prev == NULL) {
1829               main_threads = mainThread->link;
1830           } else {
1831               mainThread->prev->link = mainThread->link;
1832           }
1833           if (mainThread->link != NULL) {
1834               mainThread->link->prev = NULL;
1835           }
1836           releaseCapability(cap);
1837           return rtsTrue; // tells schedule() to return
1838       }
1839
1840 #ifdef RTS_SUPPORTS_THREADS
1841       ASSERT(t->main == NULL);
1842 #else
1843       if (t->main != NULL) {
1844           // Must be a main thread that is not the topmost one.  Leave
1845           // it on the run queue until the stack has unwound to the
1846           // point where we can deal with this.  Leaving it on the run
1847           // queue also ensures that the garbage collector knows about
1848           // this thread and its return value (it gets dropped from the
1849           // all_threads list so there's no other way to find it).
1850           APPEND_TO_RUN_QUEUE(t);
1851       }
1852 #endif
1853       return rtsFalse;
1854 }
1855
1856 /* -----------------------------------------------------------------------------
1857  * Perform a heap census, if PROFILING
1858  * -------------------------------------------------------------------------- */
1859
1860 static void
1861 scheduleDoHeapProfile(void)
1862 {
1863 #ifdef PROFILING
1864     // When we have +RTS -i0 and we're heap profiling, do a census at
1865     // every GC.  This lets us get repeatable runs for debugging.
1866     if (performHeapProfile ||
1867         (RtsFlags.ProfFlags.profileInterval==0 &&
1868          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1869         GarbageCollect(GetRoots, rtsTrue);
1870         heapCensus();
1871         performHeapProfile = rtsFalse;
1872         ready_to_gc = rtsFalse; // we already GC'd
1873     }
1874 #endif
1875 }
1876
1877 /* -----------------------------------------------------------------------------
1878  * Perform a garbage collection if necessary
1879  * ASSUMES: sched_mutex
1880  * -------------------------------------------------------------------------- */
1881
1882 static void
1883 scheduleDoGC(void)
1884 {
1885     StgTSO *t;
1886
1887 #ifdef SMP
1888     // The last task to stop actually gets to do the GC.  The rest
1889     // of the tasks release their capabilities and wait gc_pending_cond.
1890     if (ready_to_gc && allFreeCapabilities())
1891 #else
1892     if (ready_to_gc)
1893 #endif
1894     {
1895         /* Kick any transactions which are invalid back to their
1896          * atomically frames.  When next scheduled they will try to
1897          * commit, this commit will fail and they will retry.
1898          */
1899         for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
1900             if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1901                 if (!stmValidateTransaction (t -> trec)) {
1902                     IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1903                     
1904                     // strip the stack back to the ATOMICALLY_FRAME, aborting
1905                     // the (nested) transaction, and saving the stack of any
1906                     // partially-evaluated thunks on the heap.
1907                     raiseAsync_(t, NULL, rtsTrue);
1908                     
1909 #ifdef REG_R1
1910                     ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1911 #endif
1912                 }
1913             }
1914         }
1915
1916         // so this happens periodically:
1917         scheduleCheckBlackHoles();
1918
1919         /* everybody back, start the GC.
1920          * Could do it in this thread, or signal a condition var
1921          * to do it in another thread.  Either way, we need to
1922          * broadcast on gc_pending_cond afterward.
1923          */
1924 #if defined(RTS_SUPPORTS_THREADS)
1925         IF_DEBUG(scheduler,sched_belch("doing GC"));
1926 #endif
1927         GarbageCollect(GetRoots,rtsFalse);
1928         ready_to_gc = rtsFalse;
1929 #if defined(SMP)
1930         broadcastCondition(&gc_pending_cond);
1931 #endif
1932 #if defined(GRAN)
1933         /* add a ContinueThread event to continue execution of current thread */
1934         new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1935                   ContinueThread,
1936                   t, (StgClosure*)NULL, (rtsSpark*)NULL);
1937         IF_GRAN_DEBUG(bq, 
1938                       debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1939                       G_EVENTQ(0);
1940                       G_CURR_THREADQ(0));
1941 #endif /* GRAN */
1942     }
1943 }
1944
1945 /* ---------------------------------------------------------------------------
1946  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1947  * used by Control.Concurrent for error checking.
1948  * ------------------------------------------------------------------------- */
1949  
1950 StgBool
1951 rtsSupportsBoundThreads(void)
1952 {
1953 #ifdef THREADED_RTS
1954   return rtsTrue;
1955 #else
1956   return rtsFalse;
1957 #endif
1958 }
1959
1960 /* ---------------------------------------------------------------------------
1961  * isThreadBound(tso): check whether tso is bound to an OS thread.
1962  * ------------------------------------------------------------------------- */
1963  
1964 StgBool
1965 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1966 {
1967 #ifdef THREADED_RTS
1968   return (tso->main != NULL);
1969 #endif
1970   return rtsFalse;
1971 }
1972
1973 /* ---------------------------------------------------------------------------
1974  * Singleton fork(). Do not copy any running threads.
1975  * ------------------------------------------------------------------------- */
1976
1977 #ifndef mingw32_HOST_OS
1978 #define FORKPROCESS_PRIMOP_SUPPORTED
1979 #endif
1980
1981 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1982 static void 
1983 deleteThreadImmediately(StgTSO *tso);
1984 #endif
1985 StgInt
1986 forkProcess(HsStablePtr *entry
1987 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1988             STG_UNUSED
1989 #endif
1990            )
1991 {
1992 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1993   pid_t pid;
1994   StgTSO* t,*next;
1995   StgMainThread *m;
1996   SchedulerStatus rc;
1997
1998   IF_DEBUG(scheduler,sched_belch("forking!"));
1999   rts_lock(); // This not only acquires sched_mutex, it also
2000               // makes sure that no other threads are running
2001
2002   pid = fork();
2003
2004   if (pid) { /* parent */
2005
2006   /* just return the pid */
2007     rts_unlock();
2008     return pid;
2009     
2010   } else { /* child */
2011     
2012     
2013       // delete all threads
2014     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
2015     
2016     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2017       next = t->link;
2018
2019         // don't allow threads to catch the ThreadKilled exception
2020       deleteThreadImmediately(t);
2021     }
2022     
2023       // wipe the main thread list
2024     while((m = main_threads) != NULL) {
2025       main_threads = m->link;
2026 # ifdef THREADED_RTS
2027       closeCondition(&m->bound_thread_cond);
2028 # endif
2029       stgFree(m);
2030     }
2031     
2032     rc = rts_evalStableIO(entry, NULL);  // run the action
2033     rts_checkSchedStatus("forkProcess",rc);
2034     
2035     rts_unlock();
2036     
2037     hs_exit();                      // clean up and exit
2038     stg_exit(0);
2039   }
2040 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2041   barf("forkProcess#: primop not supported, sorry!\n");
2042   return -1;
2043 #endif
2044 }
2045
2046 /* ---------------------------------------------------------------------------
2047  * deleteAllThreads():  kill all the live threads.
2048  *
2049  * This is used when we catch a user interrupt (^C), before performing
2050  * any necessary cleanups and running finalizers.
2051  *
2052  * Locks: sched_mutex held.
2053  * ------------------------------------------------------------------------- */
2054    
2055 void
2056 deleteAllThreads ( void )
2057 {
2058   StgTSO* t, *next;
2059   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
2060   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2061       next = t->global_link;
2062       deleteThread(t);
2063   }      
2064
2065   // The run queue now contains a bunch of ThreadKilled threads.  We
2066   // must not throw these away: the main thread(s) will be in there
2067   // somewhere, and the main scheduler loop has to deal with it.
2068   // Also, the run queue is the only thing keeping these threads from
2069   // being GC'd, and we don't want the "main thread has been GC'd" panic.
2070
2071   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2072   ASSERT(blackhole_queue == END_TSO_QUEUE);
2073   ASSERT(sleeping_queue == END_TSO_QUEUE);
2074 }
2075
2076 /* startThread and  insertThread are now in GranSim.c -- HWL */
2077
2078
2079 /* ---------------------------------------------------------------------------
2080  * Suspending & resuming Haskell threads.
2081  * 
2082  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2083  * its capability before calling the C function.  This allows another
2084  * task to pick up the capability and carry on running Haskell
2085  * threads.  It also means that if the C call blocks, it won't lock
2086  * the whole system.
2087  *
2088  * The Haskell thread making the C call is put to sleep for the
2089  * duration of the call, on the susepended_ccalling_threads queue.  We
2090  * give out a token to the task, which it can use to resume the thread
2091  * on return from the C function.
2092  * ------------------------------------------------------------------------- */
2093    
2094 StgInt
2095 suspendThread( StgRegTable *reg )
2096 {
2097   nat tok;
2098   Capability *cap;
2099   int saved_errno = errno;
2100
2101   /* assume that *reg is a pointer to the StgRegTable part
2102    * of a Capability.
2103    */
2104   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
2105
2106   ACQUIRE_LOCK(&sched_mutex);
2107
2108   IF_DEBUG(scheduler,
2109            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
2110
2111   // XXX this might not be necessary --SDM
2112   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
2113
2114   threadPaused(cap->r.rCurrentTSO);
2115   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
2116   suspended_ccalling_threads = cap->r.rCurrentTSO;
2117
2118   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
2119       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
2120       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
2121   } else {
2122       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
2123   }
2124
2125   /* Use the thread ID as the token; it should be unique */
2126   tok = cap->r.rCurrentTSO->id;
2127
2128   /* Hand back capability */
2129   releaseCapability(cap);
2130   
2131 #if defined(RTS_SUPPORTS_THREADS)
2132   /* Preparing to leave the RTS, so ensure there's a native thread/task
2133      waiting to take over.
2134   */
2135   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
2136 #endif
2137
2138   in_haskell = rtsFalse;
2139   RELEASE_LOCK(&sched_mutex);
2140   
2141   errno = saved_errno;
2142   return tok; 
2143 }
2144
2145 StgRegTable *
2146 resumeThread( StgInt tok )
2147 {
2148   StgTSO *tso, **prev;
2149   Capability *cap;
2150   int saved_errno = errno;
2151
2152 #if defined(RTS_SUPPORTS_THREADS)
2153   /* Wait for permission to re-enter the RTS with the result. */
2154   ACQUIRE_LOCK(&sched_mutex);
2155   waitForReturnCapability(&sched_mutex, &cap);
2156
2157   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
2158 #else
2159   grabCapability(&cap);
2160 #endif
2161
2162   /* Remove the thread off of the suspended list */
2163   prev = &suspended_ccalling_threads;
2164   for (tso = suspended_ccalling_threads; 
2165        tso != END_TSO_QUEUE; 
2166        prev = &tso->link, tso = tso->link) {
2167     if (tso->id == (StgThreadID)tok) {
2168       *prev = tso->link;
2169       break;
2170     }
2171   }
2172   if (tso == END_TSO_QUEUE) {
2173     barf("resumeThread: thread not found");
2174   }
2175   tso->link = END_TSO_QUEUE;
2176   
2177   if(tso->why_blocked == BlockedOnCCall) {
2178       awakenBlockedQueueNoLock(tso->blocked_exceptions);
2179       tso->blocked_exceptions = NULL;
2180   }
2181   
2182   /* Reset blocking status */
2183   tso->why_blocked  = NotBlocked;
2184
2185   cap->r.rCurrentTSO = tso;
2186   in_haskell = rtsTrue;
2187   RELEASE_LOCK(&sched_mutex);
2188   errno = saved_errno;
2189   return &cap->r;
2190 }
2191
2192 /* ---------------------------------------------------------------------------
2193  * Comparing Thread ids.
2194  *
2195  * This is used from STG land in the implementation of the
2196  * instances of Eq/Ord for ThreadIds.
2197  * ------------------------------------------------------------------------ */
2198
2199 int
2200 cmp_thread(StgPtr tso1, StgPtr tso2) 
2201
2202   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2203   StgThreadID id2 = ((StgTSO *)tso2)->id;
2204  
2205   if (id1 < id2) return (-1);
2206   if (id1 > id2) return 1;
2207   return 0;
2208 }
2209
2210 /* ---------------------------------------------------------------------------
2211  * Fetching the ThreadID from an StgTSO.
2212  *
2213  * This is used in the implementation of Show for ThreadIds.
2214  * ------------------------------------------------------------------------ */
2215 int
2216 rts_getThreadId(StgPtr tso) 
2217 {
2218   return ((StgTSO *)tso)->id;
2219 }
2220
2221 #ifdef DEBUG
2222 void
2223 labelThread(StgPtr tso, char *label)
2224 {
2225   int len;
2226   void *buf;
2227
2228   /* Caveat: Once set, you can only set the thread name to "" */
2229   len = strlen(label)+1;
2230   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2231   strncpy(buf,label,len);
2232   /* Update will free the old memory for us */
2233   updateThreadLabel(((StgTSO *)tso)->id,buf);
2234 }
2235 #endif /* DEBUG */
2236
2237 /* ---------------------------------------------------------------------------
2238    Create a new thread.
2239
2240    The new thread starts with the given stack size.  Before the
2241    scheduler can run, however, this thread needs to have a closure
2242    (and possibly some arguments) pushed on its stack.  See
2243    pushClosure() in Schedule.h.
2244
2245    createGenThread() and createIOThread() (in SchedAPI.h) are
2246    convenient packaged versions of this function.
2247
2248    currently pri (priority) is only used in a GRAN setup -- HWL
2249    ------------------------------------------------------------------------ */
2250 #if defined(GRAN)
2251 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2252 StgTSO *
2253 createThread(nat size, StgInt pri)
2254 #else
2255 StgTSO *
2256 createThread(nat size)
2257 #endif
2258 {
2259
2260     StgTSO *tso;
2261     nat stack_size;
2262
2263     /* First check whether we should create a thread at all */
2264 #if defined(PARALLEL_HASKELL)
2265   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2266   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2267     threadsIgnored++;
2268     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2269           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2270     return END_TSO_QUEUE;
2271   }
2272   threadsCreated++;
2273 #endif
2274
2275 #if defined(GRAN)
2276   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2277 #endif
2278
2279   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2280
2281   /* catch ridiculously small stack sizes */
2282   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2283     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2284   }
2285
2286   stack_size = size - TSO_STRUCT_SIZEW;
2287
2288   tso = (StgTSO *)allocate(size);
2289   TICK_ALLOC_TSO(stack_size, 0);
2290
2291   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2292 #if defined(GRAN)
2293   SET_GRAN_HDR(tso, ThisPE);
2294 #endif
2295
2296   // Always start with the compiled code evaluator
2297   tso->what_next = ThreadRunGHC;
2298
2299   tso->id = next_thread_id++; 
2300   tso->why_blocked  = NotBlocked;
2301   tso->blocked_exceptions = NULL;
2302
2303   tso->saved_errno = 0;
2304   tso->main = NULL;
2305   
2306   tso->stack_size   = stack_size;
2307   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2308                               - TSO_STRUCT_SIZEW;
2309   tso->sp           = (P_)&(tso->stack) + stack_size;
2310
2311   tso->trec = NO_TREC;
2312
2313 #ifdef PROFILING
2314   tso->prof.CCCS = CCS_MAIN;
2315 #endif
2316
2317   /* put a stop frame on the stack */
2318   tso->sp -= sizeofW(StgStopFrame);
2319   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2320   tso->link = END_TSO_QUEUE;
2321
2322   // ToDo: check this
2323 #if defined(GRAN)
2324   /* uses more flexible routine in GranSim */
2325   insertThread(tso, CurrentProc);
2326 #else
2327   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2328    * from its creation
2329    */
2330 #endif
2331
2332 #if defined(GRAN) 
2333   if (RtsFlags.GranFlags.GranSimStats.Full) 
2334     DumpGranEvent(GR_START,tso);
2335 #elif defined(PARALLEL_HASKELL)
2336   if (RtsFlags.ParFlags.ParStats.Full) 
2337     DumpGranEvent(GR_STARTQ,tso);
2338   /* HACk to avoid SCHEDULE 
2339      LastTSO = tso; */
2340 #endif
2341
2342   /* Link the new thread on the global thread list.
2343    */
2344   tso->global_link = all_threads;
2345   all_threads = tso;
2346
2347 #if defined(DIST)
2348   tso->dist.priority = MandatoryPriority; //by default that is...
2349 #endif
2350
2351 #if defined(GRAN)
2352   tso->gran.pri = pri;
2353 # if defined(DEBUG)
2354   tso->gran.magic = TSO_MAGIC; // debugging only
2355 # endif
2356   tso->gran.sparkname   = 0;
2357   tso->gran.startedat   = CURRENT_TIME; 
2358   tso->gran.exported    = 0;
2359   tso->gran.basicblocks = 0;
2360   tso->gran.allocs      = 0;
2361   tso->gran.exectime    = 0;
2362   tso->gran.fetchtime   = 0;
2363   tso->gran.fetchcount  = 0;
2364   tso->gran.blocktime   = 0;
2365   tso->gran.blockcount  = 0;
2366   tso->gran.blockedat   = 0;
2367   tso->gran.globalsparks = 0;
2368   tso->gran.localsparks  = 0;
2369   if (RtsFlags.GranFlags.Light)
2370     tso->gran.clock  = Now; /* local clock */
2371   else
2372     tso->gran.clock  = 0;
2373
2374   IF_DEBUG(gran,printTSO(tso));
2375 #elif defined(PARALLEL_HASKELL)
2376 # if defined(DEBUG)
2377   tso->par.magic = TSO_MAGIC; // debugging only
2378 # endif
2379   tso->par.sparkname   = 0;
2380   tso->par.startedat   = CURRENT_TIME; 
2381   tso->par.exported    = 0;
2382   tso->par.basicblocks = 0;
2383   tso->par.allocs      = 0;
2384   tso->par.exectime    = 0;
2385   tso->par.fetchtime   = 0;
2386   tso->par.fetchcount  = 0;
2387   tso->par.blocktime   = 0;
2388   tso->par.blockcount  = 0;
2389   tso->par.blockedat   = 0;
2390   tso->par.globalsparks = 0;
2391   tso->par.localsparks  = 0;
2392 #endif
2393
2394 #if defined(GRAN)
2395   globalGranStats.tot_threads_created++;
2396   globalGranStats.threads_created_on_PE[CurrentProc]++;
2397   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2398   globalGranStats.tot_sq_probes++;
2399 #elif defined(PARALLEL_HASKELL)
2400   // collect parallel global statistics (currently done together with GC stats)
2401   if (RtsFlags.ParFlags.ParStats.Global &&
2402       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2403     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2404     globalParStats.tot_threads_created++;
2405   }
2406 #endif 
2407
2408 #if defined(GRAN)
2409   IF_GRAN_DEBUG(pri,
2410                 sched_belch("==__ schedule: Created TSO %d (%p);",
2411                       CurrentProc, tso, tso->id));
2412 #elif defined(PARALLEL_HASKELL)
2413   IF_PAR_DEBUG(verbose,
2414                sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2415                            (long)tso->id, tso, advisory_thread_count));
2416 #else
2417   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2418                                  (long)tso->id, (long)tso->stack_size));
2419 #endif    
2420   return tso;
2421 }
2422
2423 #if defined(PAR)
2424 /* RFP:
2425    all parallel thread creation calls should fall through the following routine.
2426 */
2427 StgTSO *
2428 createThreadFromSpark(rtsSpark spark) 
2429 { StgTSO *tso;
2430   ASSERT(spark != (rtsSpark)NULL);
2431 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2432   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2433   { threadsIgnored++;
2434     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2435           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2436     return END_TSO_QUEUE;
2437   }
2438   else
2439   { threadsCreated++;
2440     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2441     if (tso==END_TSO_QUEUE)     
2442       barf("createSparkThread: Cannot create TSO");
2443 #if defined(DIST)
2444     tso->priority = AdvisoryPriority;
2445 #endif
2446     pushClosure(tso,spark);
2447     addToRunQueue(tso);
2448     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2449   }
2450   return tso;
2451 }
2452 #endif
2453
2454 /*
2455   Turn a spark into a thread.
2456   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2457 */
2458 #if 0
2459 StgTSO *
2460 activateSpark (rtsSpark spark) 
2461 {
2462   StgTSO *tso;
2463
2464   tso = createSparkThread(spark);
2465   if (RtsFlags.ParFlags.ParStats.Full) {   
2466     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2467       IF_PAR_DEBUG(verbose,
2468                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2469                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2470   }
2471   // ToDo: fwd info on local/global spark to thread -- HWL
2472   // tso->gran.exported =  spark->exported;
2473   // tso->gran.locked =   !spark->global;
2474   // tso->gran.sparkname = spark->name;
2475
2476   return tso;
2477 }
2478 #endif
2479
2480 /* ---------------------------------------------------------------------------
2481  * scheduleThread()
2482  *
2483  * scheduleThread puts a thread on the head of the runnable queue.
2484  * This will usually be done immediately after a thread is created.
2485  * The caller of scheduleThread must create the thread using e.g.
2486  * createThread and push an appropriate closure
2487  * on this thread's stack before the scheduler is invoked.
2488  * ------------------------------------------------------------------------ */
2489
2490 static void
2491 scheduleThread_(StgTSO *tso)
2492 {
2493   // The thread goes at the *end* of the run-queue, to avoid possible
2494   // starvation of any threads already on the queue.
2495   APPEND_TO_RUN_QUEUE(tso);
2496   threadRunnable();
2497 }
2498
2499 void
2500 scheduleThread(StgTSO* tso)
2501 {
2502   ACQUIRE_LOCK(&sched_mutex);
2503   scheduleThread_(tso);
2504   RELEASE_LOCK(&sched_mutex);
2505 }
2506
2507 #if defined(RTS_SUPPORTS_THREADS)
2508 static Condition bound_cond_cache;
2509 static int bound_cond_cache_full = 0;
2510 #endif
2511
2512
2513 SchedulerStatus
2514 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
2515                    Capability *initialCapability)
2516 {
2517     // Precondition: sched_mutex must be held
2518     StgMainThread *m;
2519
2520     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2521     m->tso = tso;
2522     tso->main = m;
2523     m->ret = ret;
2524     m->stat = NoStatus;
2525     m->link = main_threads;
2526     m->prev = NULL;
2527     if (main_threads != NULL) {
2528         main_threads->prev = m;
2529     }
2530     main_threads = m;
2531
2532 #if defined(RTS_SUPPORTS_THREADS)
2533     // Allocating a new condition for each thread is expensive, so we
2534     // cache one.  This is a pretty feeble hack, but it helps speed up
2535     // consecutive call-ins quite a bit.
2536     if (bound_cond_cache_full) {
2537         m->bound_thread_cond = bound_cond_cache;
2538         bound_cond_cache_full = 0;
2539     } else {
2540         initCondition(&m->bound_thread_cond);
2541     }
2542 #endif
2543
2544     /* Put the thread on the main-threads list prior to scheduling the TSO.
2545        Failure to do so introduces a race condition in the MT case (as
2546        identified by Wolfgang Thaller), whereby the new task/OS thread 
2547        created by scheduleThread_() would complete prior to the thread
2548        that spawned it managed to put 'itself' on the main-threads list.
2549        The upshot of it all being that the worker thread wouldn't get to
2550        signal the completion of the its work item for the main thread to
2551        see (==> it got stuck waiting.)    -- sof 6/02.
2552     */
2553     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2554     
2555     APPEND_TO_RUN_QUEUE(tso);
2556     // NB. Don't call threadRunnable() here, because the thread is
2557     // bound and only runnable by *this* OS thread, so waking up other
2558     // workers will just slow things down.
2559
2560     return waitThread_(m, initialCapability);
2561 }
2562
2563 /* ---------------------------------------------------------------------------
2564  * initScheduler()
2565  *
2566  * Initialise the scheduler.  This resets all the queues - if the
2567  * queues contained any threads, they'll be garbage collected at the
2568  * next pass.
2569  *
2570  * ------------------------------------------------------------------------ */
2571
2572 void 
2573 initScheduler(void)
2574 {
2575 #if defined(GRAN)
2576   nat i;
2577
2578   for (i=0; i<=MAX_PROC; i++) {
2579     run_queue_hds[i]      = END_TSO_QUEUE;
2580     run_queue_tls[i]      = END_TSO_QUEUE;
2581     blocked_queue_hds[i]  = END_TSO_QUEUE;
2582     blocked_queue_tls[i]  = END_TSO_QUEUE;
2583     ccalling_threadss[i]  = END_TSO_QUEUE;
2584     blackhole_queue[i]    = END_TSO_QUEUE;
2585     sleeping_queue        = END_TSO_QUEUE;
2586   }
2587 #else
2588   run_queue_hd      = END_TSO_QUEUE;
2589   run_queue_tl      = END_TSO_QUEUE;
2590   blocked_queue_hd  = END_TSO_QUEUE;
2591   blocked_queue_tl  = END_TSO_QUEUE;
2592   blackhole_queue   = END_TSO_QUEUE;
2593   sleeping_queue    = END_TSO_QUEUE;
2594 #endif 
2595
2596   suspended_ccalling_threads  = END_TSO_QUEUE;
2597
2598   main_threads = NULL;
2599   all_threads  = END_TSO_QUEUE;
2600
2601   context_switch = 0;
2602   interrupted    = 0;
2603
2604   RtsFlags.ConcFlags.ctxtSwitchTicks =
2605       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2606       
2607 #if defined(RTS_SUPPORTS_THREADS)
2608   /* Initialise the mutex and condition variables used by
2609    * the scheduler. */
2610   initMutex(&sched_mutex);
2611   initMutex(&term_mutex);
2612 #endif
2613   
2614   ACQUIRE_LOCK(&sched_mutex);
2615
2616   /* A capability holds the state a native thread needs in
2617    * order to execute STG code. At least one capability is
2618    * floating around (only SMP builds have more than one).
2619    */
2620   initCapabilities();
2621   
2622 #if defined(RTS_SUPPORTS_THREADS)
2623     /* start our haskell execution tasks */
2624     startTaskManager(0,taskStart);
2625 #endif
2626
2627 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2628   initSparkPools();
2629 #endif
2630
2631   RELEASE_LOCK(&sched_mutex);
2632 }
2633
2634 void
2635 exitScheduler( void )
2636 {
2637 #if defined(RTS_SUPPORTS_THREADS)
2638   stopTaskManager();
2639 #endif
2640   interrupted = rtsTrue;
2641   shutting_down_scheduler = rtsTrue;
2642 }
2643
2644 /* ----------------------------------------------------------------------------
2645    Managing the per-task allocation areas.
2646    
2647    Each capability comes with an allocation area.  These are
2648    fixed-length block lists into which allocation can be done.
2649
2650    ToDo: no support for two-space collection at the moment???
2651    ------------------------------------------------------------------------- */
2652
2653 static SchedulerStatus
2654 waitThread_(StgMainThread* m, Capability *initialCapability)
2655 {
2656   SchedulerStatus stat;
2657
2658   // Precondition: sched_mutex must be held.
2659   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2660
2661 #if defined(GRAN)
2662   /* GranSim specific init */
2663   CurrentTSO = m->tso;                // the TSO to run
2664   procStatus[MainProc] = Busy;        // status of main PE
2665   CurrentProc = MainProc;             // PE to run it on
2666   schedule(m,initialCapability);
2667 #else
2668   schedule(m,initialCapability);
2669   ASSERT(m->stat != NoStatus);
2670 #endif
2671
2672   stat = m->stat;
2673
2674 #if defined(RTS_SUPPORTS_THREADS)
2675   // Free the condition variable, returning it to the cache if possible.
2676   if (!bound_cond_cache_full) {
2677       bound_cond_cache = m->bound_thread_cond;
2678       bound_cond_cache_full = 1;
2679   } else {
2680       closeCondition(&m->bound_thread_cond);
2681   }
2682 #endif
2683
2684   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2685   stgFree(m);
2686
2687   // Postcondition: sched_mutex still held
2688   return stat;
2689 }
2690
2691 /* ---------------------------------------------------------------------------
2692    Where are the roots that we know about?
2693
2694         - all the threads on the runnable queue
2695         - all the threads on the blocked queue
2696         - all the threads on the sleeping queue
2697         - all the thread currently executing a _ccall_GC
2698         - all the "main threads"
2699      
2700    ------------------------------------------------------------------------ */
2701
2702 /* This has to be protected either by the scheduler monitor, or by the
2703         garbage collection monitor (probably the latter).
2704         KH @ 25/10/99
2705 */
2706
2707 void
2708 GetRoots( evac_fn evac )
2709 {
2710 #if defined(GRAN)
2711   {
2712     nat i;
2713     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2714       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2715           evac((StgClosure **)&run_queue_hds[i]);
2716       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2717           evac((StgClosure **)&run_queue_tls[i]);
2718       
2719       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2720           evac((StgClosure **)&blocked_queue_hds[i]);
2721       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2722           evac((StgClosure **)&blocked_queue_tls[i]);
2723       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2724           evac((StgClosure **)&ccalling_threads[i]);
2725     }
2726   }
2727
2728   markEventQueue();
2729
2730 #else /* !GRAN */
2731   if (run_queue_hd != END_TSO_QUEUE) {
2732       ASSERT(run_queue_tl != END_TSO_QUEUE);
2733       evac((StgClosure **)&run_queue_hd);
2734       evac((StgClosure **)&run_queue_tl);
2735   }
2736   
2737   if (blocked_queue_hd != END_TSO_QUEUE) {
2738       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2739       evac((StgClosure **)&blocked_queue_hd);
2740       evac((StgClosure **)&blocked_queue_tl);
2741   }
2742   
2743   if (sleeping_queue != END_TSO_QUEUE) {
2744       evac((StgClosure **)&sleeping_queue);
2745   }
2746 #endif 
2747
2748   if (blackhole_queue != END_TSO_QUEUE) {
2749       evac((StgClosure **)&blackhole_queue);
2750   }
2751
2752   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2753       evac((StgClosure **)&suspended_ccalling_threads);
2754   }
2755
2756 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2757   markSparkQueue(evac);
2758 #endif
2759
2760 #if defined(RTS_USER_SIGNALS)
2761   // mark the signal handlers (signals should be already blocked)
2762   markSignalHandlers(evac);
2763 #endif
2764 }
2765
2766 /* -----------------------------------------------------------------------------
2767    performGC
2768
2769    This is the interface to the garbage collector from Haskell land.
2770    We provide this so that external C code can allocate and garbage
2771    collect when called from Haskell via _ccall_GC.
2772
2773    It might be useful to provide an interface whereby the programmer
2774    can specify more roots (ToDo).
2775    
2776    This needs to be protected by the GC condition variable above.  KH.
2777    -------------------------------------------------------------------------- */
2778
2779 static void (*extra_roots)(evac_fn);
2780
2781 void
2782 performGC(void)
2783 {
2784   /* Obligated to hold this lock upon entry */
2785   ACQUIRE_LOCK(&sched_mutex);
2786   GarbageCollect(GetRoots,rtsFalse);
2787   RELEASE_LOCK(&sched_mutex);
2788 }
2789
2790 void
2791 performMajorGC(void)
2792 {
2793   ACQUIRE_LOCK(&sched_mutex);
2794   GarbageCollect(GetRoots,rtsTrue);
2795   RELEASE_LOCK(&sched_mutex);
2796 }
2797
2798 static void
2799 AllRoots(evac_fn evac)
2800 {
2801     GetRoots(evac);             // the scheduler's roots
2802     extra_roots(evac);          // the user's roots
2803 }
2804
2805 void
2806 performGCWithRoots(void (*get_roots)(evac_fn))
2807 {
2808   ACQUIRE_LOCK(&sched_mutex);
2809   extra_roots = get_roots;
2810   GarbageCollect(AllRoots,rtsFalse);
2811   RELEASE_LOCK(&sched_mutex);
2812 }
2813
2814 /* -----------------------------------------------------------------------------
2815    Stack overflow
2816
2817    If the thread has reached its maximum stack size, then raise the
2818    StackOverflow exception in the offending thread.  Otherwise
2819    relocate the TSO into a larger chunk of memory and adjust its stack
2820    size appropriately.
2821    -------------------------------------------------------------------------- */
2822
2823 static StgTSO *
2824 threadStackOverflow(StgTSO *tso)
2825 {
2826   nat new_stack_size, stack_words;
2827   lnat new_tso_size;
2828   StgPtr new_sp;
2829   StgTSO *dest;
2830
2831   IF_DEBUG(sanity,checkTSO(tso));
2832   if (tso->stack_size >= tso->max_stack_size) {
2833
2834     IF_DEBUG(gc,
2835              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2836                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2837              /* If we're debugging, just print out the top of the stack */
2838              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2839                                               tso->sp+64)));
2840
2841     /* Send this thread the StackOverflow exception */
2842     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2843     return tso;
2844   }
2845
2846   /* Try to double the current stack size.  If that takes us over the
2847    * maximum stack size for this thread, then use the maximum instead.
2848    * Finally round up so the TSO ends up as a whole number of blocks.
2849    */
2850   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2851   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2852                                        TSO_STRUCT_SIZE)/sizeof(W_);
2853   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2854   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2855
2856   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2857
2858   dest = (StgTSO *)allocate(new_tso_size);
2859   TICK_ALLOC_TSO(new_stack_size,0);
2860
2861   /* copy the TSO block and the old stack into the new area */
2862   memcpy(dest,tso,TSO_STRUCT_SIZE);
2863   stack_words = tso->stack + tso->stack_size - tso->sp;
2864   new_sp = (P_)dest + new_tso_size - stack_words;
2865   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2866
2867   /* relocate the stack pointers... */
2868   dest->sp         = new_sp;
2869   dest->stack_size = new_stack_size;
2870         
2871   /* Mark the old TSO as relocated.  We have to check for relocated
2872    * TSOs in the garbage collector and any primops that deal with TSOs.
2873    *
2874    * It's important to set the sp value to just beyond the end
2875    * of the stack, so we don't attempt to scavenge any part of the
2876    * dead TSO's stack.
2877    */
2878   tso->what_next = ThreadRelocated;
2879   tso->link = dest;
2880   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2881   tso->why_blocked = NotBlocked;
2882
2883   IF_PAR_DEBUG(verbose,
2884                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2885                      tso->id, tso, tso->stack_size);
2886                /* If we're debugging, just print out the top of the stack */
2887                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2888                                                 tso->sp+64)));
2889   
2890   IF_DEBUG(sanity,checkTSO(tso));
2891 #if 0
2892   IF_DEBUG(scheduler,printTSO(dest));
2893 #endif
2894
2895   return dest;
2896 }
2897
2898 /* ---------------------------------------------------------------------------
2899    Wake up a queue that was blocked on some resource.
2900    ------------------------------------------------------------------------ */
2901
2902 #if defined(GRAN)
2903 STATIC_INLINE void
2904 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2905 {
2906 }
2907 #elif defined(PARALLEL_HASKELL)
2908 STATIC_INLINE void
2909 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2910 {
2911   /* write RESUME events to log file and
2912      update blocked and fetch time (depending on type of the orig closure) */
2913   if (RtsFlags.ParFlags.ParStats.Full) {
2914     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2915                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2916                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2917     if (EMPTY_RUN_QUEUE())
2918       emitSchedule = rtsTrue;
2919
2920     switch (get_itbl(node)->type) {
2921         case FETCH_ME_BQ:
2922           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2923           break;
2924         case RBH:
2925         case FETCH_ME:
2926         case BLACKHOLE_BQ:
2927           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2928           break;
2929 #ifdef DIST
2930         case MVAR:
2931           break;
2932 #endif    
2933         default:
2934           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2935         }
2936       }
2937 }
2938 #endif
2939
2940 #if defined(GRAN)
2941 static StgBlockingQueueElement *
2942 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2943 {
2944     StgTSO *tso;
2945     PEs node_loc, tso_loc;
2946
2947     node_loc = where_is(node); // should be lifted out of loop
2948     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2949     tso_loc = where_is((StgClosure *)tso);
2950     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2951       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2952       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2953       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2954       // insertThread(tso, node_loc);
2955       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2956                 ResumeThread,
2957                 tso, node, (rtsSpark*)NULL);
2958       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2959       // len_local++;
2960       // len++;
2961     } else { // TSO is remote (actually should be FMBQ)
2962       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2963                                   RtsFlags.GranFlags.Costs.gunblocktime +
2964                                   RtsFlags.GranFlags.Costs.latency;
2965       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2966                 UnblockThread,
2967                 tso, node, (rtsSpark*)NULL);
2968       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2969       // len++;
2970     }
2971     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2972     IF_GRAN_DEBUG(bq,
2973                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2974                           (node_loc==tso_loc ? "Local" : "Global"), 
2975                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2976     tso->block_info.closure = NULL;
2977     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2978                              tso->id, tso));
2979 }
2980 #elif defined(PARALLEL_HASKELL)
2981 static StgBlockingQueueElement *
2982 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2983 {
2984     StgBlockingQueueElement *next;
2985
2986     switch (get_itbl(bqe)->type) {
2987     case TSO:
2988       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2989       /* if it's a TSO just push it onto the run_queue */
2990       next = bqe->link;
2991       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2992       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2993       threadRunnable();
2994       unblockCount(bqe, node);
2995       /* reset blocking status after dumping event */
2996       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2997       break;
2998
2999     case BLOCKED_FETCH:
3000       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3001       next = bqe->link;
3002       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3003       PendingFetches = (StgBlockedFetch *)bqe;
3004       break;
3005
3006 # if defined(DEBUG)
3007       /* can ignore this case in a non-debugging setup; 
3008          see comments on RBHSave closures above */
3009     case CONSTR:
3010       /* check that the closure is an RBHSave closure */
3011       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3012              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3013              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3014       break;
3015
3016     default:
3017       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3018            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3019            (StgClosure *)bqe);
3020 # endif
3021     }
3022   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3023   return next;
3024 }
3025
3026 #else /* !GRAN && !PARALLEL_HASKELL */
3027 static StgTSO *
3028 unblockOneLocked(StgTSO *tso)
3029 {
3030   StgTSO *next;
3031
3032   ASSERT(get_itbl(tso)->type == TSO);
3033   ASSERT(tso->why_blocked != NotBlocked);
3034   tso->why_blocked = NotBlocked;
3035   next = tso->link;
3036   tso->link = END_TSO_QUEUE;
3037   APPEND_TO_RUN_QUEUE(tso);
3038   threadRunnable();
3039   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3040   return next;
3041 }
3042 #endif
3043
3044 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3045 INLINE_ME StgBlockingQueueElement *
3046 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3047 {
3048   ACQUIRE_LOCK(&sched_mutex);
3049   bqe = unblockOneLocked(bqe, node);
3050   RELEASE_LOCK(&sched_mutex);
3051   return bqe;
3052 }
3053 #else
3054 INLINE_ME StgTSO *
3055 unblockOne(StgTSO *tso)
3056 {
3057   ACQUIRE_LOCK(&sched_mutex);
3058   tso = unblockOneLocked(tso);
3059   RELEASE_LOCK(&sched_mutex);
3060   return tso;
3061 }
3062 #endif
3063
3064 #if defined(GRAN)
3065 void 
3066 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3067 {
3068   StgBlockingQueueElement *bqe;
3069   PEs node_loc;
3070   nat len = 0; 
3071
3072   IF_GRAN_DEBUG(bq, 
3073                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3074                       node, CurrentProc, CurrentTime[CurrentProc], 
3075                       CurrentTSO->id, CurrentTSO));
3076
3077   node_loc = where_is(node);
3078
3079   ASSERT(q == END_BQ_QUEUE ||
3080          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3081          get_itbl(q)->type == CONSTR); // closure (type constructor)
3082   ASSERT(is_unique(node));
3083
3084   /* FAKE FETCH: magically copy the node to the tso's proc;
3085      no Fetch necessary because in reality the node should not have been 
3086      moved to the other PE in the first place
3087   */
3088   if (CurrentProc!=node_loc) {
3089     IF_GRAN_DEBUG(bq, 
3090                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3091                         node, node_loc, CurrentProc, CurrentTSO->id, 
3092                         // CurrentTSO, where_is(CurrentTSO),
3093                         node->header.gran.procs));
3094     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3095     IF_GRAN_DEBUG(bq, 
3096                   debugBelch("## new bitmask of node %p is %#x\n",
3097                         node, node->header.gran.procs));
3098     if (RtsFlags.GranFlags.GranSimStats.Global) {
3099       globalGranStats.tot_fake_fetches++;
3100     }
3101   }
3102
3103   bqe = q;
3104   // ToDo: check: ASSERT(CurrentProc==node_loc);
3105   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3106     //next = bqe->link;
3107     /* 
3108        bqe points to the current element in the queue
3109        next points to the next element in the queue
3110     */
3111     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3112     //tso_loc = where_is(tso);
3113     len++;
3114     bqe = unblockOneLocked(bqe, node);
3115   }
3116
3117   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3118      the closure to make room for the anchor of the BQ */
3119   if (bqe!=END_BQ_QUEUE) {
3120     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3121     /*
3122     ASSERT((info_ptr==&RBH_Save_0_info) ||
3123            (info_ptr==&RBH_Save_1_info) ||
3124            (info_ptr==&RBH_Save_2_info));
3125     */
3126     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3127     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3128     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3129
3130     IF_GRAN_DEBUG(bq,
3131                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3132                         node, info_type(node)));
3133   }
3134
3135   /* statistics gathering */
3136   if (RtsFlags.GranFlags.GranSimStats.Global) {
3137     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3138     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3139     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3140     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3141   }
3142   IF_GRAN_DEBUG(bq,
3143                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3144                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3145 }
3146 #elif defined(PARALLEL_HASKELL)
3147 void 
3148 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3149 {
3150   StgBlockingQueueElement *bqe;
3151
3152   ACQUIRE_LOCK(&sched_mutex);
3153
3154   IF_PAR_DEBUG(verbose, 
3155                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3156                      node, mytid));
3157 #ifdef DIST  
3158   //RFP
3159   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3160     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3161     return;
3162   }
3163 #endif
3164   
3165   ASSERT(q == END_BQ_QUEUE ||
3166          get_itbl(q)->type == TSO ||           
3167          get_itbl(q)->type == BLOCKED_FETCH || 
3168          get_itbl(q)->type == CONSTR); 
3169
3170   bqe = q;
3171   while (get_itbl(bqe)->type==TSO || 
3172          get_itbl(bqe)->type==BLOCKED_FETCH) {
3173     bqe = unblockOneLocked(bqe, node);
3174   }
3175   RELEASE_LOCK(&sched_mutex);
3176 }
3177
3178 #else   /* !GRAN && !PARALLEL_HASKELL */
3179
3180 void
3181 awakenBlockedQueueNoLock(StgTSO *tso)
3182 {
3183   while (tso != END_TSO_QUEUE) {
3184     tso = unblockOneLocked(tso);
3185   }
3186 }
3187
3188 void
3189 awakenBlockedQueue(StgTSO *tso)
3190 {
3191   ACQUIRE_LOCK(&sched_mutex);
3192   while (tso != END_TSO_QUEUE) {
3193     tso = unblockOneLocked(tso);
3194   }
3195   RELEASE_LOCK(&sched_mutex);
3196 }
3197 #endif
3198
3199 /* ---------------------------------------------------------------------------
3200    Interrupt execution
3201    - usually called inside a signal handler so it mustn't do anything fancy.   
3202    ------------------------------------------------------------------------ */
3203
3204 void
3205 interruptStgRts(void)
3206 {
3207     interrupted    = 1;
3208     context_switch = 1;
3209 }
3210
3211 /* -----------------------------------------------------------------------------
3212    Unblock a thread
3213
3214    This is for use when we raise an exception in another thread, which
3215    may be blocked.
3216    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3217    -------------------------------------------------------------------------- */
3218
3219 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3220 /*
3221   NB: only the type of the blocking queue is different in GranSim and GUM
3222       the operations on the queue-elements are the same
3223       long live polymorphism!
3224
3225   Locks: sched_mutex is held upon entry and exit.
3226
3227 */
3228 static void
3229 unblockThread(StgTSO *tso)
3230 {
3231   StgBlockingQueueElement *t, **last;
3232
3233   switch (tso->why_blocked) {
3234
3235   case NotBlocked:
3236     return;  /* not blocked */
3237
3238   case BlockedOnSTM:
3239     // Be careful: nothing to do here!  We tell the scheduler that the thread
3240     // is runnable and we leave it to the stack-walking code to abort the 
3241     // transaction while unwinding the stack.  We should perhaps have a debugging
3242     // test to make sure that this really happens and that the 'zombie' transaction
3243     // does not get committed.
3244     goto done;
3245
3246   case BlockedOnMVar:
3247     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3248     {
3249       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3250       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3251
3252       last = (StgBlockingQueueElement **)&mvar->head;
3253       for (t = (StgBlockingQueueElement *)mvar->head; 
3254            t != END_BQ_QUEUE; 
3255            last = &t->link, last_tso = t, t = t->link) {
3256         if (t == (StgBlockingQueueElement *)tso) {
3257           *last = (StgBlockingQueueElement *)tso->link;
3258           if (mvar->tail == tso) {
3259             mvar->tail = (StgTSO *)last_tso;
3260           }
3261           goto done;
3262         }
3263       }
3264       barf("unblockThread (MVAR): TSO not found");
3265     }
3266
3267   case BlockedOnBlackHole:
3268     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3269     {
3270       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3271
3272       last = &bq->blocking_queue;
3273       for (t = bq->blocking_queue; 
3274            t != END_BQ_QUEUE; 
3275            last = &t->link, t = t->link) {
3276         if (t == (StgBlockingQueueElement *)tso) {
3277           *last = (StgBlockingQueueElement *)tso->link;
3278           goto done;
3279         }
3280       }
3281       barf("unblockThread (BLACKHOLE): TSO not found");
3282     }
3283
3284   case BlockedOnException:
3285     {
3286       StgTSO *target  = tso->block_info.tso;
3287
3288       ASSERT(get_itbl(target)->type == TSO);
3289
3290       if (target->what_next == ThreadRelocated) {
3291           target = target->link;
3292           ASSERT(get_itbl(target)->type == TSO);
3293       }
3294
3295       ASSERT(target->blocked_exceptions != NULL);
3296
3297       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3298       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3299            t != END_BQ_QUEUE; 
3300            last = &t->link, t = t->link) {
3301         ASSERT(get_itbl(t)->type == TSO);
3302         if (t == (StgBlockingQueueElement *)tso) {
3303           *last = (StgBlockingQueueElement *)tso->link;
3304           goto done;
3305         }
3306       }
3307       barf("unblockThread (Exception): TSO not found");
3308     }
3309
3310   case BlockedOnRead:
3311   case BlockedOnWrite:
3312 #if defined(mingw32_HOST_OS)
3313   case BlockedOnDoProc:
3314 #endif
3315     {
3316       /* take TSO off blocked_queue */
3317       StgBlockingQueueElement *prev = NULL;
3318       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3319            prev = t, t = t->link) {
3320         if (t == (StgBlockingQueueElement *)tso) {
3321           if (prev == NULL) {
3322             blocked_queue_hd = (StgTSO *)t->link;
3323             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3324               blocked_queue_tl = END_TSO_QUEUE;
3325             }
3326           } else {
3327             prev->link = t->link;
3328             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3329               blocked_queue_tl = (StgTSO *)prev;
3330             }
3331           }
3332           goto done;
3333         }
3334       }
3335       barf("unblockThread (I/O): TSO not found");
3336     }
3337
3338   case BlockedOnDelay:
3339     {
3340       /* take TSO off sleeping_queue */
3341       StgBlockingQueueElement *prev = NULL;
3342       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3343            prev = t, t = t->link) {
3344         if (t == (StgBlockingQueueElement *)tso) {
3345           if (prev == NULL) {
3346             sleeping_queue = (StgTSO *)t->link;
3347           } else {
3348             prev->link = t->link;
3349           }
3350           goto done;
3351         }
3352       }
3353       barf("unblockThread (delay): TSO not found");
3354     }
3355
3356   default:
3357     barf("unblockThread");
3358   }
3359
3360  done:
3361   tso->link = END_TSO_QUEUE;
3362   tso->why_blocked = NotBlocked;
3363   tso->block_info.closure = NULL;
3364   PUSH_ON_RUN_QUEUE(tso);
3365 }
3366 #else
3367 static void
3368 unblockThread(StgTSO *tso)
3369 {
3370   StgTSO *t, **last;
3371   
3372   /* To avoid locking unnecessarily. */
3373   if (tso->why_blocked == NotBlocked) {
3374     return;
3375   }
3376
3377   switch (tso->why_blocked) {
3378
3379   case BlockedOnSTM:
3380     // Be careful: nothing to do here!  We tell the scheduler that the thread
3381     // is runnable and we leave it to the stack-walking code to abort the 
3382     // transaction while unwinding the stack.  We should perhaps have a debugging
3383     // test to make sure that this really happens and that the 'zombie' transaction
3384     // does not get committed.
3385     goto done;
3386
3387   case BlockedOnMVar:
3388     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3389     {
3390       StgTSO *last_tso = END_TSO_QUEUE;
3391       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3392
3393       last = &mvar->head;
3394       for (t = mvar->head; t != END_TSO_QUEUE; 
3395            last = &t->link, last_tso = t, t = t->link) {
3396         if (t == tso) {
3397           *last = tso->link;
3398           if (mvar->tail == tso) {
3399             mvar->tail = last_tso;
3400           }
3401           goto done;
3402         }
3403       }
3404       barf("unblockThread (MVAR): TSO not found");
3405     }
3406
3407   case BlockedOnBlackHole:
3408     {
3409       last = &blackhole_queue;
3410       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3411            last = &t->link, t = t->link) {
3412         if (t == tso) {
3413           *last = tso->link;
3414           goto done;
3415         }
3416       }
3417       barf("unblockThread (BLACKHOLE): TSO not found");
3418     }
3419
3420   case BlockedOnException:
3421     {
3422       StgTSO *target  = tso->block_info.tso;
3423
3424       ASSERT(get_itbl(target)->type == TSO);
3425
3426       while (target->what_next == ThreadRelocated) {
3427           target = target->link;
3428           ASSERT(get_itbl(target)->type == TSO);
3429       }
3430       
3431       ASSERT(target->blocked_exceptions != NULL);
3432
3433       last = &target->blocked_exceptions;
3434       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3435            last = &t->link, t = t->link) {
3436         ASSERT(get_itbl(t)->type == TSO);
3437         if (t == tso) {
3438           *last = tso->link;
3439           goto done;
3440         }
3441       }
3442       barf("unblockThread (Exception): TSO not found");
3443     }
3444
3445   case BlockedOnRead:
3446   case BlockedOnWrite:
3447 #if defined(mingw32_HOST_OS)
3448   case BlockedOnDoProc:
3449 #endif
3450     {
3451       StgTSO *prev = NULL;
3452       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3453            prev = t, t = t->link) {
3454         if (t == tso) {
3455           if (prev == NULL) {
3456             blocked_queue_hd = t->link;
3457             if (blocked_queue_tl == t) {
3458               blocked_queue_tl = END_TSO_QUEUE;
3459             }
3460           } else {
3461             prev->link = t->link;
3462             if (blocked_queue_tl == t) {
3463               blocked_queue_tl = prev;
3464             }
3465           }
3466           goto done;
3467         }
3468       }
3469       barf("unblockThread (I/O): TSO not found");
3470     }
3471
3472   case BlockedOnDelay:
3473     {
3474       StgTSO *prev = NULL;
3475       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3476            prev = t, t = t->link) {
3477         if (t == tso) {
3478           if (prev == NULL) {
3479             sleeping_queue = t->link;
3480           } else {
3481             prev->link = t->link;
3482           }
3483           goto done;
3484         }
3485       }
3486       barf("unblockThread (delay): TSO not found");
3487     }
3488
3489   default:
3490     barf("unblockThread");
3491   }
3492
3493  done:
3494   tso->link = END_TSO_QUEUE;
3495   tso->why_blocked = NotBlocked;
3496   tso->block_info.closure = NULL;
3497   APPEND_TO_RUN_QUEUE(tso);
3498 }
3499 #endif
3500
3501 /* -----------------------------------------------------------------------------
3502  * checkBlackHoles()
3503  *
3504  * Check the blackhole_queue for threads that can be woken up.  We do
3505  * this periodically: before every GC, and whenever the run queue is
3506  * empty.
3507  *
3508  * An elegant solution might be to just wake up all the blocked
3509  * threads with awakenBlockedQueue occasionally: they'll go back to
3510  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3511  * doesn't give us a way to tell whether we've actually managed to
3512  * wake up any threads, so we would be busy-waiting.
3513  *
3514  * -------------------------------------------------------------------------- */
3515
3516 static rtsBool
3517 checkBlackHoles( void )
3518 {
3519     StgTSO **prev, *t;
3520     rtsBool any_woke_up = rtsFalse;
3521     StgHalfWord type;
3522
3523     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3524
3525     // ASSUMES: sched_mutex
3526     prev = &blackhole_queue;
3527     t = blackhole_queue;
3528     while (t != END_TSO_QUEUE) {
3529         ASSERT(t->why_blocked == BlockedOnBlackHole);
3530         type = get_itbl(t->block_info.closure)->type;
3531         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3532             t = unblockOneLocked(t);
3533             *prev = t;
3534             any_woke_up = rtsTrue;
3535         } else {
3536             prev = &t->link;
3537             t = t->link;
3538         }
3539     }
3540
3541     return any_woke_up;
3542 }
3543
3544 /* -----------------------------------------------------------------------------
3545  * raiseAsync()
3546  *
3547  * The following function implements the magic for raising an
3548  * asynchronous exception in an existing thread.
3549  *
3550  * We first remove the thread from any queue on which it might be
3551  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3552  *
3553  * We strip the stack down to the innermost CATCH_FRAME, building
3554  * thunks in the heap for all the active computations, so they can 
3555  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3556  * an application of the handler to the exception, and push it on
3557  * the top of the stack.
3558  * 
3559  * How exactly do we save all the active computations?  We create an
3560  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3561  * AP_STACKs pushes everything from the corresponding update frame
3562  * upwards onto the stack.  (Actually, it pushes everything up to the
3563  * next update frame plus a pointer to the next AP_STACK object.
3564  * Entering the next AP_STACK object pushes more onto the stack until we
3565  * reach the last AP_STACK object - at which point the stack should look
3566  * exactly as it did when we killed the TSO and we can continue
3567  * execution by entering the closure on top of the stack.
3568  *
3569  * We can also kill a thread entirely - this happens if either (a) the 
3570  * exception passed to raiseAsync is NULL, or (b) there's no
3571  * CATCH_FRAME on the stack.  In either case, we strip the entire
3572  * stack and replace the thread with a zombie.
3573  *
3574  * Locks: sched_mutex held upon entry nor exit.
3575  *
3576  * -------------------------------------------------------------------------- */
3577  
3578 void 
3579 deleteThread(StgTSO *tso)
3580 {
3581   if (tso->why_blocked != BlockedOnCCall &&
3582       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3583       raiseAsync(tso,NULL);
3584   }
3585 }
3586
3587 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3588 static void 
3589 deleteThreadImmediately(StgTSO *tso)
3590 { // for forkProcess only:
3591   // delete thread without giving it a chance to catch the KillThread exception
3592
3593   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3594       return;
3595   }
3596
3597   if (tso->why_blocked != BlockedOnCCall &&
3598       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3599     unblockThread(tso);
3600   }
3601
3602   tso->what_next = ThreadKilled;
3603 }
3604 #endif
3605
3606 void
3607 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3608 {
3609   /* When raising async exs from contexts where sched_mutex isn't held;
3610      use raiseAsyncWithLock(). */
3611   ACQUIRE_LOCK(&sched_mutex);
3612   raiseAsync(tso,exception);
3613   RELEASE_LOCK(&sched_mutex);
3614 }
3615
3616 void
3617 raiseAsync(StgTSO *tso, StgClosure *exception)
3618 {
3619     raiseAsync_(tso, exception, rtsFalse);
3620 }
3621
3622 static void
3623 raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
3624 {
3625     StgRetInfoTable *info;
3626     StgPtr sp;
3627   
3628     // Thread already dead?
3629     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3630         return;
3631     }
3632
3633     IF_DEBUG(scheduler, 
3634              sched_belch("raising exception in thread %ld.", (long)tso->id));
3635     
3636     // Remove it from any blocking queues
3637     unblockThread(tso);
3638
3639     sp = tso->sp;
3640     
3641     // The stack freezing code assumes there's a closure pointer on
3642     // the top of the stack, so we have to arrange that this is the case...
3643     //
3644     if (sp[0] == (W_)&stg_enter_info) {
3645         sp++;
3646     } else {
3647         sp--;
3648         sp[0] = (W_)&stg_dummy_ret_closure;
3649     }
3650
3651     while (1) {
3652         nat i;
3653
3654         // 1. Let the top of the stack be the "current closure"
3655         //
3656         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3657         // CATCH_FRAME.
3658         //
3659         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3660         // current closure applied to the chunk of stack up to (but not
3661         // including) the update frame.  This closure becomes the "current
3662         // closure".  Go back to step 2.
3663         //
3664         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3665         // top of the stack applied to the exception.
3666         // 
3667         // 5. If it's a STOP_FRAME, then kill the thread.
3668         // 
3669         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3670         // transaction
3671        
3672         
3673         StgPtr frame;
3674         
3675         frame = sp + 1;
3676         info = get_ret_itbl((StgClosure *)frame);
3677         
3678         while (info->i.type != UPDATE_FRAME
3679                && (info->i.type != CATCH_FRAME || exception == NULL)
3680                && info->i.type != STOP_FRAME
3681                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3682         {
3683             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3684               // IF we find an ATOMICALLY_FRAME then we abort the
3685               // current transaction and propagate the exception.  In
3686               // this case (unlike ordinary exceptions) we do not care
3687               // whether the transaction is valid or not because its
3688               // possible validity cannot have caused the exception
3689               // and will not be visible after the abort.
3690               IF_DEBUG(stm,
3691                        debugBelch("Found atomically block delivering async exception\n"));
3692               stmAbortTransaction(tso -> trec);
3693               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3694             }
3695             frame += stack_frame_sizeW((StgClosure *)frame);
3696             info = get_ret_itbl((StgClosure *)frame);
3697         }
3698         
3699         switch (info->i.type) {
3700             
3701         case ATOMICALLY_FRAME:
3702             ASSERT(stop_at_atomically);
3703             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3704             stmCondemnTransaction(tso -> trec);
3705 #ifdef REG_R1
3706             tso->sp = frame;
3707 #else
3708             // R1 is not a register: the return convention for IO in
3709             // this case puts the return value on the stack, so we
3710             // need to set up the stack to return to the atomically
3711             // frame properly...
3712             tso->sp = frame - 2;
3713             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3714             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3715 #endif
3716             tso->what_next = ThreadRunGHC;
3717             return;
3718
3719         case CATCH_FRAME:
3720             // If we find a CATCH_FRAME, and we've got an exception to raise,
3721             // then build the THUNK raise(exception), and leave it on
3722             // top of the CATCH_FRAME ready to enter.
3723             //
3724         {
3725 #ifdef PROFILING
3726             StgCatchFrame *cf = (StgCatchFrame *)frame;
3727 #endif
3728             StgClosure *raise;
3729             
3730             // we've got an exception to raise, so let's pass it to the
3731             // handler in this frame.
3732             //
3733             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3734             TICK_ALLOC_SE_THK(1,0);
3735             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3736             raise->payload[0] = exception;
3737             
3738             // throw away the stack from Sp up to the CATCH_FRAME.
3739             //
3740             sp = frame - 1;
3741             
3742             /* Ensure that async excpetions are blocked now, so we don't get
3743              * a surprise exception before we get around to executing the
3744              * handler.
3745              */
3746             if (tso->blocked_exceptions == NULL) {
3747                 tso->blocked_exceptions = END_TSO_QUEUE;
3748             }
3749             
3750             /* Put the newly-built THUNK on top of the stack, ready to execute
3751              * when the thread restarts.
3752              */
3753             sp[0] = (W_)raise;
3754             sp[-1] = (W_)&stg_enter_info;
3755             tso->sp = sp-1;
3756             tso->what_next = ThreadRunGHC;
3757             IF_DEBUG(sanity, checkTSO(tso));
3758             return;
3759         }
3760         
3761         case UPDATE_FRAME:
3762         {
3763             StgAP_STACK * ap;
3764             nat words;
3765             
3766             // First build an AP_STACK consisting of the stack chunk above the
3767             // current update frame, with the top word on the stack as the
3768             // fun field.
3769             //
3770             words = frame - sp - 1;
3771             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3772             
3773             ap->size = words;
3774             ap->fun  = (StgClosure *)sp[0];
3775             sp++;
3776             for(i=0; i < (nat)words; ++i) {
3777                 ap->payload[i] = (StgClosure *)*sp++;
3778             }
3779             
3780             SET_HDR(ap,&stg_AP_STACK_info,
3781                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3782             TICK_ALLOC_UP_THK(words+1,0);
3783             
3784             IF_DEBUG(scheduler,
3785                      debugBelch("sched: Updating ");
3786                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3787                      debugBelch(" with ");
3788                      printObj((StgClosure *)ap);
3789                 );
3790
3791             // Replace the updatee with an indirection - happily
3792             // this will also wake up any threads currently
3793             // waiting on the result.
3794             //
3795             // Warning: if we're in a loop, more than one update frame on
3796             // the stack may point to the same object.  Be careful not to
3797             // overwrite an IND_OLDGEN in this case, because we'll screw
3798             // up the mutable lists.  To be on the safe side, don't
3799             // overwrite any kind of indirection at all.  See also
3800             // threadSqueezeStack in GC.c, where we have to make a similar
3801             // check.
3802             //
3803             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3804                 // revert the black hole
3805                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3806                                (StgClosure *)ap);
3807             }
3808             sp += sizeofW(StgUpdateFrame) - 1;
3809             sp[0] = (W_)ap; // push onto stack
3810             break;
3811         }
3812         
3813         case STOP_FRAME:
3814             // We've stripped the entire stack, the thread is now dead.
3815             sp += sizeofW(StgStopFrame);
3816             tso->what_next = ThreadKilled;
3817             tso->sp = sp;
3818             return;
3819             
3820         default:
3821             barf("raiseAsync");
3822         }
3823     }
3824     barf("raiseAsync");
3825 }
3826
3827 /* -----------------------------------------------------------------------------
3828    raiseExceptionHelper
3829    
3830    This function is called by the raise# primitve, just so that we can
3831    move some of the tricky bits of raising an exception from C-- into
3832    C.  Who knows, it might be a useful re-useable thing here too.
3833    -------------------------------------------------------------------------- */
3834
3835 StgWord
3836 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3837 {
3838     StgClosure *raise_closure = NULL;
3839     StgPtr p, next;
3840     StgRetInfoTable *info;
3841     //
3842     // This closure represents the expression 'raise# E' where E
3843     // is the exception raise.  It is used to overwrite all the
3844     // thunks which are currently under evaluataion.
3845     //
3846
3847     //    
3848     // LDV profiling: stg_raise_info has THUNK as its closure
3849     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3850     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3851     // 1 does not cause any problem unless profiling is performed.
3852     // However, when LDV profiling goes on, we need to linearly scan
3853     // small object pool, where raise_closure is stored, so we should
3854     // use MIN_UPD_SIZE.
3855     //
3856     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3857     //                                 sizeofW(StgClosure)+1);
3858     //
3859
3860     //
3861     // Walk up the stack, looking for the catch frame.  On the way,
3862     // we update any closures pointed to from update frames with the
3863     // raise closure that we just built.
3864     //
3865     p = tso->sp;
3866     while(1) {
3867         info = get_ret_itbl((StgClosure *)p);
3868         next = p + stack_frame_sizeW((StgClosure *)p);
3869         switch (info->i.type) {
3870             
3871         case UPDATE_FRAME:
3872             // Only create raise_closure if we need to.
3873             if (raise_closure == NULL) {
3874                 raise_closure = 
3875                     (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3876                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3877                 raise_closure->payload[0] = exception;
3878             }
3879             UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3880             p = next;
3881             continue;
3882
3883         case ATOMICALLY_FRAME:
3884             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3885             tso->sp = p;
3886             return ATOMICALLY_FRAME;
3887             
3888         case CATCH_FRAME:
3889             tso->sp = p;
3890             return CATCH_FRAME;
3891
3892         case CATCH_STM_FRAME:
3893             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3894             tso->sp = p;
3895             return CATCH_STM_FRAME;
3896             
3897         case STOP_FRAME:
3898             tso->sp = p;
3899             return STOP_FRAME;
3900
3901         case CATCH_RETRY_FRAME:
3902         default:
3903             p = next; 
3904             continue;
3905         }
3906     }
3907 }
3908
3909
3910 /* -----------------------------------------------------------------------------
3911    findRetryFrameHelper
3912
3913    This function is called by the retry# primitive.  It traverses the stack
3914    leaving tso->sp referring to the frame which should handle the retry.  
3915
3916    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3917    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3918
3919    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3920    despite the similar implementation.
3921
3922    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3923    not be created within memory transactions.
3924    -------------------------------------------------------------------------- */
3925
3926 StgWord
3927 findRetryFrameHelper (StgTSO *tso)
3928 {
3929   StgPtr           p, next;
3930   StgRetInfoTable *info;
3931
3932   p = tso -> sp;
3933   while (1) {
3934     info = get_ret_itbl((StgClosure *)p);
3935     next = p + stack_frame_sizeW((StgClosure *)p);
3936     switch (info->i.type) {
3937       
3938     case ATOMICALLY_FRAME:
3939       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3940       tso->sp = p;
3941       return ATOMICALLY_FRAME;
3942       
3943     case CATCH_RETRY_FRAME:
3944       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3945       tso->sp = p;
3946       return CATCH_RETRY_FRAME;
3947       
3948     case CATCH_STM_FRAME:
3949     default:
3950       ASSERT(info->i.type != CATCH_FRAME);
3951       ASSERT(info->i.type != STOP_FRAME);
3952       p = next; 
3953       continue;
3954     }
3955   }
3956 }
3957
3958 /* -----------------------------------------------------------------------------
3959    resurrectThreads is called after garbage collection on the list of
3960    threads found to be garbage.  Each of these threads will be woken
3961    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3962    on an MVar, or NonTermination if the thread was blocked on a Black
3963    Hole.
3964
3965    Locks: sched_mutex isn't held upon entry nor exit.
3966    -------------------------------------------------------------------------- */
3967
3968 void
3969 resurrectThreads( StgTSO *threads )
3970 {
3971   StgTSO *tso, *next;
3972
3973   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3974     next = tso->global_link;
3975     tso->global_link = all_threads;
3976     all_threads = tso;
3977     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3978
3979     switch (tso->why_blocked) {
3980     case BlockedOnMVar:
3981     case BlockedOnException:
3982       /* Called by GC - sched_mutex lock is currently held. */
3983       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3984       break;
3985     case BlockedOnBlackHole:
3986       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3987       break;
3988     case BlockedOnSTM:
3989       raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
3990       break;
3991     case NotBlocked:
3992       /* This might happen if the thread was blocked on a black hole
3993        * belonging to a thread that we've just woken up (raiseAsync
3994        * can wake up threads, remember...).
3995        */
3996       continue;
3997     default:
3998       barf("resurrectThreads: thread blocked in a strange way");
3999     }
4000   }
4001 }
4002
4003 /* ----------------------------------------------------------------------------
4004  * Debugging: why is a thread blocked
4005  * [Also provides useful information when debugging threaded programs
4006  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4007    ------------------------------------------------------------------------- */
4008
4009 static void
4010 printThreadBlockage(StgTSO *tso)
4011 {
4012   switch (tso->why_blocked) {
4013   case BlockedOnRead:
4014     debugBelch("is blocked on read from fd %ld", tso->block_info.fd);
4015     break;
4016   case BlockedOnWrite:
4017     debugBelch("is blocked on write to fd %ld", tso->block_info.fd);
4018     break;
4019 #if defined(mingw32_HOST_OS)
4020     case BlockedOnDoProc:
4021     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4022     break;
4023 #endif
4024   case BlockedOnDelay:
4025     debugBelch("is blocked until %ld", tso->block_info.target);
4026     break;
4027   case BlockedOnMVar:
4028     debugBelch("is blocked on an MVar");
4029     break;
4030   case BlockedOnException:
4031     debugBelch("is blocked on delivering an exception to thread %d",
4032             tso->block_info.tso->id);
4033     break;
4034   case BlockedOnBlackHole:
4035     debugBelch("is blocked on a black hole");
4036     break;
4037   case NotBlocked:
4038     debugBelch("is not blocked");
4039     break;
4040 #if defined(PARALLEL_HASKELL)
4041   case BlockedOnGA:
4042     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4043             tso->block_info.closure, info_type(tso->block_info.closure));
4044     break;
4045   case BlockedOnGA_NoSend:
4046     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4047             tso->block_info.closure, info_type(tso->block_info.closure));
4048     break;
4049 #endif
4050   case BlockedOnCCall:
4051     debugBelch("is blocked on an external call");
4052     break;
4053   case BlockedOnCCall_NoUnblockExc:
4054     debugBelch("is blocked on an external call (exceptions were already blocked)");
4055     break;
4056   case BlockedOnSTM:
4057     debugBelch("is blocked on an STM operation");
4058     break;
4059   default:
4060     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4061          tso->why_blocked, tso->id, tso);
4062   }
4063 }
4064
4065 static void
4066 printThreadStatus(StgTSO *tso)
4067 {
4068   switch (tso->what_next) {
4069   case ThreadKilled:
4070     debugBelch("has been killed");
4071     break;
4072   case ThreadComplete:
4073     debugBelch("has completed");
4074     break;
4075   default:
4076     printThreadBlockage(tso);
4077   }
4078 }
4079
4080 void
4081 printAllThreads(void)
4082 {
4083   StgTSO *t;
4084
4085 # if defined(GRAN)
4086   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4087   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4088                        time_string, rtsFalse/*no commas!*/);
4089
4090   debugBelch("all threads at [%s]:\n", time_string);
4091 # elif defined(PARALLEL_HASKELL)
4092   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4093   ullong_format_string(CURRENT_TIME,
4094                        time_string, rtsFalse/*no commas!*/);
4095
4096   debugBelch("all threads at [%s]:\n", time_string);
4097 # else
4098   debugBelch("all threads:\n");
4099 # endif
4100
4101   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
4102     debugBelch("\tthread %d @ %p ", t->id, (void *)t);
4103 #if defined(DEBUG)
4104     {
4105       void *label = lookupThreadLabel(t->id);
4106       if (label) debugBelch("[\"%s\"] ",(char *)label);
4107     }
4108 #endif
4109     printThreadStatus(t);
4110     debugBelch("\n");
4111   }
4112 }
4113     
4114 #ifdef DEBUG
4115
4116 /* 
4117    Print a whole blocking queue attached to node (debugging only).
4118 */
4119 # if defined(PARALLEL_HASKELL)
4120 void 
4121 print_bq (StgClosure *node)
4122 {
4123   StgBlockingQueueElement *bqe;
4124   StgTSO *tso;
4125   rtsBool end;
4126
4127   debugBelch("## BQ of closure %p (%s): ",
4128           node, info_type(node));
4129
4130   /* should cover all closures that may have a blocking queue */
4131   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4132          get_itbl(node)->type == FETCH_ME_BQ ||
4133          get_itbl(node)->type == RBH ||
4134          get_itbl(node)->type == MVAR);
4135     
4136   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4137
4138   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4139 }
4140
4141 /* 
4142    Print a whole blocking queue starting with the element bqe.
4143 */
4144 void 
4145 print_bqe (StgBlockingQueueElement *bqe)
4146 {
4147   rtsBool end;
4148
4149   /* 
4150      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4151   */
4152   for (end = (bqe==END_BQ_QUEUE);
4153        !end; // iterate until bqe points to a CONSTR
4154        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4155        bqe = end ? END_BQ_QUEUE : bqe->link) {
4156     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4157     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4158     /* types of closures that may appear in a blocking queue */
4159     ASSERT(get_itbl(bqe)->type == TSO ||           
4160            get_itbl(bqe)->type == BLOCKED_FETCH || 
4161            get_itbl(bqe)->type == CONSTR); 
4162     /* only BQs of an RBH end with an RBH_Save closure */
4163     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4164
4165     switch (get_itbl(bqe)->type) {
4166     case TSO:
4167       debugBelch(" TSO %u (%x),",
4168               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4169       break;
4170     case BLOCKED_FETCH:
4171       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4172               ((StgBlockedFetch *)bqe)->node, 
4173               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4174               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4175               ((StgBlockedFetch *)bqe)->ga.weight);
4176       break;
4177     case CONSTR:
4178       debugBelch(" %s (IP %p),",
4179               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4180                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4181                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4182                "RBH_Save_?"), get_itbl(bqe));
4183       break;
4184     default:
4185       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4186            info_type((StgClosure *)bqe)); // , node, info_type(node));
4187       break;
4188     }
4189   } /* for */
4190   debugBelch("\n");
4191 }
4192 # elif defined(GRAN)
4193 void 
4194 print_bq (StgClosure *node)
4195 {
4196   StgBlockingQueueElement *bqe;
4197   PEs node_loc, tso_loc;
4198   rtsBool end;
4199
4200   /* should cover all closures that may have a blocking queue */
4201   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4202          get_itbl(node)->type == FETCH_ME_BQ ||
4203          get_itbl(node)->type == RBH);
4204     
4205   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4206   node_loc = where_is(node);
4207
4208   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4209           node, info_type(node), node_loc);
4210
4211   /* 
4212      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4213   */
4214   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4215        !end; // iterate until bqe points to a CONSTR
4216        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4217     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4218     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4219     /* types of closures that may appear in a blocking queue */
4220     ASSERT(get_itbl(bqe)->type == TSO ||           
4221            get_itbl(bqe)->type == CONSTR); 
4222     /* only BQs of an RBH end with an RBH_Save closure */
4223     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4224
4225     tso_loc = where_is((StgClosure *)bqe);
4226     switch (get_itbl(bqe)->type) {
4227     case TSO:
4228       debugBelch(" TSO %d (%p) on [PE %d],",
4229               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4230       break;
4231     case CONSTR:
4232       debugBelch(" %s (IP %p),",
4233               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4234                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4235                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4236                "RBH_Save_?"), get_itbl(bqe));
4237       break;
4238     default:
4239       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4240            info_type((StgClosure *)bqe), node, info_type(node));
4241       break;
4242     }
4243   } /* for */
4244   debugBelch("\n");
4245 }
4246 # endif
4247
4248 #if defined(PARALLEL_HASKELL)
4249 static nat
4250 run_queue_len(void)
4251 {
4252   nat i;
4253   StgTSO *tso;
4254
4255   for (i=0, tso=run_queue_hd; 
4256        tso != END_TSO_QUEUE;
4257        i++, tso=tso->link)
4258     /* nothing */
4259
4260   return i;
4261 }
4262 #endif
4263
4264 void
4265 sched_belch(char *s, ...)
4266 {
4267   va_list ap;
4268   va_start(ap,s);
4269 #ifdef RTS_SUPPORTS_THREADS
4270   debugBelch("sched (task %p): ", osThreadId());
4271 #elif defined(PARALLEL_HASKELL)
4272   debugBelch("== ");
4273 #else
4274   debugBelch("sched: ");
4275 #endif
4276   vdebugBelch(s, ap);
4277   debugBelch("\n");
4278   va_end(ap);
4279 }
4280
4281 #endif /* DEBUG */