[project @ 2005-06-28 13:44:28 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PARALLEL_HASKELL          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "OSThreads.h"
46 #include "Storage.h"
47 #include "StgRun.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "STM.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #include "LdvProfile.h"
63 #include "Updates.h"
64 #ifdef PROFILING
65 #include "Proftimer.h"
66 #include "ProfHeap.h"
67 #endif
68 #if defined(GRAN) || defined(PARALLEL_HASKELL)
69 # include "GranSimRts.h"
70 # include "GranSim.h"
71 # include "ParallelRts.h"
72 # include "Parallel.h"
73 # include "ParallelDebug.h"
74 # include "FetchMe.h"
75 # include "HLC.h"
76 #endif
77 #include "Sparks.h"
78 #include "Capability.h"
79 #include  "Task.h"
80
81 #ifdef HAVE_SYS_TYPES_H
82 #include <sys/types.h>
83 #endif
84 #ifdef HAVE_UNISTD_H
85 #include <unistd.h>
86 #endif
87
88 #include <string.h>
89 #include <stdlib.h>
90 #include <stdarg.h>
91
92 #ifdef HAVE_ERRNO_H
93 #include <errno.h>
94 #endif
95
96 // Turn off inlining when debugging - it obfuscates things
97 #ifdef DEBUG
98 # undef  STATIC_INLINE
99 # define STATIC_INLINE static
100 #endif
101
102 #ifdef THREADED_RTS
103 #define USED_IN_THREADED_RTS
104 #else
105 #define USED_IN_THREADED_RTS STG_UNUSED
106 #endif
107
108 #ifdef RTS_SUPPORTS_THREADS
109 #define USED_WHEN_RTS_SUPPORTS_THREADS
110 #else
111 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
112 #endif
113
114 /* Main thread queue.
115  * Locks required: sched_mutex.
116  */
117 StgMainThread *main_threads = NULL;
118
119 #if defined(GRAN)
120
121 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
122 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
123
124 /* 
125    In GranSim we have a runnable and a blocked queue for each processor.
126    In order to minimise code changes new arrays run_queue_hds/tls
127    are created. run_queue_hd is then a short cut (macro) for
128    run_queue_hds[CurrentProc] (see GranSim.h).
129    -- HWL
130 */
131 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
132 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
133 StgTSO *ccalling_threadss[MAX_PROC];
134 /* We use the same global list of threads (all_threads) in GranSim as in
135    the std RTS (i.e. we are cheating). However, we don't use this list in
136    the GranSim specific code at the moment (so we are only potentially
137    cheating).  */
138
139 #else /* !GRAN */
140
141 /* Thread queues.
142  * Locks required: sched_mutex.
143  */
144 StgTSO *run_queue_hd = NULL;
145 StgTSO *run_queue_tl = NULL;
146 StgTSO *blocked_queue_hd = NULL;
147 StgTSO *blocked_queue_tl = NULL;
148 StgTSO *blackhole_queue = NULL;
149 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
150
151 #endif
152
153 /* The blackhole_queue should be checked for threads to wake up.  See
154  * Schedule.h for more thorough comment.
155  */
156 rtsBool blackholes_need_checking = rtsFalse;
157
158 /* Linked list of all threads.
159  * Used for detecting garbage collected threads.
160  */
161 StgTSO *all_threads = NULL;
162
163 /* When a thread performs a safe C call (_ccall_GC, using old
164  * terminology), it gets put on the suspended_ccalling_threads
165  * list. Used by the garbage collector.
166  */
167 static StgTSO *suspended_ccalling_threads;
168
169 /* KH: The following two flags are shared memory locations.  There is no need
170        to lock them, since they are only unset at the end of a scheduler
171        operation.
172 */
173
174 /* flag set by signal handler to precipitate a context switch */
175 int context_switch = 0;
176
177 /* flag that tracks whether we have done any execution in this time slice. */
178 nat recent_activity = ACTIVITY_YES;
179
180 /* if this flag is set as well, give up execution */
181 rtsBool interrupted = rtsFalse;
182
183 /* Next thread ID to allocate.
184  * Locks required: thread_id_mutex
185  */
186 static StgThreadID next_thread_id = 1;
187
188 /*
189  * Pointers to the state of the current thread.
190  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
191  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
192  */
193  
194 /* The smallest stack size that makes any sense is:
195  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
196  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
197  *  + 1                       (the closure to enter)
198  *  + 1                       (stg_ap_v_ret)
199  *  + 1                       (spare slot req'd by stg_ap_v_ret)
200  *
201  * A thread with this stack will bomb immediately with a stack
202  * overflow, which will increase its stack size.  
203  */
204
205 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
206
207
208 #if defined(GRAN)
209 StgTSO *CurrentTSO;
210 #endif
211
212 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
213  *  exists - earlier gccs apparently didn't.
214  *  -= chak
215  */
216 StgTSO dummy_tso;
217
218 /*
219  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
220  * in an MT setting, needed to signal that a worker thread shouldn't hang around
221  * in the scheduler when it is out of work.
222  */
223 static rtsBool shutting_down_scheduler = rtsFalse;
224
225 #if defined(RTS_SUPPORTS_THREADS)
226 /* ToDo: carefully document the invariants that go together
227  *       with these synchronisation objects.
228  */
229 Mutex     sched_mutex       = INIT_MUTEX_VAR;
230 Mutex     term_mutex        = INIT_MUTEX_VAR;
231
232 #endif /* RTS_SUPPORTS_THREADS */
233
234 #if defined(PARALLEL_HASKELL)
235 StgTSO *LastTSO;
236 rtsTime TimeOfLastYield;
237 rtsBool emitSchedule = rtsTrue;
238 #endif
239
240 #if DEBUG
241 static char *whatNext_strs[] = {
242   "(unknown)",
243   "ThreadRunGHC",
244   "ThreadInterpret",
245   "ThreadKilled",
246   "ThreadRelocated",
247   "ThreadComplete"
248 };
249 #endif
250
251 /* -----------------------------------------------------------------------------
252  * static function prototypes
253  * -------------------------------------------------------------------------- */
254
255 #if defined(RTS_SUPPORTS_THREADS)
256 static void taskStart(void);
257 #endif
258
259 static void schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
260                       Capability *initialCapability );
261
262 //
263 // These function all encapsulate parts of the scheduler loop, and are
264 // abstracted only to make the structure and control flow of the
265 // scheduler clearer.
266 //
267 static void schedulePreLoop(void);
268 static void scheduleStartSignalHandlers(void);
269 static void scheduleCheckBlockedThreads(void);
270 static void scheduleCheckBlackHoles(void);
271 static void scheduleDetectDeadlock(void);
272 #if defined(GRAN)
273 static StgTSO *scheduleProcessEvent(rtsEvent *event);
274 #endif
275 #if defined(PARALLEL_HASKELL)
276 static StgTSO *scheduleSendPendingMessages(void);
277 static void scheduleActivateSpark(void);
278 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
279 #endif
280 #if defined(PAR) || defined(GRAN)
281 static void scheduleGranParReport(void);
282 #endif
283 static void schedulePostRunThread(void);
284 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
285 static void scheduleHandleStackOverflow( StgTSO *t);
286 static rtsBool scheduleHandleYield( StgTSO *t, nat prev_what_next );
287 static void scheduleHandleThreadBlocked( StgTSO *t );
288 static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread, 
289                                              Capability *cap, StgTSO *t );
290 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
291 static void scheduleDoGC(rtsBool force_major);
292
293 static void unblockThread(StgTSO *tso);
294 static rtsBool checkBlackHoles(void);
295 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
296                                    Capability *initialCapability
297                                    );
298 static void scheduleThread_ (StgTSO* tso);
299 static void AllRoots(evac_fn evac);
300
301 static StgTSO *threadStackOverflow(StgTSO *tso);
302
303 static void raiseAsync_(StgTSO *tso, StgClosure *exception, 
304                         rtsBool stop_at_atomically);
305
306 static void printThreadBlockage(StgTSO *tso);
307 static void printThreadStatus(StgTSO *tso);
308 void printThreadQueue(StgTSO *tso);
309
310 #if defined(PARALLEL_HASKELL)
311 StgTSO * createSparkThread(rtsSpark spark);
312 StgTSO * activateSpark (rtsSpark spark);  
313 #endif
314
315 /* ----------------------------------------------------------------------------
316  * Starting Tasks
317  * ------------------------------------------------------------------------- */
318
319 #if defined(RTS_SUPPORTS_THREADS)
320 static nat startingWorkerThread = 0;
321
322 static void
323 taskStart(void)
324 {
325   ACQUIRE_LOCK(&sched_mutex);
326   startingWorkerThread--;
327   schedule(NULL,NULL);
328   taskStop();
329   RELEASE_LOCK(&sched_mutex);
330 }
331
332 void
333 startSchedulerTaskIfNecessary(void)
334 {
335     if ( !EMPTY_RUN_QUEUE()
336          && !shutting_down_scheduler // not if we're shutting down
337          && startingWorkerThread==0)
338     {
339         // we don't want to start another worker thread
340         // just because the last one hasn't yet reached the
341         // "waiting for capability" state
342         startingWorkerThread++;
343         if (!maybeStartNewWorker(taskStart)) {
344             startingWorkerThread--;
345         }
346     }
347 }
348 #endif
349
350 /* -----------------------------------------------------------------------------
351  * Putting a thread on the run queue: different scheduling policies
352  * -------------------------------------------------------------------------- */
353
354 STATIC_INLINE void
355 addToRunQueue( StgTSO *t )
356 {
357 #if defined(PARALLEL_HASKELL)
358     if (RtsFlags.ParFlags.doFairScheduling) { 
359         // this does round-robin scheduling; good for concurrency
360         APPEND_TO_RUN_QUEUE(t);
361     } else {
362         // this does unfair scheduling; good for parallelism
363         PUSH_ON_RUN_QUEUE(t);
364     }
365 #else
366     // this does round-robin scheduling; good for concurrency
367     APPEND_TO_RUN_QUEUE(t);
368 #endif
369 }
370     
371 /* ---------------------------------------------------------------------------
372    Main scheduling loop.
373
374    We use round-robin scheduling, each thread returning to the
375    scheduler loop when one of these conditions is detected:
376
377       * out of heap space
378       * timer expires (thread yields)
379       * thread blocks
380       * thread ends
381       * stack overflow
382
383    Locking notes:  we acquire the scheduler lock once at the beginning
384    of the scheduler loop, and release it when
385     
386       * running a thread, or
387       * waiting for work, or
388       * waiting for a GC to complete.
389
390    GRAN version:
391      In a GranSim setup this loop iterates over the global event queue.
392      This revolves around the global event queue, which determines what 
393      to do next. Therefore, it's more complicated than either the 
394      concurrent or the parallel (GUM) setup.
395
396    GUM version:
397      GUM iterates over incoming messages.
398      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
399      and sends out a fish whenever it has nothing to do; in-between
400      doing the actual reductions (shared code below) it processes the
401      incoming messages and deals with delayed operations 
402      (see PendingFetches).
403      This is not the ugliest code you could imagine, but it's bloody close.
404
405    ------------------------------------------------------------------------ */
406
407 static void
408 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
409           Capability *initialCapability )
410 {
411   StgTSO *t;
412   Capability *cap;
413   StgThreadReturnCode ret;
414 #if defined(GRAN)
415   rtsEvent *event;
416 #elif defined(PARALLEL_HASKELL)
417   StgTSO *tso;
418   GlobalTaskId pe;
419   rtsBool receivedFinish = rtsFalse;
420 # if defined(DEBUG)
421   nat tp_size, sp_size; // stats only
422 # endif
423 #endif
424   nat prev_what_next;
425   rtsBool ready_to_gc;
426   
427   // Pre-condition: sched_mutex is held.
428   // We might have a capability, passed in as initialCapability.
429   cap = initialCapability;
430
431 #if !defined(RTS_SUPPORTS_THREADS)
432   // simply initialise it in the non-threaded case
433   grabCapability(&cap);
434 #endif
435
436   IF_DEBUG(scheduler,
437            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
438                        mainThread, initialCapability);
439       );
440
441   schedulePreLoop();
442
443   // -----------------------------------------------------------
444   // Scheduler loop starts here:
445
446 #if defined(PARALLEL_HASKELL)
447 #define TERMINATION_CONDITION        (!receivedFinish)
448 #elif defined(GRAN)
449 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
450 #else
451 #define TERMINATION_CONDITION        rtsTrue
452 #endif
453
454   while (TERMINATION_CONDITION) {
455
456 #if defined(GRAN)
457       /* Choose the processor with the next event */
458       CurrentProc = event->proc;
459       CurrentTSO = event->tso;
460 #endif
461
462 #if defined(RTS_SUPPORTS_THREADS)
463       // Yield the capability to higher-priority tasks if necessary.
464       //
465       if (cap != NULL) {
466           yieldCapability(&cap, 
467                           mainThread ? &mainThread->bound_thread_cond : NULL );
468       }
469
470       // If we do not currently hold a capability, we wait for one
471       //
472       if (cap == NULL) {
473           waitForCapability(&sched_mutex, &cap,
474                             mainThread ? &mainThread->bound_thread_cond : NULL);
475       }
476
477       // We now have a capability...
478 #endif
479
480 #if 0 /* extra sanity checking */
481       { 
482           StgMainThread *m;
483           for (m = main_threads; m != NULL; m = m->link) {
484               ASSERT(get_itbl(m->tso)->type == TSO);
485           }
486       }
487 #endif
488
489     // Check whether we have re-entered the RTS from Haskell without
490     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
491     // call).
492     if (cap->r.rInHaskell) {
493           errorBelch("schedule: re-entered unsafely.\n"
494                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
495           stg_exit(1);
496     }
497
498     //
499     // Test for interruption.  If interrupted==rtsTrue, then either
500     // we received a keyboard interrupt (^C), or the scheduler is
501     // trying to shut down all the tasks (shutting_down_scheduler) in
502     // the threaded RTS.
503     //
504     if (interrupted) {
505         if (shutting_down_scheduler) {
506             IF_DEBUG(scheduler, sched_belch("shutting down"));
507             releaseCapability(cap);
508             if (mainThread) {
509                 mainThread->stat = Interrupted;
510                 mainThread->ret  = NULL;
511             }
512             return;
513         } else {
514             IF_DEBUG(scheduler, sched_belch("interrupted"));
515             deleteAllThreads();
516         }
517     }
518
519 #if defined(not_yet) && defined(SMP)
520     //
521     // Top up the run queue from our spark pool.  We try to make the
522     // number of threads in the run queue equal to the number of
523     // free capabilities.
524     //
525     {
526         StgClosure *spark;
527         if (EMPTY_RUN_QUEUE()) {
528             spark = findSpark(rtsFalse);
529             if (spark == NULL) {
530                 break; /* no more sparks in the pool */
531             } else {
532                 createSparkThread(spark);         
533                 IF_DEBUG(scheduler,
534                          sched_belch("==^^ turning spark of closure %p into a thread",
535                                      (StgClosure *)spark));
536             }
537         }
538     }
539 #endif // SMP
540
541     scheduleStartSignalHandlers();
542
543     // Only check the black holes here if we've nothing else to do.
544     // During normal execution, the black hole list only gets checked
545     // at GC time, to avoid repeatedly traversing this possibly long
546     // list each time around the scheduler.
547     if (EMPTY_RUN_QUEUE()) { scheduleCheckBlackHoles(); }
548
549     scheduleCheckBlockedThreads();
550
551     scheduleDetectDeadlock();
552
553     // Normally, the only way we can get here with no threads to
554     // run is if a keyboard interrupt received during 
555     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
556     // Additionally, it is not fatal for the
557     // threaded RTS to reach here with no threads to run.
558     //
559     // win32: might be here due to awaitEvent() being abandoned
560     // as a result of a console event having been delivered.
561     if ( EMPTY_RUN_QUEUE() ) {
562 #if !defined(RTS_SUPPORTS_THREADS) && !defined(mingw32_HOST_OS)
563         ASSERT(interrupted);
564 #endif
565         continue; // nothing to do
566     }
567
568 #if defined(PARALLEL_HASKELL)
569     scheduleSendPendingMessages();
570     if (EMPTY_RUN_QUEUE() && scheduleActivateSpark()) 
571         continue;
572
573 #if defined(SPARKS)
574     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
575 #endif
576
577     /* If we still have no work we need to send a FISH to get a spark
578        from another PE */
579     if (EMPTY_RUN_QUEUE()) {
580         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
581         ASSERT(rtsFalse); // should not happen at the moment
582     }
583     // from here: non-empty run queue.
584     //  TODO: merge above case with this, only one call processMessages() !
585     if (PacketsWaiting()) {  /* process incoming messages, if
586                                 any pending...  only in else
587                                 because getRemoteWork waits for
588                                 messages as well */
589         receivedFinish = processMessages();
590     }
591 #endif
592
593 #if defined(GRAN)
594     scheduleProcessEvent(event);
595 #endif
596
597     // 
598     // Get a thread to run
599     //
600     ASSERT(run_queue_hd != END_TSO_QUEUE);
601     POP_RUN_QUEUE(t);
602
603 #if defined(GRAN) || defined(PAR)
604     scheduleGranParReport(); // some kind of debuging output
605 #else
606     // Sanity check the thread we're about to run.  This can be
607     // expensive if there is lots of thread switching going on...
608     IF_DEBUG(sanity,checkTSO(t));
609 #endif
610
611 #if defined(RTS_SUPPORTS_THREADS)
612     // Check whether we can run this thread in the current task.
613     // If not, we have to pass our capability to the right task.
614     {
615       StgMainThread *m = t->main;
616       
617       if(m)
618       {
619         if(m == mainThread)
620         {
621           IF_DEBUG(scheduler,
622             sched_belch("### Running thread %d in bound thread", t->id));
623           // yes, the Haskell thread is bound to the current native thread
624         }
625         else
626         {
627           IF_DEBUG(scheduler,
628             sched_belch("### thread %d bound to another OS thread", t->id));
629           // no, bound to a different Haskell thread: pass to that thread
630           PUSH_ON_RUN_QUEUE(t);
631           continue;
632         }
633       }
634       else
635       {
636         if(mainThread != NULL)
637         // The thread we want to run is unbound.
638         {
639           IF_DEBUG(scheduler,
640             sched_belch("### this OS thread cannot run thread %d", t->id));
641           // no, the current native thread is bound to a different
642           // Haskell thread, so pass it to any worker thread
643           PUSH_ON_RUN_QUEUE(t);
644           continue; 
645         }
646       }
647     }
648 #endif
649
650     cap->r.rCurrentTSO = t;
651     
652     /* context switches are now initiated by the timer signal, unless
653      * the user specified "context switch as often as possible", with
654      * +RTS -C0
655      */
656     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
657          && (run_queue_hd != END_TSO_QUEUE
658              || blocked_queue_hd != END_TSO_QUEUE
659              || sleeping_queue != END_TSO_QUEUE)))
660         context_switch = 1;
661
662 run_thread:
663
664     RELEASE_LOCK(&sched_mutex);
665
666     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
667                               (long)t->id, whatNext_strs[t->what_next]));
668
669 #if defined(PROFILING)
670     startHeapProfTimer();
671 #endif
672
673     // ----------------------------------------------------------------------
674     // Run the current thread 
675
676     prev_what_next = t->what_next;
677
678     errno = t->saved_errno;
679     cap->r.rInHaskell = rtsTrue;
680
681     recent_activity = ACTIVITY_YES;
682
683     switch (prev_what_next) {
684
685     case ThreadKilled:
686     case ThreadComplete:
687         /* Thread already finished, return to scheduler. */
688         ret = ThreadFinished;
689         break;
690
691     case ThreadRunGHC:
692         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
693         break;
694
695     case ThreadInterpret:
696         ret = interpretBCO(cap);
697         break;
698
699     default:
700       barf("schedule: invalid what_next field");
701     }
702
703 #if defined(SMP)
704     // in SMP mode, we might return with a different capability than
705     // we started with, if the Haskell thread made a foreign call.  So
706     // let's find out what our current Capability is:
707     cap = myCapability();
708 #endif
709
710     // We have run some Haskell code: there might be blackhole-blocked
711     // threads to wake up now.
712     if ( blackhole_queue != END_TSO_QUEUE ) {
713         blackholes_need_checking = rtsTrue;
714     }
715
716     cap->r.rInHaskell = rtsFalse;
717
718     // The TSO might have moved, eg. if it re-entered the RTS and a GC
719     // happened.  So find the new location:
720     t = cap->r.rCurrentTSO;
721
722     // And save the current errno in this thread.
723     t->saved_errno = errno;
724
725     // ----------------------------------------------------------------------
726     
727     /* Costs for the scheduler are assigned to CCS_SYSTEM */
728 #if defined(PROFILING)
729     stopHeapProfTimer();
730     CCCS = CCS_SYSTEM;
731 #endif
732     
733     ACQUIRE_LOCK(&sched_mutex);
734     
735 #if defined(RTS_SUPPORTS_THREADS)
736     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
737 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
738     IF_DEBUG(scheduler,debugBelch("sched: "););
739 #endif
740     
741     schedulePostRunThread();
742
743     ready_to_gc = rtsFalse;
744
745     switch (ret) {
746     case HeapOverflow:
747         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
748         break;
749
750     case StackOverflow:
751         scheduleHandleStackOverflow(t);
752         break;
753
754     case ThreadYielding:
755         if (scheduleHandleYield(t, prev_what_next)) {
756             // shortcut for switching between compiler/interpreter:
757             goto run_thread; 
758         }
759         break;
760
761     case ThreadBlocked:
762         scheduleHandleThreadBlocked(t);
763         break;
764
765     case ThreadFinished:
766         if (scheduleHandleThreadFinished(mainThread, cap, t)) return;;
767         break;
768
769     default:
770       barf("schedule: invalid thread return code %d", (int)ret);
771     }
772
773     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
774     if (ready_to_gc) { scheduleDoGC(rtsFalse); }
775   } /* end of while() */
776
777   IF_PAR_DEBUG(verbose,
778                debugBelch("== Leaving schedule() after having received Finish\n"));
779 }
780
781 /* ----------------------------------------------------------------------------
782  * Setting up the scheduler loop
783  * ASSUMES: sched_mutex
784  * ------------------------------------------------------------------------- */
785
786 static void
787 schedulePreLoop(void)
788 {
789 #if defined(GRAN) 
790     /* set up first event to get things going */
791     /* ToDo: assign costs for system setup and init MainTSO ! */
792     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
793               ContinueThread, 
794               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
795     
796     IF_DEBUG(gran,
797              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
798                         CurrentTSO);
799              G_TSO(CurrentTSO, 5));
800     
801     if (RtsFlags.GranFlags.Light) {
802         /* Save current time; GranSim Light only */
803         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
804     }      
805 #endif
806 }
807
808 /* ----------------------------------------------------------------------------
809  * Start any pending signal handlers
810  * ASSUMES: sched_mutex
811  * ------------------------------------------------------------------------- */
812
813 static void
814 scheduleStartSignalHandlers(void)
815 {
816 #if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
817     if (signals_pending()) {
818       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
819       startSignalHandlers();
820       ACQUIRE_LOCK(&sched_mutex);
821     }
822 #endif
823 }
824
825 /* ----------------------------------------------------------------------------
826  * Check for blocked threads that can be woken up.
827  * ASSUMES: sched_mutex
828  * ------------------------------------------------------------------------- */
829
830 static void
831 scheduleCheckBlockedThreads(void)
832 {
833     //
834     // Check whether any waiting threads need to be woken up.  If the
835     // run queue is empty, and there are no other tasks running, we
836     // can wait indefinitely for something to happen.
837     //
838     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
839     {
840 #if defined(RTS_SUPPORTS_THREADS)
841         // We shouldn't be here...
842         barf("schedule: awaitEvent() in threaded RTS");
843 #else
844         awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
845 #endif
846     }
847 }
848
849
850 /* ----------------------------------------------------------------------------
851  * Check for threads blocked on BLACKHOLEs that can be woken up
852  * ASSUMES: sched_mutex
853  * ------------------------------------------------------------------------- */
854 static void
855 scheduleCheckBlackHoles( void )
856 {
857     if ( blackholes_need_checking )
858     {
859         checkBlackHoles();
860         blackholes_need_checking = rtsFalse;
861     }
862 }
863
864 /* ----------------------------------------------------------------------------
865  * Detect deadlock conditions and attempt to resolve them.
866  * ASSUMES: sched_mutex
867  * ------------------------------------------------------------------------- */
868
869 static void
870 scheduleDetectDeadlock()
871 {
872
873 #if defined(PARALLEL_HASKELL)
874     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
875     return;
876 #endif
877
878     /* 
879      * Detect deadlock: when we have no threads to run, there are no
880      * threads blocked, waiting for I/O, or sleeping, and all the
881      * other tasks are waiting for work, we must have a deadlock of
882      * some description.
883      */
884     if ( EMPTY_THREAD_QUEUES() )
885     {
886 #if defined(RTS_SUPPORTS_THREADS)
887         /* 
888          * In the threaded RTS, we only check for deadlock if there
889          * has been no activity in a complete timeslice.  This means
890          * we won't eagerly start a full GC just because we don't have
891          * any threads to run currently.
892          */
893         if (recent_activity != ACTIVITY_INACTIVE) return;
894 #endif
895
896         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
897
898         // Garbage collection can release some new threads due to
899         // either (a) finalizers or (b) threads resurrected because
900         // they are unreachable and will therefore be sent an
901         // exception.  Any threads thus released will be immediately
902         // runnable.
903
904         scheduleDoGC( rtsTrue/*force  major GC*/ );
905         recent_activity = ACTIVITY_DONE_GC;
906         if ( !EMPTY_RUN_QUEUE() ) return;
907
908 #if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
909         /* If we have user-installed signal handlers, then wait
910          * for signals to arrive rather then bombing out with a
911          * deadlock.
912          */
913         if ( anyUserHandlers() ) {
914             IF_DEBUG(scheduler, 
915                      sched_belch("still deadlocked, waiting for signals..."));
916
917             awaitUserSignals();
918
919             if (signals_pending()) {
920                 RELEASE_LOCK(&sched_mutex);
921                 startSignalHandlers();
922                 ACQUIRE_LOCK(&sched_mutex);
923             }
924
925             // either we have threads to run, or we were interrupted:
926             ASSERT(!EMPTY_RUN_QUEUE() || interrupted);
927         }
928 #endif
929
930 #if !defined(RTS_SUPPORTS_THREADS)
931         /* Probably a real deadlock.  Send the current main thread the
932          * Deadlock exception (or in the SMP build, send *all* main
933          * threads the deadlock exception, since none of them can make
934          * progress).
935          */
936         {
937             StgMainThread *m;
938             m = main_threads;
939             switch (m->tso->why_blocked) {
940             case BlockedOnSTM:
941             case BlockedOnBlackHole:
942             case BlockedOnException:
943             case BlockedOnMVar:
944                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
945                 return;
946             default:
947                 barf("deadlock: main thread blocked in a strange way");
948             }
949         }
950 #endif
951     }
952 }
953
954 /* ----------------------------------------------------------------------------
955  * Process an event (GRAN only)
956  * ------------------------------------------------------------------------- */
957
958 #if defined(GRAN)
959 static StgTSO *
960 scheduleProcessEvent(rtsEvent *event)
961 {
962     StgTSO *t;
963
964     if (RtsFlags.GranFlags.Light)
965       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
966
967     /* adjust time based on time-stamp */
968     if (event->time > CurrentTime[CurrentProc] &&
969         event->evttype != ContinueThread)
970       CurrentTime[CurrentProc] = event->time;
971     
972     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
973     if (!RtsFlags.GranFlags.Light)
974       handleIdlePEs();
975
976     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
977
978     /* main event dispatcher in GranSim */
979     switch (event->evttype) {
980       /* Should just be continuing execution */
981     case ContinueThread:
982       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
983       /* ToDo: check assertion
984       ASSERT(run_queue_hd != (StgTSO*)NULL &&
985              run_queue_hd != END_TSO_QUEUE);
986       */
987       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
988       if (!RtsFlags.GranFlags.DoAsyncFetch &&
989           procStatus[CurrentProc]==Fetching) {
990         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
991               CurrentTSO->id, CurrentTSO, CurrentProc);
992         goto next_thread;
993       } 
994       /* Ignore ContinueThreads for completed threads */
995       if (CurrentTSO->what_next == ThreadComplete) {
996         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
997               CurrentTSO->id, CurrentTSO, CurrentProc);
998         goto next_thread;
999       } 
1000       /* Ignore ContinueThreads for threads that are being migrated */
1001       if (PROCS(CurrentTSO)==Nowhere) { 
1002         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1003               CurrentTSO->id, CurrentTSO, CurrentProc);
1004         goto next_thread;
1005       }
1006       /* The thread should be at the beginning of the run queue */
1007       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1008         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1009               CurrentTSO->id, CurrentTSO, CurrentProc);
1010         break; // run the thread anyway
1011       }
1012       /*
1013       new_event(proc, proc, CurrentTime[proc],
1014                 FindWork,
1015                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1016       goto next_thread; 
1017       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1018       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1019
1020     case FetchNode:
1021       do_the_fetchnode(event);
1022       goto next_thread;             /* handle next event in event queue  */
1023       
1024     case GlobalBlock:
1025       do_the_globalblock(event);
1026       goto next_thread;             /* handle next event in event queue  */
1027       
1028     case FetchReply:
1029       do_the_fetchreply(event);
1030       goto next_thread;             /* handle next event in event queue  */
1031       
1032     case UnblockThread:   /* Move from the blocked queue to the tail of */
1033       do_the_unblock(event);
1034       goto next_thread;             /* handle next event in event queue  */
1035       
1036     case ResumeThread:  /* Move from the blocked queue to the tail of */
1037       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1038       event->tso->gran.blocktime += 
1039         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1040       do_the_startthread(event);
1041       goto next_thread;             /* handle next event in event queue  */
1042       
1043     case StartThread:
1044       do_the_startthread(event);
1045       goto next_thread;             /* handle next event in event queue  */
1046       
1047     case MoveThread:
1048       do_the_movethread(event);
1049       goto next_thread;             /* handle next event in event queue  */
1050       
1051     case MoveSpark:
1052       do_the_movespark(event);
1053       goto next_thread;             /* handle next event in event queue  */
1054       
1055     case FindWork:
1056       do_the_findwork(event);
1057       goto next_thread;             /* handle next event in event queue  */
1058       
1059     default:
1060       barf("Illegal event type %u\n", event->evttype);
1061     }  /* switch */
1062     
1063     /* This point was scheduler_loop in the old RTS */
1064
1065     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1066
1067     TimeOfLastEvent = CurrentTime[CurrentProc];
1068     TimeOfNextEvent = get_time_of_next_event();
1069     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1070     // CurrentTSO = ThreadQueueHd;
1071
1072     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1073                          TimeOfNextEvent));
1074
1075     if (RtsFlags.GranFlags.Light) 
1076       GranSimLight_leave_system(event, &ActiveTSO); 
1077
1078     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1079
1080     IF_DEBUG(gran, 
1081              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1082
1083     /* in a GranSim setup the TSO stays on the run queue */
1084     t = CurrentTSO;
1085     /* Take a thread from the run queue. */
1086     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1087
1088     IF_DEBUG(gran, 
1089              debugBelch("GRAN: About to run current thread, which is\n");
1090              G_TSO(t,5));
1091
1092     context_switch = 0; // turned on via GranYield, checking events and time slice
1093
1094     IF_DEBUG(gran, 
1095              DumpGranEvent(GR_SCHEDULE, t));
1096
1097     procStatus[CurrentProc] = Busy;
1098 }
1099 #endif // GRAN
1100
1101 /* ----------------------------------------------------------------------------
1102  * Send pending messages (PARALLEL_HASKELL only)
1103  * ------------------------------------------------------------------------- */
1104
1105 #if defined(PARALLEL_HASKELL)
1106 static StgTSO *
1107 scheduleSendPendingMessages(void)
1108 {
1109     StgSparkPool *pool;
1110     rtsSpark spark;
1111     StgTSO *t;
1112
1113 # if defined(PAR) // global Mem.Mgmt., omit for now
1114     if (PendingFetches != END_BF_QUEUE) {
1115         processFetches();
1116     }
1117 # endif
1118     
1119     if (RtsFlags.ParFlags.BufferTime) {
1120         // if we use message buffering, we must send away all message
1121         // packets which have become too old...
1122         sendOldBuffers(); 
1123     }
1124 }
1125 #endif
1126
1127 /* ----------------------------------------------------------------------------
1128  * Activate spark threads (PARALLEL_HASKELL only)
1129  * ------------------------------------------------------------------------- */
1130
1131 #if defined(PARALLEL_HASKELL)
1132 static void
1133 scheduleActivateSpark(void)
1134 {
1135 #if defined(SPARKS)
1136   ASSERT(EMPTY_RUN_QUEUE());
1137 /* We get here if the run queue is empty and want some work.
1138    We try to turn a spark into a thread, and add it to the run queue,
1139    from where it will be picked up in the next iteration of the scheduler
1140    loop.
1141 */
1142
1143       /* :-[  no local threads => look out for local sparks */
1144       /* the spark pool for the current PE */
1145       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1146       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1147           pool->hd < pool->tl) {
1148         /* 
1149          * ToDo: add GC code check that we really have enough heap afterwards!!
1150          * Old comment:
1151          * If we're here (no runnable threads) and we have pending
1152          * sparks, we must have a space problem.  Get enough space
1153          * to turn one of those pending sparks into a
1154          * thread... 
1155          */
1156
1157         spark = findSpark(rtsFalse);            /* get a spark */
1158         if (spark != (rtsSpark) NULL) {
1159           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1160           IF_PAR_DEBUG(fish, // schedule,
1161                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1162                              tso->id, tso, advisory_thread_count));
1163
1164           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1165             IF_PAR_DEBUG(fish, // schedule,
1166                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1167                             spark));
1168             return rtsFalse; /* failed to generate a thread */
1169           }                  /* otherwise fall through & pick-up new tso */
1170         } else {
1171           IF_PAR_DEBUG(fish, // schedule,
1172                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1173                              spark_queue_len(pool)));
1174           return rtsFalse;  /* failed to generate a thread */
1175         }
1176         return rtsTrue;  /* success in generating a thread */
1177   } else { /* no more threads permitted or pool empty */
1178     return rtsFalse;  /* failed to generateThread */
1179   }
1180 #else
1181   tso = NULL; // avoid compiler warning only
1182   return rtsFalse;  /* dummy in non-PAR setup */
1183 #endif // SPARKS
1184 }
1185 #endif // PARALLEL_HASKELL
1186
1187 /* ----------------------------------------------------------------------------
1188  * Get work from a remote node (PARALLEL_HASKELL only)
1189  * ------------------------------------------------------------------------- */
1190     
1191 #if defined(PARALLEL_HASKELL)
1192 static rtsBool
1193 scheduleGetRemoteWork(rtsBool *receivedFinish)
1194 {
1195   ASSERT(EMPTY_RUN_QUEUE());
1196
1197   if (RtsFlags.ParFlags.BufferTime) {
1198         IF_PAR_DEBUG(verbose, 
1199                 debugBelch("...send all pending data,"));
1200         {
1201           nat i;
1202           for (i=1; i<=nPEs; i++)
1203             sendImmediately(i); // send all messages away immediately
1204         }
1205   }
1206 # ifndef SPARKS
1207         //++EDEN++ idle() , i.e. send all buffers, wait for work
1208         // suppress fishing in EDEN... just look for incoming messages
1209         // (blocking receive)
1210   IF_PAR_DEBUG(verbose, 
1211                debugBelch("...wait for incoming messages...\n"));
1212   *receivedFinish = processMessages(); // blocking receive...
1213
1214         // and reenter scheduling loop after having received something
1215         // (return rtsFalse below)
1216
1217 # else /* activate SPARKS machinery */
1218 /* We get here, if we have no work, tried to activate a local spark, but still
1219    have no work. We try to get a remote spark, by sending a FISH message.
1220    Thread migration should be added here, and triggered when a sequence of 
1221    fishes returns without work. */
1222         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1223
1224       /* =8-[  no local sparks => look for work on other PEs */
1225         /*
1226          * We really have absolutely no work.  Send out a fish
1227          * (there may be some out there already), and wait for
1228          * something to arrive.  We clearly can't run any threads
1229          * until a SCHEDULE or RESUME arrives, and so that's what
1230          * we're hoping to see.  (Of course, we still have to
1231          * respond to other types of messages.)
1232          */
1233         rtsTime now = msTime() /*CURRENT_TIME*/;
1234         IF_PAR_DEBUG(verbose, 
1235                      debugBelch("--  now=%ld\n", now));
1236         IF_PAR_DEBUG(fish, // verbose,
1237              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1238                  (last_fish_arrived_at!=0 &&
1239                   last_fish_arrived_at+delay > now)) {
1240                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1241                      now, last_fish_arrived_at+delay, 
1242                      last_fish_arrived_at,
1243                      delay);
1244              });
1245   
1246         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1247             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1248           if (last_fish_arrived_at==0 ||
1249               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1250             /* outstandingFishes is set in sendFish, processFish;
1251                avoid flooding system with fishes via delay */
1252     next_fish_to_send_at = 0;  
1253   } else {
1254     /* ToDo: this should be done in the main scheduling loop to avoid the
1255              busy wait here; not so bad if fish delay is very small  */
1256     int iq = 0; // DEBUGGING -- HWL
1257     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1258     /* send a fish when ready, but process messages that arrive in the meantime */
1259     do {
1260       if (PacketsWaiting()) {
1261         iq++; // DEBUGGING
1262         *receivedFinish = processMessages();
1263       }
1264       now = msTime();
1265     } while (!*receivedFinish || now<next_fish_to_send_at);
1266     // JB: This means the fish could become obsolete, if we receive
1267     // work. Better check for work again? 
1268     // last line: while (!receivedFinish || !haveWork || now<...)
1269     // next line: if (receivedFinish || haveWork )
1270
1271     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1272       return rtsFalse;  // NB: this will leave scheduler loop
1273                         // immediately after return!
1274                           
1275     IF_PAR_DEBUG(fish, // verbose,
1276                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1277
1278   }
1279
1280     // JB: IMHO, this should all be hidden inside sendFish(...)
1281     /* pe = choosePE(); 
1282        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1283                 NEW_FISH_HUNGER);
1284
1285     // Global statistics: count no. of fishes
1286     if (RtsFlags.ParFlags.ParStats.Global &&
1287          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1288            globalParStats.tot_fish_mess++;
1289            }
1290     */ 
1291
1292   /* delayed fishes must have been sent by now! */
1293   next_fish_to_send_at = 0;  
1294   }
1295       
1296   *receivedFinish = processMessages();
1297 # endif /* SPARKS */
1298
1299  return rtsFalse;
1300  /* NB: this function always returns rtsFalse, meaning the scheduler
1301     loop continues with the next iteration; 
1302     rationale: 
1303       return code means success in finding work; we enter this function
1304       if there is no local work, thus have to send a fish which takes
1305       time until it arrives with work; in the meantime we should process
1306       messages in the main loop;
1307  */
1308 }
1309 #endif // PARALLEL_HASKELL
1310
1311 /* ----------------------------------------------------------------------------
1312  * PAR/GRAN: Report stats & debugging info(?)
1313  * ------------------------------------------------------------------------- */
1314
1315 #if defined(PAR) || defined(GRAN)
1316 static void
1317 scheduleGranParReport(void)
1318 {
1319   ASSERT(run_queue_hd != END_TSO_QUEUE);
1320
1321   /* Take a thread from the run queue, if we have work */
1322   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1323
1324     /* If this TSO has got its outport closed in the meantime, 
1325      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1326      * It has to be marked as TH_DEAD for this purpose.
1327      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1328
1329 JB: TODO: investigate wether state change field could be nuked
1330      entirely and replaced by the normal tso state (whatnext
1331      field). All we want to do is to kill tsos from outside.
1332      */
1333
1334     /* ToDo: write something to the log-file
1335     if (RTSflags.ParFlags.granSimStats && !sameThread)
1336         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1337
1338     CurrentTSO = t;
1339     */
1340     /* the spark pool for the current PE */
1341     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1342
1343     IF_DEBUG(scheduler, 
1344              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1345                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1346
1347     IF_PAR_DEBUG(fish,
1348              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1349                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1350
1351     if (RtsFlags.ParFlags.ParStats.Full && 
1352         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1353         (emitSchedule || // forced emit
1354          (t && LastTSO && t->id != LastTSO->id))) {
1355       /* 
1356          we are running a different TSO, so write a schedule event to log file
1357          NB: If we use fair scheduling we also have to write  a deschedule 
1358              event for LastTSO; with unfair scheduling we know that the
1359              previous tso has blocked whenever we switch to another tso, so
1360              we don't need it in GUM for now
1361       */
1362       IF_PAR_DEBUG(fish, // schedule,
1363                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1364
1365       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1366                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1367       emitSchedule = rtsFalse;
1368     }
1369 }     
1370 #endif
1371
1372 /* ----------------------------------------------------------------------------
1373  * After running a thread...
1374  * ASSUMES: sched_mutex
1375  * ------------------------------------------------------------------------- */
1376
1377 static void
1378 schedulePostRunThread(void)
1379 {
1380 #if defined(PAR)
1381     /* HACK 675: if the last thread didn't yield, make sure to print a 
1382        SCHEDULE event to the log file when StgRunning the next thread, even
1383        if it is the same one as before */
1384     LastTSO = t; 
1385     TimeOfLastYield = CURRENT_TIME;
1386 #endif
1387
1388   /* some statistics gathering in the parallel case */
1389
1390 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1391   switch (ret) {
1392     case HeapOverflow:
1393 # if defined(GRAN)
1394       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1395       globalGranStats.tot_heapover++;
1396 # elif defined(PAR)
1397       globalParStats.tot_heapover++;
1398 # endif
1399       break;
1400
1401      case StackOverflow:
1402 # if defined(GRAN)
1403       IF_DEBUG(gran, 
1404                DumpGranEvent(GR_DESCHEDULE, t));
1405       globalGranStats.tot_stackover++;
1406 # elif defined(PAR)
1407       // IF_DEBUG(par, 
1408       // DumpGranEvent(GR_DESCHEDULE, t);
1409       globalParStats.tot_stackover++;
1410 # endif
1411       break;
1412
1413     case ThreadYielding:
1414 # if defined(GRAN)
1415       IF_DEBUG(gran, 
1416                DumpGranEvent(GR_DESCHEDULE, t));
1417       globalGranStats.tot_yields++;
1418 # elif defined(PAR)
1419       // IF_DEBUG(par, 
1420       // DumpGranEvent(GR_DESCHEDULE, t);
1421       globalParStats.tot_yields++;
1422 # endif
1423       break; 
1424
1425     case ThreadBlocked:
1426 # if defined(GRAN)
1427       IF_DEBUG(scheduler,
1428                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1429                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1430                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1431                if (t->block_info.closure!=(StgClosure*)NULL)
1432                  print_bq(t->block_info.closure);
1433                debugBelch("\n"));
1434
1435       // ??? needed; should emit block before
1436       IF_DEBUG(gran, 
1437                DumpGranEvent(GR_DESCHEDULE, t)); 
1438       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1439       /*
1440         ngoq Dogh!
1441       ASSERT(procStatus[CurrentProc]==Busy || 
1442               ((procStatus[CurrentProc]==Fetching) && 
1443               (t->block_info.closure!=(StgClosure*)NULL)));
1444       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1445           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1446             procStatus[CurrentProc]==Fetching)) 
1447         procStatus[CurrentProc] = Idle;
1448       */
1449 # elif defined(PAR)
1450 //++PAR++  blockThread() writes the event (change?)
1451 # endif
1452     break;
1453
1454   case ThreadFinished:
1455     break;
1456
1457   default:
1458     barf("parGlobalStats: unknown return code");
1459     break;
1460     }
1461 #endif
1462 }
1463
1464 /* -----------------------------------------------------------------------------
1465  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1466  * ASSUMES: sched_mutex
1467  * -------------------------------------------------------------------------- */
1468
1469 static rtsBool
1470 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1471 {
1472     // did the task ask for a large block?
1473     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1474         // if so, get one and push it on the front of the nursery.
1475         bdescr *bd;
1476         lnat blocks;
1477         
1478         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1479         
1480         IF_DEBUG(scheduler,
1481                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1482                             (long)t->id, whatNext_strs[t->what_next], blocks));
1483         
1484         // don't do this if the nursery is (nearly) full, we'll GC first.
1485         if (cap->r.rCurrentNursery->link != NULL ||
1486             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1487                                                // if the nursery has only one block.
1488             
1489             bd = allocGroup( blocks );
1490             cap->r.rNursery->n_blocks += blocks;
1491             
1492             // link the new group into the list
1493             bd->link = cap->r.rCurrentNursery;
1494             bd->u.back = cap->r.rCurrentNursery->u.back;
1495             if (cap->r.rCurrentNursery->u.back != NULL) {
1496                 cap->r.rCurrentNursery->u.back->link = bd;
1497             } else {
1498 #if !defined(SMP)
1499                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1500                        g0s0 == cap->r.rNursery);
1501 #endif
1502                 cap->r.rNursery->blocks = bd;
1503             }             
1504             cap->r.rCurrentNursery->u.back = bd;
1505             
1506             // initialise it as a nursery block.  We initialise the
1507             // step, gen_no, and flags field of *every* sub-block in
1508             // this large block, because this is easier than making
1509             // sure that we always find the block head of a large
1510             // block whenever we call Bdescr() (eg. evacuate() and
1511             // isAlive() in the GC would both have to do this, at
1512             // least).
1513             { 
1514                 bdescr *x;
1515                 for (x = bd; x < bd + blocks; x++) {
1516                     x->step = cap->r.rNursery;
1517                     x->gen_no = 0;
1518                     x->flags = 0;
1519                 }
1520             }
1521             
1522             // This assert can be a killer if the app is doing lots
1523             // of large block allocations.
1524             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1525             
1526             // now update the nursery to point to the new block
1527             cap->r.rCurrentNursery = bd;
1528             
1529             // we might be unlucky and have another thread get on the
1530             // run queue before us and steal the large block, but in that
1531             // case the thread will just end up requesting another large
1532             // block.
1533             PUSH_ON_RUN_QUEUE(t);
1534             return rtsFalse;  /* not actually GC'ing */
1535         }
1536     }
1537     
1538     IF_DEBUG(scheduler,
1539              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1540                         (long)t->id, whatNext_strs[t->what_next]));
1541 #if defined(GRAN)
1542     ASSERT(!is_on_queue(t,CurrentProc));
1543 #elif defined(PARALLEL_HASKELL)
1544     /* Currently we emit a DESCHEDULE event before GC in GUM.
1545        ToDo: either add separate event to distinguish SYSTEM time from rest
1546        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1547     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1548         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1549                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1550         emitSchedule = rtsTrue;
1551     }
1552 #endif
1553       
1554     PUSH_ON_RUN_QUEUE(t);
1555     return rtsTrue;
1556     /* actual GC is done at the end of the while loop in schedule() */
1557 }
1558
1559 /* -----------------------------------------------------------------------------
1560  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1561  * ASSUMES: sched_mutex
1562  * -------------------------------------------------------------------------- */
1563
1564 static void
1565 scheduleHandleStackOverflow( StgTSO *t)
1566 {
1567     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1568                                   (long)t->id, whatNext_strs[t->what_next]));
1569     /* just adjust the stack for this thread, then pop it back
1570      * on the run queue.
1571      */
1572     { 
1573         /* enlarge the stack */
1574         StgTSO *new_t = threadStackOverflow(t);
1575         
1576         /* This TSO has moved, so update any pointers to it from the
1577          * main thread stack.  It better not be on any other queues...
1578          * (it shouldn't be).
1579          */
1580         if (t->main != NULL) {
1581             t->main->tso = new_t;
1582         }
1583         PUSH_ON_RUN_QUEUE(new_t);
1584     }
1585 }
1586
1587 /* -----------------------------------------------------------------------------
1588  * Handle a thread that returned to the scheduler with ThreadYielding
1589  * ASSUMES: sched_mutex
1590  * -------------------------------------------------------------------------- */
1591
1592 static rtsBool
1593 scheduleHandleYield( StgTSO *t, nat prev_what_next )
1594 {
1595     // Reset the context switch flag.  We don't do this just before
1596     // running the thread, because that would mean we would lose ticks
1597     // during GC, which can lead to unfair scheduling (a thread hogs
1598     // the CPU because the tick always arrives during GC).  This way
1599     // penalises threads that do a lot of allocation, but that seems
1600     // better than the alternative.
1601     context_switch = 0;
1602     
1603     /* put the thread back on the run queue.  Then, if we're ready to
1604      * GC, check whether this is the last task to stop.  If so, wake
1605      * up the GC thread.  getThread will block during a GC until the
1606      * GC is finished.
1607      */
1608     IF_DEBUG(scheduler,
1609              if (t->what_next != prev_what_next) {
1610                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1611                             (long)t->id, whatNext_strs[t->what_next]);
1612              } else {
1613                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1614                             (long)t->id, whatNext_strs[t->what_next]);
1615              }
1616         );
1617     
1618     IF_DEBUG(sanity,
1619              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1620              checkTSO(t));
1621     ASSERT(t->link == END_TSO_QUEUE);
1622     
1623     // Shortcut if we're just switching evaluators: don't bother
1624     // doing stack squeezing (which can be expensive), just run the
1625     // thread.
1626     if (t->what_next != prev_what_next) {
1627         return rtsTrue;
1628     }
1629     
1630 #if defined(GRAN)
1631     ASSERT(!is_on_queue(t,CurrentProc));
1632       
1633     IF_DEBUG(sanity,
1634              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1635              checkThreadQsSanity(rtsTrue));
1636
1637 #endif
1638
1639     addToRunQueue(t);
1640
1641 #if defined(GRAN)
1642     /* add a ContinueThread event to actually process the thread */
1643     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1644               ContinueThread,
1645               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1646     IF_GRAN_DEBUG(bq, 
1647                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1648                   G_EVENTQ(0);
1649                   G_CURR_THREADQ(0));
1650 #endif
1651     return rtsFalse;
1652 }
1653
1654 /* -----------------------------------------------------------------------------
1655  * Handle a thread that returned to the scheduler with ThreadBlocked
1656  * ASSUMES: sched_mutex
1657  * -------------------------------------------------------------------------- */
1658
1659 static void
1660 scheduleHandleThreadBlocked( StgTSO *t
1661 #if !defined(GRAN) && !defined(DEBUG)
1662     STG_UNUSED
1663 #endif
1664     )
1665 {
1666 #if defined(GRAN)
1667     IF_DEBUG(scheduler,
1668              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1669                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1670              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1671     
1672     // ??? needed; should emit block before
1673     IF_DEBUG(gran, 
1674              DumpGranEvent(GR_DESCHEDULE, t)); 
1675     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1676     /*
1677       ngoq Dogh!
1678       ASSERT(procStatus[CurrentProc]==Busy || 
1679       ((procStatus[CurrentProc]==Fetching) && 
1680       (t->block_info.closure!=(StgClosure*)NULL)));
1681       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1682       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1683       procStatus[CurrentProc]==Fetching)) 
1684       procStatus[CurrentProc] = Idle;
1685     */
1686 #elif defined(PAR)
1687     IF_DEBUG(scheduler,
1688              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1689                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1690     IF_PAR_DEBUG(bq,
1691                  
1692                  if (t->block_info.closure!=(StgClosure*)NULL) 
1693                  print_bq(t->block_info.closure));
1694     
1695     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1696     blockThread(t);
1697     
1698     /* whatever we schedule next, we must log that schedule */
1699     emitSchedule = rtsTrue;
1700     
1701 #else /* !GRAN */
1702
1703       // We don't need to do anything.  The thread is blocked, and it
1704       // has tidied up its stack and placed itself on whatever queue
1705       // it needs to be on.
1706
1707 #if !defined(SMP)
1708     ASSERT(t->why_blocked != NotBlocked);
1709              // This might not be true under SMP: we don't have
1710              // exclusive access to this TSO, so someone might have
1711              // woken it up by now.  This actually happens: try
1712              // conc023 +RTS -N2.
1713 #endif
1714
1715     IF_DEBUG(scheduler,
1716              debugBelch("--<< thread %d (%s) stopped: ", 
1717                         t->id, whatNext_strs[t->what_next]);
1718              printThreadBlockage(t);
1719              debugBelch("\n"));
1720     
1721     /* Only for dumping event to log file 
1722        ToDo: do I need this in GranSim, too?
1723        blockThread(t);
1724     */
1725 #endif
1726 }
1727
1728 /* -----------------------------------------------------------------------------
1729  * Handle a thread that returned to the scheduler with ThreadFinished
1730  * ASSUMES: sched_mutex
1731  * -------------------------------------------------------------------------- */
1732
1733 static rtsBool
1734 scheduleHandleThreadFinished( StgMainThread *mainThread
1735                               USED_WHEN_RTS_SUPPORTS_THREADS,
1736                               Capability *cap,
1737                               StgTSO *t )
1738 {
1739     /* Need to check whether this was a main thread, and if so,
1740      * return with the return value.
1741      *
1742      * We also end up here if the thread kills itself with an
1743      * uncaught exception, see Exception.cmm.
1744      */
1745     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1746                                   t->id, whatNext_strs[t->what_next]));
1747
1748 #if defined(GRAN)
1749       endThread(t, CurrentProc); // clean-up the thread
1750 #elif defined(PARALLEL_HASKELL)
1751       /* For now all are advisory -- HWL */
1752       //if(t->priority==AdvisoryPriority) ??
1753       advisory_thread_count--; // JB: Caution with this counter, buggy!
1754       
1755 # if defined(DIST)
1756       if(t->dist.priority==RevalPriority)
1757         FinishReval(t);
1758 # endif
1759     
1760 # if defined(EDENOLD)
1761       // the thread could still have an outport... (BUG)
1762       if (t->eden.outport != -1) {
1763       // delete the outport for the tso which has finished...
1764         IF_PAR_DEBUG(eden_ports,
1765                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1766                               t->eden.outport, t->id));
1767         deleteOPT(t);
1768       }
1769       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1770       if (t->eden.epid != -1) {
1771         IF_PAR_DEBUG(eden_ports,
1772                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1773                            t->id, t->eden.epid));
1774         removeTSOfromProcess(t);
1775       }
1776 # endif 
1777
1778 # if defined(PAR)
1779       if (RtsFlags.ParFlags.ParStats.Full &&
1780           !RtsFlags.ParFlags.ParStats.Suppressed) 
1781         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1782
1783       //  t->par only contains statistics: left out for now...
1784       IF_PAR_DEBUG(fish,
1785                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1786                               t->id,t,t->par.sparkname));
1787 # endif
1788 #endif // PARALLEL_HASKELL
1789
1790       //
1791       // Check whether the thread that just completed was a main
1792       // thread, and if so return with the result.  
1793       //
1794       // There is an assumption here that all thread completion goes
1795       // through this point; we need to make sure that if a thread
1796       // ends up in the ThreadKilled state, that it stays on the run
1797       // queue so it can be dealt with here.
1798       //
1799       if (
1800 #if defined(RTS_SUPPORTS_THREADS)
1801           mainThread != NULL
1802 #else
1803           mainThread->tso == t
1804 #endif
1805           )
1806       {
1807           // We are a bound thread: this must be our thread that just
1808           // completed.
1809           ASSERT(mainThread->tso == t);
1810
1811           if (t->what_next == ThreadComplete) {
1812               if (mainThread->ret) {
1813                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1814                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1815               }
1816               mainThread->stat = Success;
1817           } else {
1818               if (mainThread->ret) {
1819                   *(mainThread->ret) = NULL;
1820               }
1821               if (interrupted) {
1822                   mainThread->stat = Interrupted;
1823               } else {
1824                   mainThread->stat = Killed;
1825               }
1826           }
1827 #ifdef DEBUG
1828           removeThreadLabel((StgWord)mainThread->tso->id);
1829 #endif
1830           if (mainThread->prev == NULL) {
1831               ASSERT(mainThread == main_threads);
1832               main_threads = mainThread->link;
1833           } else {
1834               mainThread->prev->link = mainThread->link;
1835           }
1836           if (mainThread->link != NULL) {
1837               mainThread->link->prev = mainThread->prev;
1838           }
1839           releaseCapability(cap);
1840           return rtsTrue; // tells schedule() to return
1841       }
1842
1843 #ifdef RTS_SUPPORTS_THREADS
1844       ASSERT(t->main == NULL);
1845 #else
1846       if (t->main != NULL) {
1847           // Must be a main thread that is not the topmost one.  Leave
1848           // it on the run queue until the stack has unwound to the
1849           // point where we can deal with this.  Leaving it on the run
1850           // queue also ensures that the garbage collector knows about
1851           // this thread and its return value (it gets dropped from the
1852           // all_threads list so there's no other way to find it).
1853           APPEND_TO_RUN_QUEUE(t);
1854       }
1855 #endif
1856       return rtsFalse;
1857 }
1858
1859 /* -----------------------------------------------------------------------------
1860  * Perform a heap census, if PROFILING
1861  * -------------------------------------------------------------------------- */
1862
1863 static rtsBool
1864 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1865 {
1866 #if defined(PROFILING)
1867     // When we have +RTS -i0 and we're heap profiling, do a census at
1868     // every GC.  This lets us get repeatable runs for debugging.
1869     if (performHeapProfile ||
1870         (RtsFlags.ProfFlags.profileInterval==0 &&
1871          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1872         GarbageCollect(GetRoots, rtsTrue);
1873         heapCensus();
1874         performHeapProfile = rtsFalse;
1875         return rtsTrue;  // true <=> we already GC'd
1876     }
1877 #endif
1878     return rtsFalse;
1879 }
1880
1881 /* -----------------------------------------------------------------------------
1882  * Perform a garbage collection if necessary
1883  * ASSUMES: sched_mutex
1884  * -------------------------------------------------------------------------- */
1885
1886 static void
1887 scheduleDoGC( rtsBool force_major )
1888 {
1889     StgTSO *t;
1890 #ifdef SMP
1891     Capability *cap;
1892     static rtsBool waiting_for_gc;
1893     int n_capabilities = RtsFlags.ParFlags.nNodes - 1; 
1894            // subtract one because we're already holding one.
1895     Capability *caps[n_capabilities];
1896 #endif
1897
1898 #ifdef SMP
1899     // In order to GC, there must be no threads running Haskell code.
1900     // Therefore, the GC thread needs to hold *all* the capabilities,
1901     // and release them after the GC has completed.  
1902     //
1903     // This seems to be the simplest way: previous attempts involved
1904     // making all the threads with capabilities give up their
1905     // capabilities and sleep except for the *last* one, which
1906     // actually did the GC.  But it's quite hard to arrange for all
1907     // the other tasks to sleep and stay asleep.
1908     //
1909     // This does mean that there will be multiple entries in the 
1910     // thread->capability hash table for the current thread, but
1911     // they will be removed as normal when the capabilities are
1912     // released again.
1913     //
1914         
1915     // Someone else is already trying to GC
1916     if (waiting_for_gc) return;
1917     waiting_for_gc = rtsTrue;
1918
1919     while (n_capabilities > 0) {
1920         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
1921         waitForReturnCapability(&sched_mutex, &cap);
1922         n_capabilities--;
1923         caps[n_capabilities] = cap;
1924     }
1925
1926     waiting_for_gc = rtsFalse;
1927 #endif
1928
1929     /* Kick any transactions which are invalid back to their
1930      * atomically frames.  When next scheduled they will try to
1931      * commit, this commit will fail and they will retry.
1932      */
1933     for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
1934         if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1935             if (!stmValidateNestOfTransactions (t -> trec)) {
1936                 IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1937                 
1938                 // strip the stack back to the ATOMICALLY_FRAME, aborting
1939                 // the (nested) transaction, and saving the stack of any
1940                 // partially-evaluated thunks on the heap.
1941                 raiseAsync_(t, NULL, rtsTrue);
1942                 
1943 #ifdef REG_R1
1944                 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1945 #endif
1946             }
1947         }
1948     }
1949     
1950     // so this happens periodically:
1951     scheduleCheckBlackHoles();
1952     
1953     IF_DEBUG(scheduler, printAllThreads());
1954
1955     /* everybody back, start the GC.
1956      * Could do it in this thread, or signal a condition var
1957      * to do it in another thread.  Either way, we need to
1958      * broadcast on gc_pending_cond afterward.
1959      */
1960 #if defined(RTS_SUPPORTS_THREADS)
1961     IF_DEBUG(scheduler,sched_belch("doing GC"));
1962 #endif
1963     GarbageCollect(GetRoots, force_major);
1964     
1965 #if defined(SMP)
1966     {
1967         // release our stash of capabilities.
1968         nat i;
1969         for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) {
1970             releaseCapability(caps[i]);
1971         }
1972     }
1973 #endif
1974
1975 #if defined(GRAN)
1976     /* add a ContinueThread event to continue execution of current thread */
1977     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1978               ContinueThread,
1979               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1980     IF_GRAN_DEBUG(bq, 
1981                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1982                   G_EVENTQ(0);
1983                   G_CURR_THREADQ(0));
1984 #endif /* GRAN */
1985 }
1986
1987 /* ---------------------------------------------------------------------------
1988  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1989  * used by Control.Concurrent for error checking.
1990  * ------------------------------------------------------------------------- */
1991  
1992 StgBool
1993 rtsSupportsBoundThreads(void)
1994 {
1995 #if defined(RTS_SUPPORTS_THREADS)
1996   return rtsTrue;
1997 #else
1998   return rtsFalse;
1999 #endif
2000 }
2001
2002 /* ---------------------------------------------------------------------------
2003  * isThreadBound(tso): check whether tso is bound to an OS thread.
2004  * ------------------------------------------------------------------------- */
2005  
2006 StgBool
2007 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
2008 {
2009 #if defined(RTS_SUPPORTS_THREADS)
2010   return (tso->main != NULL);
2011 #endif
2012   return rtsFalse;
2013 }
2014
2015 /* ---------------------------------------------------------------------------
2016  * Singleton fork(). Do not copy any running threads.
2017  * ------------------------------------------------------------------------- */
2018
2019 #ifndef mingw32_HOST_OS
2020 #define FORKPROCESS_PRIMOP_SUPPORTED
2021 #endif
2022
2023 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2024 static void 
2025 deleteThreadImmediately(StgTSO *tso);
2026 #endif
2027 StgInt
2028 forkProcess(HsStablePtr *entry
2029 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2030             STG_UNUSED
2031 #endif
2032            )
2033 {
2034 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2035   pid_t pid;
2036   StgTSO* t,*next;
2037   StgMainThread *m;
2038   SchedulerStatus rc;
2039
2040   IF_DEBUG(scheduler,sched_belch("forking!"));
2041   rts_lock(); // This not only acquires sched_mutex, it also
2042               // makes sure that no other threads are running
2043
2044   pid = fork();
2045
2046   if (pid) { /* parent */
2047
2048   /* just return the pid */
2049     rts_unlock();
2050     return pid;
2051     
2052   } else { /* child */
2053     
2054     
2055       // delete all threads
2056     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
2057     
2058     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2059       next = t->link;
2060
2061         // don't allow threads to catch the ThreadKilled exception
2062       deleteThreadImmediately(t);
2063     }
2064     
2065       // wipe the main thread list
2066     while((m = main_threads) != NULL) {
2067       main_threads = m->link;
2068 # ifdef THREADED_RTS
2069       closeCondition(&m->bound_thread_cond);
2070 # endif
2071       stgFree(m);
2072     }
2073     
2074     rc = rts_evalStableIO(entry, NULL);  // run the action
2075     rts_checkSchedStatus("forkProcess",rc);
2076     
2077     rts_unlock();
2078     
2079     hs_exit();                      // clean up and exit
2080     stg_exit(0);
2081   }
2082 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2083   barf("forkProcess#: primop not supported, sorry!\n");
2084   return -1;
2085 #endif
2086 }
2087
2088 /* ---------------------------------------------------------------------------
2089  * deleteAllThreads():  kill all the live threads.
2090  *
2091  * This is used when we catch a user interrupt (^C), before performing
2092  * any necessary cleanups and running finalizers.
2093  *
2094  * Locks: sched_mutex held.
2095  * ------------------------------------------------------------------------- */
2096    
2097 void
2098 deleteAllThreads ( void )
2099 {
2100   StgTSO* t, *next;
2101   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
2102   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2103       if (t->what_next == ThreadRelocated) {
2104           next = t->link;
2105       } else {
2106           next = t->global_link;
2107           deleteThread(t);
2108       }
2109   }      
2110
2111   // The run queue now contains a bunch of ThreadKilled threads.  We
2112   // must not throw these away: the main thread(s) will be in there
2113   // somewhere, and the main scheduler loop has to deal with it.
2114   // Also, the run queue is the only thing keeping these threads from
2115   // being GC'd, and we don't want the "main thread has been GC'd" panic.
2116
2117   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2118   ASSERT(blackhole_queue == END_TSO_QUEUE);
2119   ASSERT(sleeping_queue == END_TSO_QUEUE);
2120 }
2121
2122 /* startThread and  insertThread are now in GranSim.c -- HWL */
2123
2124
2125 /* ---------------------------------------------------------------------------
2126  * Suspending & resuming Haskell threads.
2127  * 
2128  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2129  * its capability before calling the C function.  This allows another
2130  * task to pick up the capability and carry on running Haskell
2131  * threads.  It also means that if the C call blocks, it won't lock
2132  * the whole system.
2133  *
2134  * The Haskell thread making the C call is put to sleep for the
2135  * duration of the call, on the susepended_ccalling_threads queue.  We
2136  * give out a token to the task, which it can use to resume the thread
2137  * on return from the C function.
2138  * ------------------------------------------------------------------------- */
2139    
2140 StgInt
2141 suspendThread( StgRegTable *reg )
2142 {
2143   nat tok;
2144   Capability *cap;
2145   int saved_errno = errno;
2146
2147   /* assume that *reg is a pointer to the StgRegTable part
2148    * of a Capability.
2149    */
2150   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
2151
2152   ACQUIRE_LOCK(&sched_mutex);
2153
2154   IF_DEBUG(scheduler,
2155            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
2156
2157   // XXX this might not be necessary --SDM
2158   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
2159
2160   threadPaused(cap->r.rCurrentTSO);
2161   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
2162   suspended_ccalling_threads = cap->r.rCurrentTSO;
2163
2164   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
2165       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
2166       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
2167   } else {
2168       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
2169   }
2170
2171   /* Use the thread ID as the token; it should be unique */
2172   tok = cap->r.rCurrentTSO->id;
2173
2174   /* Hand back capability */
2175   cap->r.rInHaskell = rtsFalse;
2176   releaseCapability(cap);
2177   
2178 #if defined(RTS_SUPPORTS_THREADS)
2179   /* Preparing to leave the RTS, so ensure there's a native thread/task
2180      waiting to take over.
2181   */
2182   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
2183 #endif
2184
2185   RELEASE_LOCK(&sched_mutex);
2186   
2187   errno = saved_errno;
2188   return tok; 
2189 }
2190
2191 StgRegTable *
2192 resumeThread( StgInt tok )
2193 {
2194   StgTSO *tso, **prev;
2195   Capability *cap;
2196   int saved_errno = errno;
2197
2198 #if defined(RTS_SUPPORTS_THREADS)
2199   /* Wait for permission to re-enter the RTS with the result. */
2200   ACQUIRE_LOCK(&sched_mutex);
2201   waitForReturnCapability(&sched_mutex, &cap);
2202
2203   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
2204 #else
2205   grabCapability(&cap);
2206 #endif
2207
2208   /* Remove the thread off of the suspended list */
2209   prev = &suspended_ccalling_threads;
2210   for (tso = suspended_ccalling_threads; 
2211        tso != END_TSO_QUEUE; 
2212        prev = &tso->link, tso = tso->link) {
2213     if (tso->id == (StgThreadID)tok) {
2214       *prev = tso->link;
2215       break;
2216     }
2217   }
2218   if (tso == END_TSO_QUEUE) {
2219     barf("resumeThread: thread not found");
2220   }
2221   tso->link = END_TSO_QUEUE;
2222   
2223   if(tso->why_blocked == BlockedOnCCall) {
2224       awakenBlockedQueueNoLock(tso->blocked_exceptions);
2225       tso->blocked_exceptions = NULL;
2226   }
2227   
2228   /* Reset blocking status */
2229   tso->why_blocked  = NotBlocked;
2230
2231   cap->r.rCurrentTSO = tso;
2232   cap->r.rInHaskell = rtsTrue;
2233   RELEASE_LOCK(&sched_mutex);
2234   errno = saved_errno;
2235   return &cap->r;
2236 }
2237
2238 /* ---------------------------------------------------------------------------
2239  * Comparing Thread ids.
2240  *
2241  * This is used from STG land in the implementation of the
2242  * instances of Eq/Ord for ThreadIds.
2243  * ------------------------------------------------------------------------ */
2244
2245 int
2246 cmp_thread(StgPtr tso1, StgPtr tso2) 
2247
2248   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2249   StgThreadID id2 = ((StgTSO *)tso2)->id;
2250  
2251   if (id1 < id2) return (-1);
2252   if (id1 > id2) return 1;
2253   return 0;
2254 }
2255
2256 /* ---------------------------------------------------------------------------
2257  * Fetching the ThreadID from an StgTSO.
2258  *
2259  * This is used in the implementation of Show for ThreadIds.
2260  * ------------------------------------------------------------------------ */
2261 int
2262 rts_getThreadId(StgPtr tso) 
2263 {
2264   return ((StgTSO *)tso)->id;
2265 }
2266
2267 #ifdef DEBUG
2268 void
2269 labelThread(StgPtr tso, char *label)
2270 {
2271   int len;
2272   void *buf;
2273
2274   /* Caveat: Once set, you can only set the thread name to "" */
2275   len = strlen(label)+1;
2276   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2277   strncpy(buf,label,len);
2278   /* Update will free the old memory for us */
2279   updateThreadLabel(((StgTSO *)tso)->id,buf);
2280 }
2281 #endif /* DEBUG */
2282
2283 /* ---------------------------------------------------------------------------
2284    Create a new thread.
2285
2286    The new thread starts with the given stack size.  Before the
2287    scheduler can run, however, this thread needs to have a closure
2288    (and possibly some arguments) pushed on its stack.  See
2289    pushClosure() in Schedule.h.
2290
2291    createGenThread() and createIOThread() (in SchedAPI.h) are
2292    convenient packaged versions of this function.
2293
2294    currently pri (priority) is only used in a GRAN setup -- HWL
2295    ------------------------------------------------------------------------ */
2296 #if defined(GRAN)
2297 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2298 StgTSO *
2299 createThread(nat size, StgInt pri)
2300 #else
2301 StgTSO *
2302 createThread(nat size)
2303 #endif
2304 {
2305
2306     StgTSO *tso;
2307     nat stack_size;
2308
2309     /* First check whether we should create a thread at all */
2310 #if defined(PARALLEL_HASKELL)
2311   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2312   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2313     threadsIgnored++;
2314     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2315           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2316     return END_TSO_QUEUE;
2317   }
2318   threadsCreated++;
2319 #endif
2320
2321 #if defined(GRAN)
2322   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2323 #endif
2324
2325   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2326
2327   /* catch ridiculously small stack sizes */
2328   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2329     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2330   }
2331
2332   stack_size = size - TSO_STRUCT_SIZEW;
2333
2334   tso = (StgTSO *)allocate(size);
2335   TICK_ALLOC_TSO(stack_size, 0);
2336
2337   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2338 #if defined(GRAN)
2339   SET_GRAN_HDR(tso, ThisPE);
2340 #endif
2341
2342   // Always start with the compiled code evaluator
2343   tso->what_next = ThreadRunGHC;
2344
2345   tso->id = next_thread_id++; 
2346   tso->why_blocked  = NotBlocked;
2347   tso->blocked_exceptions = NULL;
2348
2349   tso->saved_errno = 0;
2350   tso->main = NULL;
2351   
2352   tso->stack_size   = stack_size;
2353   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2354                               - TSO_STRUCT_SIZEW;
2355   tso->sp           = (P_)&(tso->stack) + stack_size;
2356
2357   tso->trec = NO_TREC;
2358
2359 #ifdef PROFILING
2360   tso->prof.CCCS = CCS_MAIN;
2361 #endif
2362
2363   /* put a stop frame on the stack */
2364   tso->sp -= sizeofW(StgStopFrame);
2365   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2366   tso->link = END_TSO_QUEUE;
2367
2368   // ToDo: check this
2369 #if defined(GRAN)
2370   /* uses more flexible routine in GranSim */
2371   insertThread(tso, CurrentProc);
2372 #else
2373   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2374    * from its creation
2375    */
2376 #endif
2377
2378 #if defined(GRAN) 
2379   if (RtsFlags.GranFlags.GranSimStats.Full) 
2380     DumpGranEvent(GR_START,tso);
2381 #elif defined(PARALLEL_HASKELL)
2382   if (RtsFlags.ParFlags.ParStats.Full) 
2383     DumpGranEvent(GR_STARTQ,tso);
2384   /* HACk to avoid SCHEDULE 
2385      LastTSO = tso; */
2386 #endif
2387
2388   /* Link the new thread on the global thread list.
2389    */
2390   tso->global_link = all_threads;
2391   all_threads = tso;
2392
2393 #if defined(DIST)
2394   tso->dist.priority = MandatoryPriority; //by default that is...
2395 #endif
2396
2397 #if defined(GRAN)
2398   tso->gran.pri = pri;
2399 # if defined(DEBUG)
2400   tso->gran.magic = TSO_MAGIC; // debugging only
2401 # endif
2402   tso->gran.sparkname   = 0;
2403   tso->gran.startedat   = CURRENT_TIME; 
2404   tso->gran.exported    = 0;
2405   tso->gran.basicblocks = 0;
2406   tso->gran.allocs      = 0;
2407   tso->gran.exectime    = 0;
2408   tso->gran.fetchtime   = 0;
2409   tso->gran.fetchcount  = 0;
2410   tso->gran.blocktime   = 0;
2411   tso->gran.blockcount  = 0;
2412   tso->gran.blockedat   = 0;
2413   tso->gran.globalsparks = 0;
2414   tso->gran.localsparks  = 0;
2415   if (RtsFlags.GranFlags.Light)
2416     tso->gran.clock  = Now; /* local clock */
2417   else
2418     tso->gran.clock  = 0;
2419
2420   IF_DEBUG(gran,printTSO(tso));
2421 #elif defined(PARALLEL_HASKELL)
2422 # if defined(DEBUG)
2423   tso->par.magic = TSO_MAGIC; // debugging only
2424 # endif
2425   tso->par.sparkname   = 0;
2426   tso->par.startedat   = CURRENT_TIME; 
2427   tso->par.exported    = 0;
2428   tso->par.basicblocks = 0;
2429   tso->par.allocs      = 0;
2430   tso->par.exectime    = 0;
2431   tso->par.fetchtime   = 0;
2432   tso->par.fetchcount  = 0;
2433   tso->par.blocktime   = 0;
2434   tso->par.blockcount  = 0;
2435   tso->par.blockedat   = 0;
2436   tso->par.globalsparks = 0;
2437   tso->par.localsparks  = 0;
2438 #endif
2439
2440 #if defined(GRAN)
2441   globalGranStats.tot_threads_created++;
2442   globalGranStats.threads_created_on_PE[CurrentProc]++;
2443   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2444   globalGranStats.tot_sq_probes++;
2445 #elif defined(PARALLEL_HASKELL)
2446   // collect parallel global statistics (currently done together with GC stats)
2447   if (RtsFlags.ParFlags.ParStats.Global &&
2448       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2449     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2450     globalParStats.tot_threads_created++;
2451   }
2452 #endif 
2453
2454 #if defined(GRAN)
2455   IF_GRAN_DEBUG(pri,
2456                 sched_belch("==__ schedule: Created TSO %d (%p);",
2457                       CurrentProc, tso, tso->id));
2458 #elif defined(PARALLEL_HASKELL)
2459   IF_PAR_DEBUG(verbose,
2460                sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2461                            (long)tso->id, tso, advisory_thread_count));
2462 #else
2463   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2464                                  (long)tso->id, (long)tso->stack_size));
2465 #endif    
2466   return tso;
2467 }
2468
2469 #if defined(PAR)
2470 /* RFP:
2471    all parallel thread creation calls should fall through the following routine.
2472 */
2473 StgTSO *
2474 createThreadFromSpark(rtsSpark spark) 
2475 { StgTSO *tso;
2476   ASSERT(spark != (rtsSpark)NULL);
2477 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2478   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2479   { threadsIgnored++;
2480     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2481           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2482     return END_TSO_QUEUE;
2483   }
2484   else
2485   { threadsCreated++;
2486     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2487     if (tso==END_TSO_QUEUE)     
2488       barf("createSparkThread: Cannot create TSO");
2489 #if defined(DIST)
2490     tso->priority = AdvisoryPriority;
2491 #endif
2492     pushClosure(tso,spark);
2493     addToRunQueue(tso);
2494     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2495   }
2496   return tso;
2497 }
2498 #endif
2499
2500 /*
2501   Turn a spark into a thread.
2502   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2503 */
2504 #if 0
2505 StgTSO *
2506 activateSpark (rtsSpark spark) 
2507 {
2508   StgTSO *tso;
2509
2510   tso = createSparkThread(spark);
2511   if (RtsFlags.ParFlags.ParStats.Full) {   
2512     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2513       IF_PAR_DEBUG(verbose,
2514                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2515                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2516   }
2517   // ToDo: fwd info on local/global spark to thread -- HWL
2518   // tso->gran.exported =  spark->exported;
2519   // tso->gran.locked =   !spark->global;
2520   // tso->gran.sparkname = spark->name;
2521
2522   return tso;
2523 }
2524 #endif
2525
2526 /* ---------------------------------------------------------------------------
2527  * scheduleThread()
2528  *
2529  * scheduleThread puts a thread on the head of the runnable queue.
2530  * This will usually be done immediately after a thread is created.
2531  * The caller of scheduleThread must create the thread using e.g.
2532  * createThread and push an appropriate closure
2533  * on this thread's stack before the scheduler is invoked.
2534  * ------------------------------------------------------------------------ */
2535
2536 static void
2537 scheduleThread_(StgTSO *tso)
2538 {
2539   // The thread goes at the *end* of the run-queue, to avoid possible
2540   // starvation of any threads already on the queue.
2541   APPEND_TO_RUN_QUEUE(tso);
2542   threadRunnable();
2543 }
2544
2545 void
2546 scheduleThread(StgTSO* tso)
2547 {
2548   ACQUIRE_LOCK(&sched_mutex);
2549   scheduleThread_(tso);
2550   RELEASE_LOCK(&sched_mutex);
2551 }
2552
2553 #if defined(RTS_SUPPORTS_THREADS)
2554 static Condition bound_cond_cache;
2555 static int bound_cond_cache_full = 0;
2556 #endif
2557
2558
2559 SchedulerStatus
2560 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
2561                    Capability *initialCapability)
2562 {
2563     // Precondition: sched_mutex must be held
2564     StgMainThread *m;
2565
2566     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2567     m->tso = tso;
2568     tso->main = m;
2569     m->ret = ret;
2570     m->stat = NoStatus;
2571     m->link = main_threads;
2572     m->prev = NULL;
2573     if (main_threads != NULL) {
2574         main_threads->prev = m;
2575     }
2576     main_threads = m;
2577
2578 #if defined(RTS_SUPPORTS_THREADS)
2579     // Allocating a new condition for each thread is expensive, so we
2580     // cache one.  This is a pretty feeble hack, but it helps speed up
2581     // consecutive call-ins quite a bit.
2582     if (bound_cond_cache_full) {
2583         m->bound_thread_cond = bound_cond_cache;
2584         bound_cond_cache_full = 0;
2585     } else {
2586         initCondition(&m->bound_thread_cond);
2587     }
2588 #endif
2589
2590     /* Put the thread on the main-threads list prior to scheduling the TSO.
2591        Failure to do so introduces a race condition in the MT case (as
2592        identified by Wolfgang Thaller), whereby the new task/OS thread 
2593        created by scheduleThread_() would complete prior to the thread
2594        that spawned it managed to put 'itself' on the main-threads list.
2595        The upshot of it all being that the worker thread wouldn't get to
2596        signal the completion of the its work item for the main thread to
2597        see (==> it got stuck waiting.)    -- sof 6/02.
2598     */
2599     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2600     
2601     APPEND_TO_RUN_QUEUE(tso);
2602     // NB. Don't call threadRunnable() here, because the thread is
2603     // bound and only runnable by *this* OS thread, so waking up other
2604     // workers will just slow things down.
2605
2606     return waitThread_(m, initialCapability);
2607 }
2608
2609 /* ---------------------------------------------------------------------------
2610  * initScheduler()
2611  *
2612  * Initialise the scheduler.  This resets all the queues - if the
2613  * queues contained any threads, they'll be garbage collected at the
2614  * next pass.
2615  *
2616  * ------------------------------------------------------------------------ */
2617
2618 void 
2619 initScheduler(void)
2620 {
2621 #if defined(GRAN)
2622   nat i;
2623
2624   for (i=0; i<=MAX_PROC; i++) {
2625     run_queue_hds[i]      = END_TSO_QUEUE;
2626     run_queue_tls[i]      = END_TSO_QUEUE;
2627     blocked_queue_hds[i]  = END_TSO_QUEUE;
2628     blocked_queue_tls[i]  = END_TSO_QUEUE;
2629     ccalling_threadss[i]  = END_TSO_QUEUE;
2630     blackhole_queue[i]    = END_TSO_QUEUE;
2631     sleeping_queue        = END_TSO_QUEUE;
2632   }
2633 #else
2634   run_queue_hd      = END_TSO_QUEUE;
2635   run_queue_tl      = END_TSO_QUEUE;
2636   blocked_queue_hd  = END_TSO_QUEUE;
2637   blocked_queue_tl  = END_TSO_QUEUE;
2638   blackhole_queue   = END_TSO_QUEUE;
2639   sleeping_queue    = END_TSO_QUEUE;
2640 #endif 
2641
2642   suspended_ccalling_threads  = END_TSO_QUEUE;
2643
2644   main_threads = NULL;
2645   all_threads  = END_TSO_QUEUE;
2646
2647   context_switch = 0;
2648   interrupted    = 0;
2649
2650   RtsFlags.ConcFlags.ctxtSwitchTicks =
2651       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2652       
2653 #if defined(RTS_SUPPORTS_THREADS)
2654   /* Initialise the mutex and condition variables used by
2655    * the scheduler. */
2656   initMutex(&sched_mutex);
2657   initMutex(&term_mutex);
2658 #endif
2659   
2660   ACQUIRE_LOCK(&sched_mutex);
2661
2662   /* A capability holds the state a native thread needs in
2663    * order to execute STG code. At least one capability is
2664    * floating around (only SMP builds have more than one).
2665    */
2666   initCapabilities();
2667   
2668 #if defined(RTS_SUPPORTS_THREADS)
2669   initTaskManager();
2670 #endif
2671
2672 #if defined(SMP)
2673   /* eagerly start some extra workers */
2674   startingWorkerThread = RtsFlags.ParFlags.nNodes;
2675   startTasks(RtsFlags.ParFlags.nNodes, taskStart);
2676 #endif
2677
2678 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2679   initSparkPools();
2680 #endif
2681
2682   RELEASE_LOCK(&sched_mutex);
2683 }
2684
2685 void
2686 exitScheduler( void )
2687 {
2688     interrupted = rtsTrue;
2689     shutting_down_scheduler = rtsTrue;
2690 #if defined(RTS_SUPPORTS_THREADS)
2691     if (threadIsTask(osThreadId())) { taskStop(); }
2692     stopTaskManager();
2693 #endif
2694 }
2695
2696 /* ----------------------------------------------------------------------------
2697    Managing the per-task allocation areas.
2698    
2699    Each capability comes with an allocation area.  These are
2700    fixed-length block lists into which allocation can be done.
2701
2702    ToDo: no support for two-space collection at the moment???
2703    ------------------------------------------------------------------------- */
2704
2705 static SchedulerStatus
2706 waitThread_(StgMainThread* m, Capability *initialCapability)
2707 {
2708   SchedulerStatus stat;
2709
2710   // Precondition: sched_mutex must be held.
2711   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2712
2713 #if defined(GRAN)
2714   /* GranSim specific init */
2715   CurrentTSO = m->tso;                // the TSO to run
2716   procStatus[MainProc] = Busy;        // status of main PE
2717   CurrentProc = MainProc;             // PE to run it on
2718   schedule(m,initialCapability);
2719 #else
2720   schedule(m,initialCapability);
2721   ASSERT(m->stat != NoStatus);
2722 #endif
2723
2724   stat = m->stat;
2725
2726 #if defined(RTS_SUPPORTS_THREADS)
2727   // Free the condition variable, returning it to the cache if possible.
2728   if (!bound_cond_cache_full) {
2729       bound_cond_cache = m->bound_thread_cond;
2730       bound_cond_cache_full = 1;
2731   } else {
2732       closeCondition(&m->bound_thread_cond);
2733   }
2734 #endif
2735
2736   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2737   stgFree(m);
2738
2739   // Postcondition: sched_mutex still held
2740   return stat;
2741 }
2742
2743 /* ---------------------------------------------------------------------------
2744    Where are the roots that we know about?
2745
2746         - all the threads on the runnable queue
2747         - all the threads on the blocked queue
2748         - all the threads on the sleeping queue
2749         - all the thread currently executing a _ccall_GC
2750         - all the "main threads"
2751      
2752    ------------------------------------------------------------------------ */
2753
2754 /* This has to be protected either by the scheduler monitor, or by the
2755         garbage collection monitor (probably the latter).
2756         KH @ 25/10/99
2757 */
2758
2759 void
2760 GetRoots( evac_fn evac )
2761 {
2762 #if defined(GRAN)
2763   {
2764     nat i;
2765     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2766       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2767           evac((StgClosure **)&run_queue_hds[i]);
2768       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2769           evac((StgClosure **)&run_queue_tls[i]);
2770       
2771       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2772           evac((StgClosure **)&blocked_queue_hds[i]);
2773       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2774           evac((StgClosure **)&blocked_queue_tls[i]);
2775       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2776           evac((StgClosure **)&ccalling_threads[i]);
2777     }
2778   }
2779
2780   markEventQueue();
2781
2782 #else /* !GRAN */
2783   if (run_queue_hd != END_TSO_QUEUE) {
2784       ASSERT(run_queue_tl != END_TSO_QUEUE);
2785       evac((StgClosure **)&run_queue_hd);
2786       evac((StgClosure **)&run_queue_tl);
2787   }
2788   
2789   if (blocked_queue_hd != END_TSO_QUEUE) {
2790       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2791       evac((StgClosure **)&blocked_queue_hd);
2792       evac((StgClosure **)&blocked_queue_tl);
2793   }
2794   
2795   if (sleeping_queue != END_TSO_QUEUE) {
2796       evac((StgClosure **)&sleeping_queue);
2797   }
2798 #endif 
2799
2800   if (blackhole_queue != END_TSO_QUEUE) {
2801       evac((StgClosure **)&blackhole_queue);
2802   }
2803
2804   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2805       evac((StgClosure **)&suspended_ccalling_threads);
2806   }
2807
2808 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2809   markSparkQueue(evac);
2810 #endif
2811
2812 #if defined(RTS_USER_SIGNALS)
2813   // mark the signal handlers (signals should be already blocked)
2814   markSignalHandlers(evac);
2815 #endif
2816 }
2817
2818 /* -----------------------------------------------------------------------------
2819    performGC
2820
2821    This is the interface to the garbage collector from Haskell land.
2822    We provide this so that external C code can allocate and garbage
2823    collect when called from Haskell via _ccall_GC.
2824
2825    It might be useful to provide an interface whereby the programmer
2826    can specify more roots (ToDo).
2827    
2828    This needs to be protected by the GC condition variable above.  KH.
2829    -------------------------------------------------------------------------- */
2830
2831 static void (*extra_roots)(evac_fn);
2832
2833 void
2834 performGC(void)
2835 {
2836   /* Obligated to hold this lock upon entry */
2837   ACQUIRE_LOCK(&sched_mutex);
2838   GarbageCollect(GetRoots,rtsFalse);
2839   RELEASE_LOCK(&sched_mutex);
2840 }
2841
2842 void
2843 performMajorGC(void)
2844 {
2845   ACQUIRE_LOCK(&sched_mutex);
2846   GarbageCollect(GetRoots,rtsTrue);
2847   RELEASE_LOCK(&sched_mutex);
2848 }
2849
2850 static void
2851 AllRoots(evac_fn evac)
2852 {
2853     GetRoots(evac);             // the scheduler's roots
2854     extra_roots(evac);          // the user's roots
2855 }
2856
2857 void
2858 performGCWithRoots(void (*get_roots)(evac_fn))
2859 {
2860   ACQUIRE_LOCK(&sched_mutex);
2861   extra_roots = get_roots;
2862   GarbageCollect(AllRoots,rtsFalse);
2863   RELEASE_LOCK(&sched_mutex);
2864 }
2865
2866 /* -----------------------------------------------------------------------------
2867    Stack overflow
2868
2869    If the thread has reached its maximum stack size, then raise the
2870    StackOverflow exception in the offending thread.  Otherwise
2871    relocate the TSO into a larger chunk of memory and adjust its stack
2872    size appropriately.
2873    -------------------------------------------------------------------------- */
2874
2875 static StgTSO *
2876 threadStackOverflow(StgTSO *tso)
2877 {
2878   nat new_stack_size, stack_words;
2879   lnat new_tso_size;
2880   StgPtr new_sp;
2881   StgTSO *dest;
2882
2883   IF_DEBUG(sanity,checkTSO(tso));
2884   if (tso->stack_size >= tso->max_stack_size) {
2885
2886     IF_DEBUG(gc,
2887              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2888                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2889              /* If we're debugging, just print out the top of the stack */
2890              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2891                                               tso->sp+64)));
2892
2893     /* Send this thread the StackOverflow exception */
2894     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2895     return tso;
2896   }
2897
2898   /* Try to double the current stack size.  If that takes us over the
2899    * maximum stack size for this thread, then use the maximum instead.
2900    * Finally round up so the TSO ends up as a whole number of blocks.
2901    */
2902   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2903   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2904                                        TSO_STRUCT_SIZE)/sizeof(W_);
2905   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2906   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2907
2908   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2909
2910   dest = (StgTSO *)allocate(new_tso_size);
2911   TICK_ALLOC_TSO(new_stack_size,0);
2912
2913   /* copy the TSO block and the old stack into the new area */
2914   memcpy(dest,tso,TSO_STRUCT_SIZE);
2915   stack_words = tso->stack + tso->stack_size - tso->sp;
2916   new_sp = (P_)dest + new_tso_size - stack_words;
2917   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2918
2919   /* relocate the stack pointers... */
2920   dest->sp         = new_sp;
2921   dest->stack_size = new_stack_size;
2922         
2923   /* Mark the old TSO as relocated.  We have to check for relocated
2924    * TSOs in the garbage collector and any primops that deal with TSOs.
2925    *
2926    * It's important to set the sp value to just beyond the end
2927    * of the stack, so we don't attempt to scavenge any part of the
2928    * dead TSO's stack.
2929    */
2930   tso->what_next = ThreadRelocated;
2931   tso->link = dest;
2932   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2933   tso->why_blocked = NotBlocked;
2934
2935   IF_PAR_DEBUG(verbose,
2936                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2937                      tso->id, tso, tso->stack_size);
2938                /* If we're debugging, just print out the top of the stack */
2939                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2940                                                 tso->sp+64)));
2941   
2942   IF_DEBUG(sanity,checkTSO(tso));
2943 #if 0
2944   IF_DEBUG(scheduler,printTSO(dest));
2945 #endif
2946
2947   return dest;
2948 }
2949
2950 /* ---------------------------------------------------------------------------
2951    Wake up a queue that was blocked on some resource.
2952    ------------------------------------------------------------------------ */
2953
2954 #if defined(GRAN)
2955 STATIC_INLINE void
2956 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2957 {
2958 }
2959 #elif defined(PARALLEL_HASKELL)
2960 STATIC_INLINE void
2961 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2962 {
2963   /* write RESUME events to log file and
2964      update blocked and fetch time (depending on type of the orig closure) */
2965   if (RtsFlags.ParFlags.ParStats.Full) {
2966     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2967                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2968                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2969     if (EMPTY_RUN_QUEUE())
2970       emitSchedule = rtsTrue;
2971
2972     switch (get_itbl(node)->type) {
2973         case FETCH_ME_BQ:
2974           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2975           break;
2976         case RBH:
2977         case FETCH_ME:
2978         case BLACKHOLE_BQ:
2979           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2980           break;
2981 #ifdef DIST
2982         case MVAR:
2983           break;
2984 #endif    
2985         default:
2986           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2987         }
2988       }
2989 }
2990 #endif
2991
2992 #if defined(GRAN)
2993 StgBlockingQueueElement *
2994 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2995 {
2996     StgTSO *tso;
2997     PEs node_loc, tso_loc;
2998
2999     node_loc = where_is(node); // should be lifted out of loop
3000     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3001     tso_loc = where_is((StgClosure *)tso);
3002     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3003       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
3004       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3005       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3006       // insertThread(tso, node_loc);
3007       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3008                 ResumeThread,
3009                 tso, node, (rtsSpark*)NULL);
3010       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3011       // len_local++;
3012       // len++;
3013     } else { // TSO is remote (actually should be FMBQ)
3014       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3015                                   RtsFlags.GranFlags.Costs.gunblocktime +
3016                                   RtsFlags.GranFlags.Costs.latency;
3017       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3018                 UnblockThread,
3019                 tso, node, (rtsSpark*)NULL);
3020       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3021       // len++;
3022     }
3023     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3024     IF_GRAN_DEBUG(bq,
3025                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3026                           (node_loc==tso_loc ? "Local" : "Global"), 
3027                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3028     tso->block_info.closure = NULL;
3029     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3030                              tso->id, tso));
3031 }
3032 #elif defined(PARALLEL_HASKELL)
3033 StgBlockingQueueElement *
3034 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
3035 {
3036     StgBlockingQueueElement *next;
3037
3038     switch (get_itbl(bqe)->type) {
3039     case TSO:
3040       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3041       /* if it's a TSO just push it onto the run_queue */
3042       next = bqe->link;
3043       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3044       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3045       threadRunnable();
3046       unblockCount(bqe, node);
3047       /* reset blocking status after dumping event */
3048       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3049       break;
3050
3051     case BLOCKED_FETCH:
3052       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3053       next = bqe->link;
3054       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3055       PendingFetches = (StgBlockedFetch *)bqe;
3056       break;
3057
3058 # if defined(DEBUG)
3059       /* can ignore this case in a non-debugging setup; 
3060          see comments on RBHSave closures above */
3061     case CONSTR:
3062       /* check that the closure is an RBHSave closure */
3063       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3064              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3065              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3066       break;
3067
3068     default:
3069       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3070            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3071            (StgClosure *)bqe);
3072 # endif
3073     }
3074   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3075   return next;
3076 }
3077
3078 #else /* !GRAN && !PARALLEL_HASKELL */
3079 StgTSO *
3080 unblockOneLocked(StgTSO *tso)
3081 {
3082   StgTSO *next;
3083
3084   ASSERT(get_itbl(tso)->type == TSO);
3085   ASSERT(tso->why_blocked != NotBlocked);
3086   tso->why_blocked = NotBlocked;
3087   next = tso->link;
3088   tso->link = END_TSO_QUEUE;
3089   APPEND_TO_RUN_QUEUE(tso);
3090   threadRunnable();
3091   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3092   return next;
3093 }
3094 #endif
3095
3096 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3097 INLINE_ME StgBlockingQueueElement *
3098 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3099 {
3100   ACQUIRE_LOCK(&sched_mutex);
3101   bqe = unblockOneLocked(bqe, node);
3102   RELEASE_LOCK(&sched_mutex);
3103   return bqe;
3104 }
3105 #else
3106 INLINE_ME StgTSO *
3107 unblockOne(StgTSO *tso)
3108 {
3109   ACQUIRE_LOCK(&sched_mutex);
3110   tso = unblockOneLocked(tso);
3111   RELEASE_LOCK(&sched_mutex);
3112   return tso;
3113 }
3114 #endif
3115
3116 #if defined(GRAN)
3117 void 
3118 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3119 {
3120   StgBlockingQueueElement *bqe;
3121   PEs node_loc;
3122   nat len = 0; 
3123
3124   IF_GRAN_DEBUG(bq, 
3125                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3126                       node, CurrentProc, CurrentTime[CurrentProc], 
3127                       CurrentTSO->id, CurrentTSO));
3128
3129   node_loc = where_is(node);
3130
3131   ASSERT(q == END_BQ_QUEUE ||
3132          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3133          get_itbl(q)->type == CONSTR); // closure (type constructor)
3134   ASSERT(is_unique(node));
3135
3136   /* FAKE FETCH: magically copy the node to the tso's proc;
3137      no Fetch necessary because in reality the node should not have been 
3138      moved to the other PE in the first place
3139   */
3140   if (CurrentProc!=node_loc) {
3141     IF_GRAN_DEBUG(bq, 
3142                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3143                         node, node_loc, CurrentProc, CurrentTSO->id, 
3144                         // CurrentTSO, where_is(CurrentTSO),
3145                         node->header.gran.procs));
3146     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3147     IF_GRAN_DEBUG(bq, 
3148                   debugBelch("## new bitmask of node %p is %#x\n",
3149                         node, node->header.gran.procs));
3150     if (RtsFlags.GranFlags.GranSimStats.Global) {
3151       globalGranStats.tot_fake_fetches++;
3152     }
3153   }
3154
3155   bqe = q;
3156   // ToDo: check: ASSERT(CurrentProc==node_loc);
3157   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3158     //next = bqe->link;
3159     /* 
3160        bqe points to the current element in the queue
3161        next points to the next element in the queue
3162     */
3163     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3164     //tso_loc = where_is(tso);
3165     len++;
3166     bqe = unblockOneLocked(bqe, node);
3167   }
3168
3169   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3170      the closure to make room for the anchor of the BQ */
3171   if (bqe!=END_BQ_QUEUE) {
3172     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3173     /*
3174     ASSERT((info_ptr==&RBH_Save_0_info) ||
3175            (info_ptr==&RBH_Save_1_info) ||
3176            (info_ptr==&RBH_Save_2_info));
3177     */
3178     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3179     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3180     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3181
3182     IF_GRAN_DEBUG(bq,
3183                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3184                         node, info_type(node)));
3185   }
3186
3187   /* statistics gathering */
3188   if (RtsFlags.GranFlags.GranSimStats.Global) {
3189     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3190     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3191     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3192     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3193   }
3194   IF_GRAN_DEBUG(bq,
3195                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3196                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3197 }
3198 #elif defined(PARALLEL_HASKELL)
3199 void 
3200 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3201 {
3202   StgBlockingQueueElement *bqe;
3203
3204   ACQUIRE_LOCK(&sched_mutex);
3205
3206   IF_PAR_DEBUG(verbose, 
3207                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3208                      node, mytid));
3209 #ifdef DIST  
3210   //RFP
3211   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3212     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3213     return;
3214   }
3215 #endif
3216   
3217   ASSERT(q == END_BQ_QUEUE ||
3218          get_itbl(q)->type == TSO ||           
3219          get_itbl(q)->type == BLOCKED_FETCH || 
3220          get_itbl(q)->type == CONSTR); 
3221
3222   bqe = q;
3223   while (get_itbl(bqe)->type==TSO || 
3224          get_itbl(bqe)->type==BLOCKED_FETCH) {
3225     bqe = unblockOneLocked(bqe, node);
3226   }
3227   RELEASE_LOCK(&sched_mutex);
3228 }
3229
3230 #else   /* !GRAN && !PARALLEL_HASKELL */
3231
3232 void
3233 awakenBlockedQueueNoLock(StgTSO *tso)
3234 {
3235   while (tso != END_TSO_QUEUE) {
3236     tso = unblockOneLocked(tso);
3237   }
3238 }
3239
3240 void
3241 awakenBlockedQueue(StgTSO *tso)
3242 {
3243   ACQUIRE_LOCK(&sched_mutex);
3244   while (tso != END_TSO_QUEUE) {
3245     tso = unblockOneLocked(tso);
3246   }
3247   RELEASE_LOCK(&sched_mutex);
3248 }
3249 #endif
3250
3251 /* ---------------------------------------------------------------------------
3252    Interrupt execution
3253    - usually called inside a signal handler so it mustn't do anything fancy.   
3254    ------------------------------------------------------------------------ */
3255
3256 void
3257 interruptStgRts(void)
3258 {
3259     interrupted    = 1;
3260     context_switch = 1;
3261     threadRunnable();
3262     /* ToDo: if invoked from a signal handler, this threadRunnable
3263      * only works if there's another thread (not this one) waiting to
3264      * be woken up.
3265      */
3266 }
3267
3268 /* -----------------------------------------------------------------------------
3269    Unblock a thread
3270
3271    This is for use when we raise an exception in another thread, which
3272    may be blocked.
3273    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3274    -------------------------------------------------------------------------- */
3275
3276 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3277 /*
3278   NB: only the type of the blocking queue is different in GranSim and GUM
3279       the operations on the queue-elements are the same
3280       long live polymorphism!
3281
3282   Locks: sched_mutex is held upon entry and exit.
3283
3284 */
3285 static void
3286 unblockThread(StgTSO *tso)
3287 {
3288   StgBlockingQueueElement *t, **last;
3289
3290   switch (tso->why_blocked) {
3291
3292   case NotBlocked:
3293     return;  /* not blocked */
3294
3295   case BlockedOnSTM:
3296     // Be careful: nothing to do here!  We tell the scheduler that the thread
3297     // is runnable and we leave it to the stack-walking code to abort the 
3298     // transaction while unwinding the stack.  We should perhaps have a debugging
3299     // test to make sure that this really happens and that the 'zombie' transaction
3300     // does not get committed.
3301     goto done;
3302
3303   case BlockedOnMVar:
3304     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3305     {
3306       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3307       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3308
3309       last = (StgBlockingQueueElement **)&mvar->head;
3310       for (t = (StgBlockingQueueElement *)mvar->head; 
3311            t != END_BQ_QUEUE; 
3312            last = &t->link, last_tso = t, t = t->link) {
3313         if (t == (StgBlockingQueueElement *)tso) {
3314           *last = (StgBlockingQueueElement *)tso->link;
3315           if (mvar->tail == tso) {
3316             mvar->tail = (StgTSO *)last_tso;
3317           }
3318           goto done;
3319         }
3320       }
3321       barf("unblockThread (MVAR): TSO not found");
3322     }
3323
3324   case BlockedOnBlackHole:
3325     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3326     {
3327       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3328
3329       last = &bq->blocking_queue;
3330       for (t = bq->blocking_queue; 
3331            t != END_BQ_QUEUE; 
3332            last = &t->link, t = t->link) {
3333         if (t == (StgBlockingQueueElement *)tso) {
3334           *last = (StgBlockingQueueElement *)tso->link;
3335           goto done;
3336         }
3337       }
3338       barf("unblockThread (BLACKHOLE): TSO not found");
3339     }
3340
3341   case BlockedOnException:
3342     {
3343       StgTSO *target  = tso->block_info.tso;
3344
3345       ASSERT(get_itbl(target)->type == TSO);
3346
3347       if (target->what_next == ThreadRelocated) {
3348           target = target->link;
3349           ASSERT(get_itbl(target)->type == TSO);
3350       }
3351
3352       ASSERT(target->blocked_exceptions != NULL);
3353
3354       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3355       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3356            t != END_BQ_QUEUE; 
3357            last = &t->link, t = t->link) {
3358         ASSERT(get_itbl(t)->type == TSO);
3359         if (t == (StgBlockingQueueElement *)tso) {
3360           *last = (StgBlockingQueueElement *)tso->link;
3361           goto done;
3362         }
3363       }
3364       barf("unblockThread (Exception): TSO not found");
3365     }
3366
3367   case BlockedOnRead:
3368   case BlockedOnWrite:
3369 #if defined(mingw32_HOST_OS)
3370   case BlockedOnDoProc:
3371 #endif
3372     {
3373       /* take TSO off blocked_queue */
3374       StgBlockingQueueElement *prev = NULL;
3375       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3376            prev = t, t = t->link) {
3377         if (t == (StgBlockingQueueElement *)tso) {
3378           if (prev == NULL) {
3379             blocked_queue_hd = (StgTSO *)t->link;
3380             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3381               blocked_queue_tl = END_TSO_QUEUE;
3382             }
3383           } else {
3384             prev->link = t->link;
3385             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3386               blocked_queue_tl = (StgTSO *)prev;
3387             }
3388           }
3389 #if defined(mingw32_HOST_OS)
3390           /* (Cooperatively) signal that the worker thread should abort
3391            * the request.
3392            */
3393           abandonWorkRequest(tso->block_info.async_result->reqID);
3394 #endif
3395           goto done;
3396         }
3397       }
3398       barf("unblockThread (I/O): TSO not found");
3399     }
3400
3401   case BlockedOnDelay:
3402     {
3403       /* take TSO off sleeping_queue */
3404       StgBlockingQueueElement *prev = NULL;
3405       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3406            prev = t, t = t->link) {
3407         if (t == (StgBlockingQueueElement *)tso) {
3408           if (prev == NULL) {
3409             sleeping_queue = (StgTSO *)t->link;
3410           } else {
3411             prev->link = t->link;
3412           }
3413           goto done;
3414         }
3415       }
3416       barf("unblockThread (delay): TSO not found");
3417     }
3418
3419   default:
3420     barf("unblockThread");
3421   }
3422
3423  done:
3424   tso->link = END_TSO_QUEUE;
3425   tso->why_blocked = NotBlocked;
3426   tso->block_info.closure = NULL;
3427   PUSH_ON_RUN_QUEUE(tso);
3428 }
3429 #else
3430 static void
3431 unblockThread(StgTSO *tso)
3432 {
3433   StgTSO *t, **last;
3434   
3435   /* To avoid locking unnecessarily. */
3436   if (tso->why_blocked == NotBlocked) {
3437     return;
3438   }
3439
3440   switch (tso->why_blocked) {
3441
3442   case BlockedOnSTM:
3443     // Be careful: nothing to do here!  We tell the scheduler that the thread
3444     // is runnable and we leave it to the stack-walking code to abort the 
3445     // transaction while unwinding the stack.  We should perhaps have a debugging
3446     // test to make sure that this really happens and that the 'zombie' transaction
3447     // does not get committed.
3448     goto done;
3449
3450   case BlockedOnMVar:
3451     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3452     {
3453       StgTSO *last_tso = END_TSO_QUEUE;
3454       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3455
3456       last = &mvar->head;
3457       for (t = mvar->head; t != END_TSO_QUEUE; 
3458            last = &t->link, last_tso = t, t = t->link) {
3459         if (t == tso) {
3460           *last = tso->link;
3461           if (mvar->tail == tso) {
3462             mvar->tail = last_tso;
3463           }
3464           goto done;
3465         }
3466       }
3467       barf("unblockThread (MVAR): TSO not found");
3468     }
3469
3470   case BlockedOnBlackHole:
3471     {
3472       last = &blackhole_queue;
3473       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3474            last = &t->link, t = t->link) {
3475         if (t == tso) {
3476           *last = tso->link;
3477           goto done;
3478         }
3479       }
3480       barf("unblockThread (BLACKHOLE): TSO not found");
3481     }
3482
3483   case BlockedOnException:
3484     {
3485       StgTSO *target  = tso->block_info.tso;
3486
3487       ASSERT(get_itbl(target)->type == TSO);
3488
3489       while (target->what_next == ThreadRelocated) {
3490           target = target->link;
3491           ASSERT(get_itbl(target)->type == TSO);
3492       }
3493       
3494       ASSERT(target->blocked_exceptions != NULL);
3495
3496       last = &target->blocked_exceptions;
3497       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3498            last = &t->link, t = t->link) {
3499         ASSERT(get_itbl(t)->type == TSO);
3500         if (t == tso) {
3501           *last = tso->link;
3502           goto done;
3503         }
3504       }
3505       barf("unblockThread (Exception): TSO not found");
3506     }
3507
3508   case BlockedOnRead:
3509   case BlockedOnWrite:
3510 #if defined(mingw32_HOST_OS)
3511   case BlockedOnDoProc:
3512 #endif
3513     {
3514       StgTSO *prev = NULL;
3515       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3516            prev = t, t = t->link) {
3517         if (t == tso) {
3518           if (prev == NULL) {
3519             blocked_queue_hd = t->link;
3520             if (blocked_queue_tl == t) {
3521               blocked_queue_tl = END_TSO_QUEUE;
3522             }
3523           } else {
3524             prev->link = t->link;
3525             if (blocked_queue_tl == t) {
3526               blocked_queue_tl = prev;
3527             }
3528           }
3529 #if defined(mingw32_HOST_OS)
3530           /* (Cooperatively) signal that the worker thread should abort
3531            * the request.
3532            */
3533           abandonWorkRequest(tso->block_info.async_result->reqID);
3534 #endif
3535           goto done;
3536         }
3537       }
3538       barf("unblockThread (I/O): TSO not found");
3539     }
3540
3541   case BlockedOnDelay:
3542     {
3543       StgTSO *prev = NULL;
3544       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3545            prev = t, t = t->link) {
3546         if (t == tso) {
3547           if (prev == NULL) {
3548             sleeping_queue = t->link;
3549           } else {
3550             prev->link = t->link;
3551           }
3552           goto done;
3553         }
3554       }
3555       barf("unblockThread (delay): TSO not found");
3556     }
3557
3558   default:
3559     barf("unblockThread");
3560   }
3561
3562  done:
3563   tso->link = END_TSO_QUEUE;
3564   tso->why_blocked = NotBlocked;
3565   tso->block_info.closure = NULL;
3566   APPEND_TO_RUN_QUEUE(tso);
3567 }
3568 #endif
3569
3570 /* -----------------------------------------------------------------------------
3571  * checkBlackHoles()
3572  *
3573  * Check the blackhole_queue for threads that can be woken up.  We do
3574  * this periodically: before every GC, and whenever the run queue is
3575  * empty.
3576  *
3577  * An elegant solution might be to just wake up all the blocked
3578  * threads with awakenBlockedQueue occasionally: they'll go back to
3579  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3580  * doesn't give us a way to tell whether we've actually managed to
3581  * wake up any threads, so we would be busy-waiting.
3582  *
3583  * -------------------------------------------------------------------------- */
3584
3585 static rtsBool
3586 checkBlackHoles( void )
3587 {
3588     StgTSO **prev, *t;
3589     rtsBool any_woke_up = rtsFalse;
3590     StgHalfWord type;
3591
3592     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3593
3594     // ASSUMES: sched_mutex
3595     prev = &blackhole_queue;
3596     t = blackhole_queue;
3597     while (t != END_TSO_QUEUE) {
3598         ASSERT(t->why_blocked == BlockedOnBlackHole);
3599         type = get_itbl(t->block_info.closure)->type;
3600         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3601             t = unblockOneLocked(t);
3602             *prev = t;
3603             any_woke_up = rtsTrue;
3604         } else {
3605             prev = &t->link;
3606             t = t->link;
3607         }
3608     }
3609
3610     return any_woke_up;
3611 }
3612
3613 /* -----------------------------------------------------------------------------
3614  * raiseAsync()
3615  *
3616  * The following function implements the magic for raising an
3617  * asynchronous exception in an existing thread.
3618  *
3619  * We first remove the thread from any queue on which it might be
3620  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3621  *
3622  * We strip the stack down to the innermost CATCH_FRAME, building
3623  * thunks in the heap for all the active computations, so they can 
3624  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3625  * an application of the handler to the exception, and push it on
3626  * the top of the stack.
3627  * 
3628  * How exactly do we save all the active computations?  We create an
3629  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3630  * AP_STACKs pushes everything from the corresponding update frame
3631  * upwards onto the stack.  (Actually, it pushes everything up to the
3632  * next update frame plus a pointer to the next AP_STACK object.
3633  * Entering the next AP_STACK object pushes more onto the stack until we
3634  * reach the last AP_STACK object - at which point the stack should look
3635  * exactly as it did when we killed the TSO and we can continue
3636  * execution by entering the closure on top of the stack.
3637  *
3638  * We can also kill a thread entirely - this happens if either (a) the 
3639  * exception passed to raiseAsync is NULL, or (b) there's no
3640  * CATCH_FRAME on the stack.  In either case, we strip the entire
3641  * stack and replace the thread with a zombie.
3642  *
3643  * Locks: sched_mutex held upon entry nor exit.
3644  *
3645  * -------------------------------------------------------------------------- */
3646  
3647 void 
3648 deleteThread(StgTSO *tso)
3649 {
3650   if (tso->why_blocked != BlockedOnCCall &&
3651       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3652       raiseAsync(tso,NULL);
3653   }
3654 }
3655
3656 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3657 static void 
3658 deleteThreadImmediately(StgTSO *tso)
3659 { // for forkProcess only:
3660   // delete thread without giving it a chance to catch the KillThread exception
3661
3662   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3663       return;
3664   }
3665
3666   if (tso->why_blocked != BlockedOnCCall &&
3667       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3668     unblockThread(tso);
3669   }
3670
3671   tso->what_next = ThreadKilled;
3672 }
3673 #endif
3674
3675 void
3676 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3677 {
3678   /* When raising async exs from contexts where sched_mutex isn't held;
3679      use raiseAsyncWithLock(). */
3680   ACQUIRE_LOCK(&sched_mutex);
3681   raiseAsync(tso,exception);
3682   RELEASE_LOCK(&sched_mutex);
3683 }
3684
3685 void
3686 raiseAsync(StgTSO *tso, StgClosure *exception)
3687 {
3688     raiseAsync_(tso, exception, rtsFalse);
3689 }
3690
3691 static void
3692 raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
3693 {
3694     StgRetInfoTable *info;
3695     StgPtr sp;
3696   
3697     // Thread already dead?
3698     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3699         return;
3700     }
3701
3702     IF_DEBUG(scheduler, 
3703              sched_belch("raising exception in thread %ld.", (long)tso->id));
3704     
3705     // Remove it from any blocking queues
3706     unblockThread(tso);
3707
3708     sp = tso->sp;
3709     
3710     // The stack freezing code assumes there's a closure pointer on
3711     // the top of the stack, so we have to arrange that this is the case...
3712     //
3713     if (sp[0] == (W_)&stg_enter_info) {
3714         sp++;
3715     } else {
3716         sp--;
3717         sp[0] = (W_)&stg_dummy_ret_closure;
3718     }
3719
3720     while (1) {
3721         nat i;
3722
3723         // 1. Let the top of the stack be the "current closure"
3724         //
3725         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3726         // CATCH_FRAME.
3727         //
3728         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3729         // current closure applied to the chunk of stack up to (but not
3730         // including) the update frame.  This closure becomes the "current
3731         // closure".  Go back to step 2.
3732         //
3733         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3734         // top of the stack applied to the exception.
3735         // 
3736         // 5. If it's a STOP_FRAME, then kill the thread.
3737         // 
3738         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3739         // transaction
3740        
3741         
3742         StgPtr frame;
3743         
3744         frame = sp + 1;
3745         info = get_ret_itbl((StgClosure *)frame);
3746         
3747         while (info->i.type != UPDATE_FRAME
3748                && (info->i.type != CATCH_FRAME || exception == NULL)
3749                && info->i.type != STOP_FRAME
3750                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3751         {
3752             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3753               // IF we find an ATOMICALLY_FRAME then we abort the
3754               // current transaction and propagate the exception.  In
3755               // this case (unlike ordinary exceptions) we do not care
3756               // whether the transaction is valid or not because its
3757               // possible validity cannot have caused the exception
3758               // and will not be visible after the abort.
3759               IF_DEBUG(stm,
3760                        debugBelch("Found atomically block delivering async exception\n"));
3761               stmAbortTransaction(tso -> trec);
3762               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3763             }
3764             frame += stack_frame_sizeW((StgClosure *)frame);
3765             info = get_ret_itbl((StgClosure *)frame);
3766         }
3767         
3768         switch (info->i.type) {
3769             
3770         case ATOMICALLY_FRAME:
3771             ASSERT(stop_at_atomically);
3772             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3773             stmCondemnTransaction(tso -> trec);
3774 #ifdef REG_R1
3775             tso->sp = frame;
3776 #else
3777             // R1 is not a register: the return convention for IO in
3778             // this case puts the return value on the stack, so we
3779             // need to set up the stack to return to the atomically
3780             // frame properly...
3781             tso->sp = frame - 2;
3782             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3783             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3784 #endif
3785             tso->what_next = ThreadRunGHC;
3786             return;
3787
3788         case CATCH_FRAME:
3789             // If we find a CATCH_FRAME, and we've got an exception to raise,
3790             // then build the THUNK raise(exception), and leave it on
3791             // top of the CATCH_FRAME ready to enter.
3792             //
3793         {
3794 #ifdef PROFILING
3795             StgCatchFrame *cf = (StgCatchFrame *)frame;
3796 #endif
3797             StgThunk *raise;
3798             
3799             // we've got an exception to raise, so let's pass it to the
3800             // handler in this frame.
3801             //
3802             raise = (StgThunk *)allocate(sizeofW(StgThunk)+1);
3803             TICK_ALLOC_SE_THK(1,0);
3804             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3805             raise->payload[0] = exception;
3806             
3807             // throw away the stack from Sp up to the CATCH_FRAME.
3808             //
3809             sp = frame - 1;
3810             
3811             /* Ensure that async excpetions are blocked now, so we don't get
3812              * a surprise exception before we get around to executing the
3813              * handler.
3814              */
3815             if (tso->blocked_exceptions == NULL) {
3816                 tso->blocked_exceptions = END_TSO_QUEUE;
3817             }
3818             
3819             /* Put the newly-built THUNK on top of the stack, ready to execute
3820              * when the thread restarts.
3821              */
3822             sp[0] = (W_)raise;
3823             sp[-1] = (W_)&stg_enter_info;
3824             tso->sp = sp-1;
3825             tso->what_next = ThreadRunGHC;
3826             IF_DEBUG(sanity, checkTSO(tso));
3827             return;
3828         }
3829         
3830         case UPDATE_FRAME:
3831         {
3832             StgAP_STACK * ap;
3833             nat words;
3834             
3835             // First build an AP_STACK consisting of the stack chunk above the
3836             // current update frame, with the top word on the stack as the
3837             // fun field.
3838             //
3839             words = frame - sp - 1;
3840             ap = (StgAP_STACK *)allocate(AP_STACK_sizeW(words));
3841             
3842             ap->size = words;
3843             ap->fun  = (StgClosure *)sp[0];
3844             sp++;
3845             for(i=0; i < (nat)words; ++i) {
3846                 ap->payload[i] = (StgClosure *)*sp++;
3847             }
3848             
3849             SET_HDR(ap,&stg_AP_STACK_info,
3850                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3851             TICK_ALLOC_UP_THK(words+1,0);
3852             
3853             IF_DEBUG(scheduler,
3854                      debugBelch("sched: Updating ");
3855                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3856                      debugBelch(" with ");
3857                      printObj((StgClosure *)ap);
3858                 );
3859
3860             // Replace the updatee with an indirection - happily
3861             // this will also wake up any threads currently
3862             // waiting on the result.
3863             //
3864             // Warning: if we're in a loop, more than one update frame on
3865             // the stack may point to the same object.  Be careful not to
3866             // overwrite an IND_OLDGEN in this case, because we'll screw
3867             // up the mutable lists.  To be on the safe side, don't
3868             // overwrite any kind of indirection at all.  See also
3869             // threadSqueezeStack in GC.c, where we have to make a similar
3870             // check.
3871             //
3872             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3873                 // revert the black hole
3874                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3875                                (StgClosure *)ap);
3876             }
3877             sp += sizeofW(StgUpdateFrame) - 1;
3878             sp[0] = (W_)ap; // push onto stack
3879             break;
3880         }
3881         
3882         case STOP_FRAME:
3883             // We've stripped the entire stack, the thread is now dead.
3884             sp += sizeofW(StgStopFrame);
3885             tso->what_next = ThreadKilled;
3886             tso->sp = sp;
3887             return;
3888             
3889         default:
3890             barf("raiseAsync");
3891         }
3892     }
3893     barf("raiseAsync");
3894 }
3895
3896 /* -----------------------------------------------------------------------------
3897    raiseExceptionHelper
3898    
3899    This function is called by the raise# primitve, just so that we can
3900    move some of the tricky bits of raising an exception from C-- into
3901    C.  Who knows, it might be a useful re-useable thing here too.
3902    -------------------------------------------------------------------------- */
3903
3904 StgWord
3905 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3906 {
3907     StgThunk *raise_closure = NULL;
3908     StgPtr p, next;
3909     StgRetInfoTable *info;
3910     //
3911     // This closure represents the expression 'raise# E' where E
3912     // is the exception raise.  It is used to overwrite all the
3913     // thunks which are currently under evaluataion.
3914     //
3915
3916     //    
3917     // LDV profiling: stg_raise_info has THUNK as its closure
3918     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3919     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3920     // 1 does not cause any problem unless profiling is performed.
3921     // However, when LDV profiling goes on, we need to linearly scan
3922     // small object pool, where raise_closure is stored, so we should
3923     // use MIN_UPD_SIZE.
3924     //
3925     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3926     //                                 sizeofW(StgClosure)+1);
3927     //
3928
3929     //
3930     // Walk up the stack, looking for the catch frame.  On the way,
3931     // we update any closures pointed to from update frames with the
3932     // raise closure that we just built.
3933     //
3934     p = tso->sp;
3935     while(1) {
3936         info = get_ret_itbl((StgClosure *)p);
3937         next = p + stack_frame_sizeW((StgClosure *)p);
3938         switch (info->i.type) {
3939             
3940         case UPDATE_FRAME:
3941             // Only create raise_closure if we need to.
3942             if (raise_closure == NULL) {
3943                 raise_closure = 
3944                     (StgThunk *)allocate(sizeofW(StgThunk)+MIN_UPD_SIZE);
3945                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3946                 raise_closure->payload[0] = exception;
3947             }
3948             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3949             p = next;
3950             continue;
3951
3952         case ATOMICALLY_FRAME:
3953             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3954             tso->sp = p;
3955             return ATOMICALLY_FRAME;
3956             
3957         case CATCH_FRAME:
3958             tso->sp = p;
3959             return CATCH_FRAME;
3960
3961         case CATCH_STM_FRAME:
3962             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3963             tso->sp = p;
3964             return CATCH_STM_FRAME;
3965             
3966         case STOP_FRAME:
3967             tso->sp = p;
3968             return STOP_FRAME;
3969
3970         case CATCH_RETRY_FRAME:
3971         default:
3972             p = next; 
3973             continue;
3974         }
3975     }
3976 }
3977
3978
3979 /* -----------------------------------------------------------------------------
3980    findRetryFrameHelper
3981
3982    This function is called by the retry# primitive.  It traverses the stack
3983    leaving tso->sp referring to the frame which should handle the retry.  
3984
3985    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3986    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3987
3988    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3989    despite the similar implementation.
3990
3991    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3992    not be created within memory transactions.
3993    -------------------------------------------------------------------------- */
3994
3995 StgWord
3996 findRetryFrameHelper (StgTSO *tso)
3997 {
3998   StgPtr           p, next;
3999   StgRetInfoTable *info;
4000
4001   p = tso -> sp;
4002   while (1) {
4003     info = get_ret_itbl((StgClosure *)p);
4004     next = p + stack_frame_sizeW((StgClosure *)p);
4005     switch (info->i.type) {
4006       
4007     case ATOMICALLY_FRAME:
4008       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
4009       tso->sp = p;
4010       return ATOMICALLY_FRAME;
4011       
4012     case CATCH_RETRY_FRAME:
4013       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4014       tso->sp = p;
4015       return CATCH_RETRY_FRAME;
4016       
4017     case CATCH_STM_FRAME:
4018     default:
4019       ASSERT(info->i.type != CATCH_FRAME);
4020       ASSERT(info->i.type != STOP_FRAME);
4021       p = next; 
4022       continue;
4023     }
4024   }
4025 }
4026
4027 /* -----------------------------------------------------------------------------
4028    resurrectThreads is called after garbage collection on the list of
4029    threads found to be garbage.  Each of these threads will be woken
4030    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4031    on an MVar, or NonTermination if the thread was blocked on a Black
4032    Hole.
4033
4034    Locks: sched_mutex isn't held upon entry nor exit.
4035    -------------------------------------------------------------------------- */
4036
4037 void
4038 resurrectThreads( StgTSO *threads )
4039 {
4040   StgTSO *tso, *next;
4041
4042   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4043     next = tso->global_link;
4044     tso->global_link = all_threads;
4045     all_threads = tso;
4046     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4047
4048     switch (tso->why_blocked) {
4049     case BlockedOnMVar:
4050     case BlockedOnException:
4051       /* Called by GC - sched_mutex lock is currently held. */
4052       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
4053       break;
4054     case BlockedOnBlackHole:
4055       raiseAsync(tso,(StgClosure *)NonTermination_closure);
4056       break;
4057     case BlockedOnSTM:
4058       raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
4059       break;
4060     case NotBlocked:
4061       /* This might happen if the thread was blocked on a black hole
4062        * belonging to a thread that we've just woken up (raiseAsync
4063        * can wake up threads, remember...).
4064        */
4065       continue;
4066     default:
4067       barf("resurrectThreads: thread blocked in a strange way");
4068     }
4069   }
4070 }
4071
4072 /* ----------------------------------------------------------------------------
4073  * Debugging: why is a thread blocked
4074  * [Also provides useful information when debugging threaded programs
4075  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4076    ------------------------------------------------------------------------- */
4077
4078 static void
4079 printThreadBlockage(StgTSO *tso)
4080 {
4081   switch (tso->why_blocked) {
4082   case BlockedOnRead:
4083     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4084     break;
4085   case BlockedOnWrite:
4086     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4087     break;
4088 #if defined(mingw32_HOST_OS)
4089     case BlockedOnDoProc:
4090     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4091     break;
4092 #endif
4093   case BlockedOnDelay:
4094     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4095     break;
4096   case BlockedOnMVar:
4097     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4098     break;
4099   case BlockedOnException:
4100     debugBelch("is blocked on delivering an exception to thread %d",
4101             tso->block_info.tso->id);
4102     break;
4103   case BlockedOnBlackHole:
4104     debugBelch("is blocked on a black hole");
4105     break;
4106   case NotBlocked:
4107     debugBelch("is not blocked");
4108     break;
4109 #if defined(PARALLEL_HASKELL)
4110   case BlockedOnGA:
4111     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4112             tso->block_info.closure, info_type(tso->block_info.closure));
4113     break;
4114   case BlockedOnGA_NoSend:
4115     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4116             tso->block_info.closure, info_type(tso->block_info.closure));
4117     break;
4118 #endif
4119   case BlockedOnCCall:
4120     debugBelch("is blocked on an external call");
4121     break;
4122   case BlockedOnCCall_NoUnblockExc:
4123     debugBelch("is blocked on an external call (exceptions were already blocked)");
4124     break;
4125   case BlockedOnSTM:
4126     debugBelch("is blocked on an STM operation");
4127     break;
4128   default:
4129     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4130          tso->why_blocked, tso->id, tso);
4131   }
4132 }
4133
4134 static void
4135 printThreadStatus(StgTSO *tso)
4136 {
4137   switch (tso->what_next) {
4138   case ThreadKilled:
4139     debugBelch("has been killed");
4140     break;
4141   case ThreadComplete:
4142     debugBelch("has completed");
4143     break;
4144   default:
4145     printThreadBlockage(tso);
4146   }
4147 }
4148
4149 void
4150 printAllThreads(void)
4151 {
4152   StgTSO *t;
4153
4154 # if defined(GRAN)
4155   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4156   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4157                        time_string, rtsFalse/*no commas!*/);
4158
4159   debugBelch("all threads at [%s]:\n", time_string);
4160 # elif defined(PARALLEL_HASKELL)
4161   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4162   ullong_format_string(CURRENT_TIME,
4163                        time_string, rtsFalse/*no commas!*/);
4164
4165   debugBelch("all threads at [%s]:\n", time_string);
4166 # else
4167   debugBelch("all threads:\n");
4168 # endif
4169
4170   for (t = all_threads; t != END_TSO_QUEUE; ) {
4171     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4172 #if defined(DEBUG)
4173     {
4174       void *label = lookupThreadLabel(t->id);
4175       if (label) debugBelch("[\"%s\"] ",(char *)label);
4176     }
4177 #endif
4178     if (t->what_next == ThreadRelocated) {
4179         debugBelch("has been relocated...\n");
4180         t = t->link;
4181     } else {
4182         printThreadStatus(t);
4183         debugBelch("\n");
4184         t = t->global_link;
4185     }
4186   }
4187 }
4188
4189 #ifdef DEBUG
4190
4191 // useful from gdb
4192 void 
4193 printThreadQueue(StgTSO *t)
4194 {
4195     nat i = 0;
4196     for (; t != END_TSO_QUEUE; t = t->link) {
4197         debugBelch("\tthread %d @ %p ", t->id, (void *)t);
4198         if (t->what_next == ThreadRelocated) {
4199             debugBelch("has been relocated...\n");
4200         } else {
4201             printThreadStatus(t);
4202             debugBelch("\n");
4203         }
4204         i++;
4205     }
4206     debugBelch("%d threads on queue\n", i);
4207 }
4208
4209 /* 
4210    Print a whole blocking queue attached to node (debugging only).
4211 */
4212 # if defined(PARALLEL_HASKELL)
4213 void 
4214 print_bq (StgClosure *node)
4215 {
4216   StgBlockingQueueElement *bqe;
4217   StgTSO *tso;
4218   rtsBool end;
4219
4220   debugBelch("## BQ of closure %p (%s): ",
4221           node, info_type(node));
4222
4223   /* should cover all closures that may have a blocking queue */
4224   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4225          get_itbl(node)->type == FETCH_ME_BQ ||
4226          get_itbl(node)->type == RBH ||
4227          get_itbl(node)->type == MVAR);
4228     
4229   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4230
4231   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4232 }
4233
4234 /* 
4235    Print a whole blocking queue starting with the element bqe.
4236 */
4237 void 
4238 print_bqe (StgBlockingQueueElement *bqe)
4239 {
4240   rtsBool end;
4241
4242   /* 
4243      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4244   */
4245   for (end = (bqe==END_BQ_QUEUE);
4246        !end; // iterate until bqe points to a CONSTR
4247        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4248        bqe = end ? END_BQ_QUEUE : bqe->link) {
4249     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4250     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4251     /* types of closures that may appear in a blocking queue */
4252     ASSERT(get_itbl(bqe)->type == TSO ||           
4253            get_itbl(bqe)->type == BLOCKED_FETCH || 
4254            get_itbl(bqe)->type == CONSTR); 
4255     /* only BQs of an RBH end with an RBH_Save closure */
4256     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4257
4258     switch (get_itbl(bqe)->type) {
4259     case TSO:
4260       debugBelch(" TSO %u (%x),",
4261               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4262       break;
4263     case BLOCKED_FETCH:
4264       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4265               ((StgBlockedFetch *)bqe)->node, 
4266               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4267               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4268               ((StgBlockedFetch *)bqe)->ga.weight);
4269       break;
4270     case CONSTR:
4271       debugBelch(" %s (IP %p),",
4272               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4273                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4274                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4275                "RBH_Save_?"), get_itbl(bqe));
4276       break;
4277     default:
4278       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4279            info_type((StgClosure *)bqe)); // , node, info_type(node));
4280       break;
4281     }
4282   } /* for */
4283   debugBelch("\n");
4284 }
4285 # elif defined(GRAN)
4286 void 
4287 print_bq (StgClosure *node)
4288 {
4289   StgBlockingQueueElement *bqe;
4290   PEs node_loc, tso_loc;
4291   rtsBool end;
4292
4293   /* should cover all closures that may have a blocking queue */
4294   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4295          get_itbl(node)->type == FETCH_ME_BQ ||
4296          get_itbl(node)->type == RBH);
4297     
4298   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4299   node_loc = where_is(node);
4300
4301   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4302           node, info_type(node), node_loc);
4303
4304   /* 
4305      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4306   */
4307   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4308        !end; // iterate until bqe points to a CONSTR
4309        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4310     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4311     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4312     /* types of closures that may appear in a blocking queue */
4313     ASSERT(get_itbl(bqe)->type == TSO ||           
4314            get_itbl(bqe)->type == CONSTR); 
4315     /* only BQs of an RBH end with an RBH_Save closure */
4316     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4317
4318     tso_loc = where_is((StgClosure *)bqe);
4319     switch (get_itbl(bqe)->type) {
4320     case TSO:
4321       debugBelch(" TSO %d (%p) on [PE %d],",
4322               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4323       break;
4324     case CONSTR:
4325       debugBelch(" %s (IP %p),",
4326               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4327                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4328                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4329                "RBH_Save_?"), get_itbl(bqe));
4330       break;
4331     default:
4332       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4333            info_type((StgClosure *)bqe), node, info_type(node));
4334       break;
4335     }
4336   } /* for */
4337   debugBelch("\n");
4338 }
4339 # endif
4340
4341 #if defined(PARALLEL_HASKELL)
4342 static nat
4343 run_queue_len(void)
4344 {
4345   nat i;
4346   StgTSO *tso;
4347
4348   for (i=0, tso=run_queue_hd; 
4349        tso != END_TSO_QUEUE;
4350        i++, tso=tso->link)
4351     /* nothing */
4352
4353   return i;
4354 }
4355 #endif
4356
4357 void
4358 sched_belch(char *s, ...)
4359 {
4360   va_list ap;
4361   va_start(ap,s);
4362 #ifdef RTS_SUPPORTS_THREADS
4363   debugBelch("sched (task %p): ", osThreadId());
4364 #elif defined(PARALLEL_HASKELL)
4365   debugBelch("== ");
4366 #else
4367   debugBelch("sched: ");
4368 #endif
4369   vdebugBelch(s, ap);
4370   debugBelch("\n");
4371   va_end(ap);
4372 }
4373
4374 #endif /* DEBUG */