[project @ 2005-04-12 09:04:23 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PARALLEL_HASKELL          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "OSThreads.h"
46 #include "Storage.h"
47 #include "StgRun.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "STM.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #include "LdvProfile.h"
63 #include "Updates.h"
64 #ifdef PROFILING
65 #include "Proftimer.h"
66 #include "ProfHeap.h"
67 #endif
68 #if defined(GRAN) || defined(PARALLEL_HASKELL)
69 # include "GranSimRts.h"
70 # include "GranSim.h"
71 # include "ParallelRts.h"
72 # include "Parallel.h"
73 # include "ParallelDebug.h"
74 # include "FetchMe.h"
75 # include "HLC.h"
76 #endif
77 #include "Sparks.h"
78 #include "Capability.h"
79 #include  "Task.h"
80
81 #ifdef HAVE_SYS_TYPES_H
82 #include <sys/types.h>
83 #endif
84 #ifdef HAVE_UNISTD_H
85 #include <unistd.h>
86 #endif
87
88 #include <string.h>
89 #include <stdlib.h>
90 #include <stdarg.h>
91
92 #ifdef HAVE_ERRNO_H
93 #include <errno.h>
94 #endif
95
96 // Turn off inlining when debugging - it obfuscates things
97 #ifdef DEBUG
98 # undef  STATIC_INLINE
99 # define STATIC_INLINE static
100 #endif
101
102 #ifdef THREADED_RTS
103 #define USED_IN_THREADED_RTS
104 #else
105 #define USED_IN_THREADED_RTS STG_UNUSED
106 #endif
107
108 #ifdef RTS_SUPPORTS_THREADS
109 #define USED_WHEN_RTS_SUPPORTS_THREADS
110 #else
111 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
112 #endif
113
114 /* Main thread queue.
115  * Locks required: sched_mutex.
116  */
117 StgMainThread *main_threads = NULL;
118
119 #if defined(GRAN)
120
121 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
122 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
123
124 /* 
125    In GranSim we have a runnable and a blocked queue for each processor.
126    In order to minimise code changes new arrays run_queue_hds/tls
127    are created. run_queue_hd is then a short cut (macro) for
128    run_queue_hds[CurrentProc] (see GranSim.h).
129    -- HWL
130 */
131 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
132 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
133 StgTSO *ccalling_threadss[MAX_PROC];
134 /* We use the same global list of threads (all_threads) in GranSim as in
135    the std RTS (i.e. we are cheating). However, we don't use this list in
136    the GranSim specific code at the moment (so we are only potentially
137    cheating).  */
138
139 #else /* !GRAN */
140
141 /* Thread queues.
142  * Locks required: sched_mutex.
143  */
144 StgTSO *run_queue_hd = NULL;
145 StgTSO *run_queue_tl = NULL;
146 StgTSO *blocked_queue_hd = NULL;
147 StgTSO *blocked_queue_tl = NULL;
148 StgTSO *blackhole_queue = NULL;
149 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
150
151 #endif
152
153 /* The blackhole_queue should be checked for threads to wake up.  See
154  * Schedule.h for more thorough comment.
155  */
156 rtsBool blackholes_need_checking = rtsFalse;
157
158 /* Linked list of all threads.
159  * Used for detecting garbage collected threads.
160  */
161 StgTSO *all_threads = NULL;
162
163 /* When a thread performs a safe C call (_ccall_GC, using old
164  * terminology), it gets put on the suspended_ccalling_threads
165  * list. Used by the garbage collector.
166  */
167 static StgTSO *suspended_ccalling_threads;
168
169 /* KH: The following two flags are shared memory locations.  There is no need
170        to lock them, since they are only unset at the end of a scheduler
171        operation.
172 */
173
174 /* flag set by signal handler to precipitate a context switch */
175 int context_switch = 0;
176
177 /* if this flag is set as well, give up execution */
178 rtsBool interrupted = rtsFalse;
179
180 /* If this flag is set, we are running Haskell code.  Used to detect
181  * uses of 'foreign import unsafe' that should be 'safe'.
182  */
183 static rtsBool in_haskell = rtsFalse;
184
185 /* Next thread ID to allocate.
186  * Locks required: thread_id_mutex
187  */
188 static StgThreadID next_thread_id = 1;
189
190 /*
191  * Pointers to the state of the current thread.
192  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
193  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
194  */
195  
196 /* The smallest stack size that makes any sense is:
197  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
198  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
199  *  + 1                       (the closure to enter)
200  *  + 1                       (stg_ap_v_ret)
201  *  + 1                       (spare slot req'd by stg_ap_v_ret)
202  *
203  * A thread with this stack will bomb immediately with a stack
204  * overflow, which will increase its stack size.  
205  */
206
207 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
208
209
210 #if defined(GRAN)
211 StgTSO *CurrentTSO;
212 #endif
213
214 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
215  *  exists - earlier gccs apparently didn't.
216  *  -= chak
217  */
218 StgTSO dummy_tso;
219
220 /*
221  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
222  * in an MT setting, needed to signal that a worker thread shouldn't hang around
223  * in the scheduler when it is out of work.
224  */
225 static rtsBool shutting_down_scheduler = rtsFalse;
226
227 #if defined(RTS_SUPPORTS_THREADS)
228 /* ToDo: carefully document the invariants that go together
229  *       with these synchronisation objects.
230  */
231 Mutex     sched_mutex       = INIT_MUTEX_VAR;
232 Mutex     term_mutex        = INIT_MUTEX_VAR;
233
234 #endif /* RTS_SUPPORTS_THREADS */
235
236 #if defined(PARALLEL_HASKELL)
237 StgTSO *LastTSO;
238 rtsTime TimeOfLastYield;
239 rtsBool emitSchedule = rtsTrue;
240 #endif
241
242 #if DEBUG
243 static char *whatNext_strs[] = {
244   "(unknown)",
245   "ThreadRunGHC",
246   "ThreadInterpret",
247   "ThreadKilled",
248   "ThreadRelocated",
249   "ThreadComplete"
250 };
251 #endif
252
253 /* -----------------------------------------------------------------------------
254  * static function prototypes
255  * -------------------------------------------------------------------------- */
256
257 #if defined(RTS_SUPPORTS_THREADS)
258 static void taskStart(void);
259 #endif
260
261 static void schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
262                       Capability *initialCapability );
263
264 //
265 // These function all encapsulate parts of the scheduler loop, and are
266 // abstracted only to make the structure and control flow of the
267 // scheduler clearer.
268 //
269 static void schedulePreLoop(void);
270 static void scheduleStartSignalHandlers(void);
271 static void scheduleCheckBlockedThreads(void);
272 static void scheduleCheckBlackHoles(void);
273 static void scheduleDetectDeadlock(void);
274 #if defined(GRAN)
275 static StgTSO *scheduleProcessEvent(rtsEvent *event);
276 #endif
277 #if defined(PARALLEL_HASKELL)
278 static StgTSO *scheduleSendPendingMessages(void);
279 static void scheduleActivateSpark(void);
280 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
281 #endif
282 #if defined(PAR) || defined(GRAN)
283 static void scheduleGranParReport(void);
284 #endif
285 static void schedulePostRunThread(void);
286 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
287 static void scheduleHandleStackOverflow( StgTSO *t);
288 static rtsBool scheduleHandleYield( StgTSO *t, nat prev_what_next );
289 static void scheduleHandleThreadBlocked( StgTSO *t );
290 static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread, 
291                                              Capability *cap, StgTSO *t );
292 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
293 static void scheduleDoGC(Capability *cap);
294
295 static void unblockThread(StgTSO *tso);
296 static rtsBool checkBlackHoles(void);
297 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
298                                    Capability *initialCapability
299                                    );
300 static void scheduleThread_ (StgTSO* tso);
301 static void AllRoots(evac_fn evac);
302
303 static StgTSO *threadStackOverflow(StgTSO *tso);
304
305 static void raiseAsync_(StgTSO *tso, StgClosure *exception, 
306                         rtsBool stop_at_atomically);
307
308 static void printThreadBlockage(StgTSO *tso);
309 static void printThreadStatus(StgTSO *tso);
310
311 #if defined(PARALLEL_HASKELL)
312 StgTSO * createSparkThread(rtsSpark spark);
313 StgTSO * activateSpark (rtsSpark spark);  
314 #endif
315
316 /* ----------------------------------------------------------------------------
317  * Starting Tasks
318  * ------------------------------------------------------------------------- */
319
320 #if defined(RTS_SUPPORTS_THREADS)
321 static rtsBool startingWorkerThread = rtsFalse;
322
323 static void
324 taskStart(void)
325 {
326   ACQUIRE_LOCK(&sched_mutex);
327   startingWorkerThread = rtsFalse;
328   schedule(NULL,NULL);
329   taskStop();
330   RELEASE_LOCK(&sched_mutex);
331 }
332
333 void
334 startSchedulerTaskIfNecessary(void)
335 {
336     if ( !EMPTY_RUN_QUEUE()
337          && !shutting_down_scheduler // not if we're shutting down
338          && !startingWorkerThread)
339     {
340         // we don't want to start another worker thread
341         // just because the last one hasn't yet reached the
342         // "waiting for capability" state
343         startingWorkerThread = rtsTrue;
344         if (!maybeStartNewWorker(taskStart)) {
345             startingWorkerThread = rtsFalse;
346         }
347     }
348 }
349 #endif
350
351 /* -----------------------------------------------------------------------------
352  * Putting a thread on the run queue: different scheduling policies
353  * -------------------------------------------------------------------------- */
354
355 STATIC_INLINE void
356 addToRunQueue( StgTSO *t )
357 {
358 #if defined(PARALLEL_HASKELL)
359     if (RtsFlags.ParFlags.doFairScheduling) { 
360         // this does round-robin scheduling; good for concurrency
361         APPEND_TO_RUN_QUEUE(t);
362     } else {
363         // this does unfair scheduling; good for parallelism
364         PUSH_ON_RUN_QUEUE(t);
365     }
366 #else
367     // this does round-robin scheduling; good for concurrency
368     APPEND_TO_RUN_QUEUE(t);
369 #endif
370 }
371     
372 /* ---------------------------------------------------------------------------
373    Main scheduling loop.
374
375    We use round-robin scheduling, each thread returning to the
376    scheduler loop when one of these conditions is detected:
377
378       * out of heap space
379       * timer expires (thread yields)
380       * thread blocks
381       * thread ends
382       * stack overflow
383
384    Locking notes:  we acquire the scheduler lock once at the beginning
385    of the scheduler loop, and release it when
386     
387       * running a thread, or
388       * waiting for work, or
389       * waiting for a GC to complete.
390
391    GRAN version:
392      In a GranSim setup this loop iterates over the global event queue.
393      This revolves around the global event queue, which determines what 
394      to do next. Therefore, it's more complicated than either the 
395      concurrent or the parallel (GUM) setup.
396
397    GUM version:
398      GUM iterates over incoming messages.
399      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
400      and sends out a fish whenever it has nothing to do; in-between
401      doing the actual reductions (shared code below) it processes the
402      incoming messages and deals with delayed operations 
403      (see PendingFetches).
404      This is not the ugliest code you could imagine, but it's bloody close.
405
406    ------------------------------------------------------------------------ */
407
408 static void
409 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
410           Capability *initialCapability )
411 {
412   StgTSO *t;
413   Capability *cap;
414   StgThreadReturnCode ret;
415 #if defined(GRAN)
416   rtsEvent *event;
417 #elif defined(PARALLEL_HASKELL)
418   StgTSO *tso;
419   GlobalTaskId pe;
420   rtsBool receivedFinish = rtsFalse;
421 # if defined(DEBUG)
422   nat tp_size, sp_size; // stats only
423 # endif
424 #endif
425   nat prev_what_next;
426   rtsBool ready_to_gc;
427   
428   // Pre-condition: sched_mutex is held.
429   // We might have a capability, passed in as initialCapability.
430   cap = initialCapability;
431
432 #if !defined(RTS_SUPPORTS_THREADS)
433   // simply initialise it in the non-threaded case
434   grabCapability(&cap);
435 #endif
436
437   IF_DEBUG(scheduler,
438            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
439                        mainThread, initialCapability);
440       );
441
442   schedulePreLoop();
443
444   // -----------------------------------------------------------
445   // Scheduler loop starts here:
446
447 #if defined(PARALLEL_HASKELL)
448 #define TERMINATION_CONDITION        (!receivedFinish)
449 #elif defined(GRAN)
450 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
451 #else
452 #define TERMINATION_CONDITION        rtsTrue
453 #endif
454
455   while (TERMINATION_CONDITION) {
456
457 #if defined(GRAN)
458       /* Choose the processor with the next event */
459       CurrentProc = event->proc;
460       CurrentTSO = event->tso;
461 #endif
462
463       IF_DEBUG(scheduler, printAllThreads());
464
465 #if defined(RTS_SUPPORTS_THREADS)
466       // Yield the capability to higher-priority tasks if necessary.
467       //
468       if (cap != NULL) {
469           yieldCapability(&cap);
470       }
471
472       // If we do not currently hold a capability, we wait for one
473       //
474       if (cap == NULL) {
475           waitForCapability(&sched_mutex, &cap,
476                             mainThread ? &mainThread->bound_thread_cond : NULL);
477       }
478
479       // We now have a capability...
480 #endif
481
482     // Check whether we have re-entered the RTS from Haskell without
483     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
484     // call).
485     if (in_haskell) {
486           errorBelch("schedule: re-entered unsafely.\n"
487                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
488           stg_exit(1);
489     }
490
491     //
492     // Test for interruption.  If interrupted==rtsTrue, then either
493     // we received a keyboard interrupt (^C), or the scheduler is
494     // trying to shut down all the tasks (shutting_down_scheduler) in
495     // the threaded RTS.
496     //
497     if (interrupted) {
498         if (shutting_down_scheduler) {
499             IF_DEBUG(scheduler, sched_belch("shutting down"));
500             releaseCapability(cap);
501             if (mainThread) {
502                 mainThread->stat = Interrupted;
503                 mainThread->ret  = NULL;
504             }
505             return;
506         } else {
507             IF_DEBUG(scheduler, sched_belch("interrupted"));
508             deleteAllThreads();
509         }
510     }
511
512 #if defined(not_yet) && defined(SMP)
513     //
514     // Top up the run queue from our spark pool.  We try to make the
515     // number of threads in the run queue equal to the number of
516     // free capabilities.
517     //
518     {
519         StgClosure *spark;
520         if (EMPTY_RUN_QUEUE()) {
521             spark = findSpark(rtsFalse);
522             if (spark == NULL) {
523                 break; /* no more sparks in the pool */
524             } else {
525                 createSparkThread(spark);         
526                 IF_DEBUG(scheduler,
527                          sched_belch("==^^ turning spark of closure %p into a thread",
528                                      (StgClosure *)spark));
529             }
530         }
531     }
532 #endif // SMP
533
534     scheduleStartSignalHandlers();
535
536     // Only check the black holes here if we've nothing else to do.
537     // During normal execution, the black hole list only gets checked
538     // at GC time, to avoid repeatedly traversing this possibly long
539     // list each time around the scheduler.
540     if (EMPTY_RUN_QUEUE()) { scheduleCheckBlackHoles(); }
541
542     scheduleCheckBlockedThreads();
543
544     scheduleDetectDeadlock();
545
546     // Normally, the only way we can get here with no threads to
547     // run is if a keyboard interrupt received during 
548     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
549     // Additionally, it is not fatal for the
550     // threaded RTS to reach here with no threads to run.
551     //
552     // win32: might be here due to awaitEvent() being abandoned
553     // as a result of a console event having been delivered.
554     if ( EMPTY_RUN_QUEUE() ) {
555 #if !defined(RTS_SUPPORTS_THREADS) && !defined(mingw32_HOST_OS)
556         ASSERT(interrupted);
557 #endif
558         continue; // nothing to do
559     }
560
561 #if defined(PARALLEL_HASKELL)
562     scheduleSendPendingMessages();
563     if (EMPTY_RUN_QUEUE() && scheduleActivateSpark()) 
564         continue;
565
566 #if defined(SPARKS)
567     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
568 #endif
569
570     /* If we still have no work we need to send a FISH to get a spark
571        from another PE */
572     if (EMPTY_RUN_QUEUE()) {
573         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
574         ASSERT(rtsFalse); // should not happen at the moment
575     }
576     // from here: non-empty run queue.
577     //  TODO: merge above case with this, only one call processMessages() !
578     if (PacketsWaiting()) {  /* process incoming messages, if
579                                 any pending...  only in else
580                                 because getRemoteWork waits for
581                                 messages as well */
582         receivedFinish = processMessages();
583     }
584 #endif
585
586 #if defined(GRAN)
587     scheduleProcessEvent(event);
588 #endif
589
590     // 
591     // Get a thread to run
592     //
593     ASSERT(run_queue_hd != END_TSO_QUEUE);
594     POP_RUN_QUEUE(t);
595
596 #if defined(GRAN) || defined(PAR)
597     scheduleGranParReport(); // some kind of debuging output
598 #else
599     // Sanity check the thread we're about to run.  This can be
600     // expensive if there is lots of thread switching going on...
601     IF_DEBUG(sanity,checkTSO(t));
602 #endif
603
604 #if defined(RTS_SUPPORTS_THREADS)
605     // Check whether we can run this thread in the current task.
606     // If not, we have to pass our capability to the right task.
607     {
608       StgMainThread *m = t->main;
609       
610       if(m)
611       {
612         if(m == mainThread)
613         {
614           IF_DEBUG(scheduler,
615             sched_belch("### Running thread %d in bound thread", t->id));
616           // yes, the Haskell thread is bound to the current native thread
617         }
618         else
619         {
620           IF_DEBUG(scheduler,
621             sched_belch("### thread %d bound to another OS thread", t->id));
622           // no, bound to a different Haskell thread: pass to that thread
623           PUSH_ON_RUN_QUEUE(t);
624           passCapability(&m->bound_thread_cond);
625           continue;
626         }
627       }
628       else
629       {
630         if(mainThread != NULL)
631         // The thread we want to run is bound.
632         {
633           IF_DEBUG(scheduler,
634             sched_belch("### this OS thread cannot run thread %d", t->id));
635           // no, the current native thread is bound to a different
636           // Haskell thread, so pass it to any worker thread
637           PUSH_ON_RUN_QUEUE(t);
638           passCapabilityToWorker();
639           continue; 
640         }
641       }
642     }
643 #endif
644
645     cap->r.rCurrentTSO = t;
646     
647     /* context switches are now initiated by the timer signal, unless
648      * the user specified "context switch as often as possible", with
649      * +RTS -C0
650      */
651     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
652          && (run_queue_hd != END_TSO_QUEUE
653              || blocked_queue_hd != END_TSO_QUEUE
654              || sleeping_queue != END_TSO_QUEUE)))
655         context_switch = 1;
656
657 run_thread:
658
659     RELEASE_LOCK(&sched_mutex);
660
661     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
662                               (long)t->id, whatNext_strs[t->what_next]));
663
664 #if defined(PROFILING)
665     startHeapProfTimer();
666 #endif
667
668     // ----------------------------------------------------------------------
669     // Run the current thread 
670
671     prev_what_next = t->what_next;
672
673     errno = t->saved_errno;
674     in_haskell = rtsTrue;
675
676     switch (prev_what_next) {
677
678     case ThreadKilled:
679     case ThreadComplete:
680         /* Thread already finished, return to scheduler. */
681         ret = ThreadFinished;
682         break;
683
684     case ThreadRunGHC:
685         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
686         break;
687
688     case ThreadInterpret:
689         ret = interpretBCO(cap);
690         break;
691
692     default:
693       barf("schedule: invalid what_next field");
694     }
695
696     // We have run some Haskell code: there might be blackhole-blocked
697     // threads to wake up now.
698     if ( blackhole_queue != END_TSO_QUEUE ) {
699         blackholes_need_checking = rtsTrue;
700     }
701
702     in_haskell = rtsFalse;
703
704     // The TSO might have moved, eg. if it re-entered the RTS and a GC
705     // happened.  So find the new location:
706     t = cap->r.rCurrentTSO;
707
708     // And save the current errno in this thread.
709     t->saved_errno = errno;
710
711     // ----------------------------------------------------------------------
712     
713     /* Costs for the scheduler are assigned to CCS_SYSTEM */
714 #if defined(PROFILING)
715     stopHeapProfTimer();
716     CCCS = CCS_SYSTEM;
717 #endif
718     
719     ACQUIRE_LOCK(&sched_mutex);
720     
721 #if defined(RTS_SUPPORTS_THREADS)
722     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
723 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
724     IF_DEBUG(scheduler,debugBelch("sched: "););
725 #endif
726     
727     schedulePostRunThread();
728
729     ready_to_gc = rtsFalse;
730
731     switch (ret) {
732     case HeapOverflow:
733         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
734         break;
735
736     case StackOverflow:
737         scheduleHandleStackOverflow(t);
738         break;
739
740     case ThreadYielding:
741         if (scheduleHandleYield(t, prev_what_next)) {
742             // shortcut for switching between compiler/interpreter:
743             goto run_thread; 
744         }
745         break;
746
747     case ThreadBlocked:
748         scheduleHandleThreadBlocked(t);
749         threadPaused(t);
750         break;
751
752     case ThreadFinished:
753         if (scheduleHandleThreadFinished(mainThread, cap, t)) return;;
754         break;
755
756     default:
757       barf("schedule: invalid thread return code %d", (int)ret);
758     }
759
760     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
761     if (ready_to_gc) { scheduleDoGC(cap); }
762   } /* end of while() */
763
764   IF_PAR_DEBUG(verbose,
765                debugBelch("== Leaving schedule() after having received Finish\n"));
766 }
767
768 /* ----------------------------------------------------------------------------
769  * Setting up the scheduler loop
770  * ASSUMES: sched_mutex
771  * ------------------------------------------------------------------------- */
772
773 static void
774 schedulePreLoop(void)
775 {
776 #if defined(GRAN) 
777     /* set up first event to get things going */
778     /* ToDo: assign costs for system setup and init MainTSO ! */
779     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
780               ContinueThread, 
781               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
782     
783     IF_DEBUG(gran,
784              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
785                         CurrentTSO);
786              G_TSO(CurrentTSO, 5));
787     
788     if (RtsFlags.GranFlags.Light) {
789         /* Save current time; GranSim Light only */
790         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
791     }      
792 #endif
793 }
794
795 /* ----------------------------------------------------------------------------
796  * Start any pending signal handlers
797  * ASSUMES: sched_mutex
798  * ------------------------------------------------------------------------- */
799
800 static void
801 scheduleStartSignalHandlers(void)
802 {
803 #if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
804     if (signals_pending()) {
805       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
806       startSignalHandlers();
807       ACQUIRE_LOCK(&sched_mutex);
808     }
809 #endif
810 }
811
812 /* ----------------------------------------------------------------------------
813  * Check for blocked threads that can be woken up.
814  * ASSUMES: sched_mutex
815  * ------------------------------------------------------------------------- */
816
817 static void
818 scheduleCheckBlockedThreads(void)
819 {
820     //
821     // Check whether any waiting threads need to be woken up.  If the
822     // run queue is empty, and there are no other tasks running, we
823     // can wait indefinitely for something to happen.
824     //
825     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
826     {
827 #if defined(RTS_SUPPORTS_THREADS)
828         // We shouldn't be here...
829         barf("schedule: awaitEvent() in threaded RTS");
830 #endif
831         awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
832     }
833 }
834
835
836 /* ----------------------------------------------------------------------------
837  * Check for threads blocked on BLACKHOLEs that can be woken up
838  * ASSUMES: sched_mutex
839  * ------------------------------------------------------------------------- */
840 static void
841 scheduleCheckBlackHoles( void )
842 {
843     if ( blackholes_need_checking )
844     {
845         checkBlackHoles();
846         blackholes_need_checking = rtsFalse;
847     }
848 }
849
850 /* ----------------------------------------------------------------------------
851  * Detect deadlock conditions and attempt to resolve them.
852  * ASSUMES: sched_mutex
853  * ------------------------------------------------------------------------- */
854
855 static void
856 scheduleDetectDeadlock(void)
857 {
858     /* 
859      * Detect deadlock: when we have no threads to run, there are no
860      * threads blocked, waiting for I/O, or sleeping, and all the
861      * other tasks are waiting for work, we must have a deadlock of
862      * some description.
863      */
864     if ( EMPTY_THREAD_QUEUES() )
865     {
866 #if !defined(PARALLEL_HASKELL) && !defined(RTS_SUPPORTS_THREADS)
867         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
868
869         // Garbage collection can release some new threads due to
870         // either (a) finalizers or (b) threads resurrected because
871         // they are unreachable and will therefore be sent an
872         // exception.  Any threads thus released will be immediately
873         // runnable.
874         GarbageCollect(GetRoots,rtsTrue);
875         if ( !EMPTY_RUN_QUEUE() ) return;
876
877 #if defined(RTS_USER_SIGNALS)
878         /* If we have user-installed signal handlers, then wait
879          * for signals to arrive rather then bombing out with a
880          * deadlock.
881          */
882         if ( anyUserHandlers() ) {
883             IF_DEBUG(scheduler, 
884                      sched_belch("still deadlocked, waiting for signals..."));
885
886             awaitUserSignals();
887
888             if (signals_pending()) {
889                 RELEASE_LOCK(&sched_mutex);
890                 startSignalHandlers();
891                 ACQUIRE_LOCK(&sched_mutex);
892             }
893
894             // either we have threads to run, or we were interrupted:
895             ASSERT(!EMPTY_RUN_QUEUE() || interrupted);
896         }
897 #endif
898
899         /* Probably a real deadlock.  Send the current main thread the
900          * Deadlock exception (or in the SMP build, send *all* main
901          * threads the deadlock exception, since none of them can make
902          * progress).
903          */
904         {
905             StgMainThread *m;
906             m = main_threads;
907             switch (m->tso->why_blocked) {
908             case BlockedOnBlackHole:
909             case BlockedOnException:
910             case BlockedOnMVar:
911                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
912                 return;
913             default:
914                 barf("deadlock: main thread blocked in a strange way");
915             }
916         }
917
918 #elif defined(RTS_SUPPORTS_THREADS)
919     // ToDo: add deadlock detection in threaded RTS
920 #elif defined(PARALLEL_HASKELL)
921     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
922 #endif
923     }
924 }
925
926 /* ----------------------------------------------------------------------------
927  * Process an event (GRAN only)
928  * ------------------------------------------------------------------------- */
929
930 #if defined(GRAN)
931 static StgTSO *
932 scheduleProcessEvent(rtsEvent *event)
933 {
934     StgTSO *t;
935
936     if (RtsFlags.GranFlags.Light)
937       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
938
939     /* adjust time based on time-stamp */
940     if (event->time > CurrentTime[CurrentProc] &&
941         event->evttype != ContinueThread)
942       CurrentTime[CurrentProc] = event->time;
943     
944     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
945     if (!RtsFlags.GranFlags.Light)
946       handleIdlePEs();
947
948     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
949
950     /* main event dispatcher in GranSim */
951     switch (event->evttype) {
952       /* Should just be continuing execution */
953     case ContinueThread:
954       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
955       /* ToDo: check assertion
956       ASSERT(run_queue_hd != (StgTSO*)NULL &&
957              run_queue_hd != END_TSO_QUEUE);
958       */
959       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
960       if (!RtsFlags.GranFlags.DoAsyncFetch &&
961           procStatus[CurrentProc]==Fetching) {
962         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
963               CurrentTSO->id, CurrentTSO, CurrentProc);
964         goto next_thread;
965       } 
966       /* Ignore ContinueThreads for completed threads */
967       if (CurrentTSO->what_next == ThreadComplete) {
968         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
969               CurrentTSO->id, CurrentTSO, CurrentProc);
970         goto next_thread;
971       } 
972       /* Ignore ContinueThreads for threads that are being migrated */
973       if (PROCS(CurrentTSO)==Nowhere) { 
974         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
975               CurrentTSO->id, CurrentTSO, CurrentProc);
976         goto next_thread;
977       }
978       /* The thread should be at the beginning of the run queue */
979       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
980         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
981               CurrentTSO->id, CurrentTSO, CurrentProc);
982         break; // run the thread anyway
983       }
984       /*
985       new_event(proc, proc, CurrentTime[proc],
986                 FindWork,
987                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
988       goto next_thread; 
989       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
990       break; // now actually run the thread; DaH Qu'vam yImuHbej 
991
992     case FetchNode:
993       do_the_fetchnode(event);
994       goto next_thread;             /* handle next event in event queue  */
995       
996     case GlobalBlock:
997       do_the_globalblock(event);
998       goto next_thread;             /* handle next event in event queue  */
999       
1000     case FetchReply:
1001       do_the_fetchreply(event);
1002       goto next_thread;             /* handle next event in event queue  */
1003       
1004     case UnblockThread:   /* Move from the blocked queue to the tail of */
1005       do_the_unblock(event);
1006       goto next_thread;             /* handle next event in event queue  */
1007       
1008     case ResumeThread:  /* Move from the blocked queue to the tail of */
1009       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1010       event->tso->gran.blocktime += 
1011         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1012       do_the_startthread(event);
1013       goto next_thread;             /* handle next event in event queue  */
1014       
1015     case StartThread:
1016       do_the_startthread(event);
1017       goto next_thread;             /* handle next event in event queue  */
1018       
1019     case MoveThread:
1020       do_the_movethread(event);
1021       goto next_thread;             /* handle next event in event queue  */
1022       
1023     case MoveSpark:
1024       do_the_movespark(event);
1025       goto next_thread;             /* handle next event in event queue  */
1026       
1027     case FindWork:
1028       do_the_findwork(event);
1029       goto next_thread;             /* handle next event in event queue  */
1030       
1031     default:
1032       barf("Illegal event type %u\n", event->evttype);
1033     }  /* switch */
1034     
1035     /* This point was scheduler_loop in the old RTS */
1036
1037     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1038
1039     TimeOfLastEvent = CurrentTime[CurrentProc];
1040     TimeOfNextEvent = get_time_of_next_event();
1041     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1042     // CurrentTSO = ThreadQueueHd;
1043
1044     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1045                          TimeOfNextEvent));
1046
1047     if (RtsFlags.GranFlags.Light) 
1048       GranSimLight_leave_system(event, &ActiveTSO); 
1049
1050     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1051
1052     IF_DEBUG(gran, 
1053              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1054
1055     /* in a GranSim setup the TSO stays on the run queue */
1056     t = CurrentTSO;
1057     /* Take a thread from the run queue. */
1058     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1059
1060     IF_DEBUG(gran, 
1061              debugBelch("GRAN: About to run current thread, which is\n");
1062              G_TSO(t,5));
1063
1064     context_switch = 0; // turned on via GranYield, checking events and time slice
1065
1066     IF_DEBUG(gran, 
1067              DumpGranEvent(GR_SCHEDULE, t));
1068
1069     procStatus[CurrentProc] = Busy;
1070 }
1071 #endif // GRAN
1072
1073 /* ----------------------------------------------------------------------------
1074  * Send pending messages (PARALLEL_HASKELL only)
1075  * ------------------------------------------------------------------------- */
1076
1077 #if defined(PARALLEL_HASKELL)
1078 static StgTSO *
1079 scheduleSendPendingMessages(void)
1080 {
1081     StgSparkPool *pool;
1082     rtsSpark spark;
1083     StgTSO *t;
1084
1085 # if defined(PAR) // global Mem.Mgmt., omit for now
1086     if (PendingFetches != END_BF_QUEUE) {
1087         processFetches();
1088     }
1089 # endif
1090     
1091     if (RtsFlags.ParFlags.BufferTime) {
1092         // if we use message buffering, we must send away all message
1093         // packets which have become too old...
1094         sendOldBuffers(); 
1095     }
1096 }
1097 #endif
1098
1099 /* ----------------------------------------------------------------------------
1100  * Activate spark threads (PARALLEL_HASKELL only)
1101  * ------------------------------------------------------------------------- */
1102
1103 #if defined(PARALLEL_HASKELL)
1104 static void
1105 scheduleActivateSpark(void)
1106 {
1107 #if defined(SPARKS)
1108   ASSERT(EMPTY_RUN_QUEUE());
1109 /* We get here if the run queue is empty and want some work.
1110    We try to turn a spark into a thread, and add it to the run queue,
1111    from where it will be picked up in the next iteration of the scheduler
1112    loop.
1113 */
1114
1115       /* :-[  no local threads => look out for local sparks */
1116       /* the spark pool for the current PE */
1117       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1118       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1119           pool->hd < pool->tl) {
1120         /* 
1121          * ToDo: add GC code check that we really have enough heap afterwards!!
1122          * Old comment:
1123          * If we're here (no runnable threads) and we have pending
1124          * sparks, we must have a space problem.  Get enough space
1125          * to turn one of those pending sparks into a
1126          * thread... 
1127          */
1128
1129         spark = findSpark(rtsFalse);            /* get a spark */
1130         if (spark != (rtsSpark) NULL) {
1131           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1132           IF_PAR_DEBUG(fish, // schedule,
1133                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1134                              tso->id, tso, advisory_thread_count));
1135
1136           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1137             IF_PAR_DEBUG(fish, // schedule,
1138                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1139                             spark));
1140             return rtsFalse; /* failed to generate a thread */
1141           }                  /* otherwise fall through & pick-up new tso */
1142         } else {
1143           IF_PAR_DEBUG(fish, // schedule,
1144                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1145                              spark_queue_len(pool)));
1146           return rtsFalse;  /* failed to generate a thread */
1147         }
1148         return rtsTrue;  /* success in generating a thread */
1149   } else { /* no more threads permitted or pool empty */
1150     return rtsFalse;  /* failed to generateThread */
1151   }
1152 #else
1153   tso = NULL; // avoid compiler warning only
1154   return rtsFalse;  /* dummy in non-PAR setup */
1155 #endif // SPARKS
1156 }
1157 #endif // PARALLEL_HASKELL
1158
1159 /* ----------------------------------------------------------------------------
1160  * Get work from a remote node (PARALLEL_HASKELL only)
1161  * ------------------------------------------------------------------------- */
1162     
1163 #if defined(PARALLEL_HASKELL)
1164 static rtsBool
1165 scheduleGetRemoteWork(rtsBool *receivedFinish)
1166 {
1167   ASSERT(EMPTY_RUN_QUEUE());
1168
1169   if (RtsFlags.ParFlags.BufferTime) {
1170         IF_PAR_DEBUG(verbose, 
1171                 debugBelch("...send all pending data,"));
1172         {
1173           nat i;
1174           for (i=1; i<=nPEs; i++)
1175             sendImmediately(i); // send all messages away immediately
1176         }
1177   }
1178 # ifndef SPARKS
1179         //++EDEN++ idle() , i.e. send all buffers, wait for work
1180         // suppress fishing in EDEN... just look for incoming messages
1181         // (blocking receive)
1182   IF_PAR_DEBUG(verbose, 
1183                debugBelch("...wait for incoming messages...\n"));
1184   *receivedFinish = processMessages(); // blocking receive...
1185
1186         // and reenter scheduling loop after having received something
1187         // (return rtsFalse below)
1188
1189 # else /* activate SPARKS machinery */
1190 /* We get here, if we have no work, tried to activate a local spark, but still
1191    have no work. We try to get a remote spark, by sending a FISH message.
1192    Thread migration should be added here, and triggered when a sequence of 
1193    fishes returns without work. */
1194         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1195
1196       /* =8-[  no local sparks => look for work on other PEs */
1197         /*
1198          * We really have absolutely no work.  Send out a fish
1199          * (there may be some out there already), and wait for
1200          * something to arrive.  We clearly can't run any threads
1201          * until a SCHEDULE or RESUME arrives, and so that's what
1202          * we're hoping to see.  (Of course, we still have to
1203          * respond to other types of messages.)
1204          */
1205         rtsTime now = msTime() /*CURRENT_TIME*/;
1206         IF_PAR_DEBUG(verbose, 
1207                      debugBelch("--  now=%ld\n", now));
1208         IF_PAR_DEBUG(fish, // verbose,
1209              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1210                  (last_fish_arrived_at!=0 &&
1211                   last_fish_arrived_at+delay > now)) {
1212                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1213                      now, last_fish_arrived_at+delay, 
1214                      last_fish_arrived_at,
1215                      delay);
1216              });
1217   
1218         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1219             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1220           if (last_fish_arrived_at==0 ||
1221               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1222             /* outstandingFishes is set in sendFish, processFish;
1223                avoid flooding system with fishes via delay */
1224     next_fish_to_send_at = 0;  
1225   } else {
1226     /* ToDo: this should be done in the main scheduling loop to avoid the
1227              busy wait here; not so bad if fish delay is very small  */
1228     int iq = 0; // DEBUGGING -- HWL
1229     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1230     /* send a fish when ready, but process messages that arrive in the meantime */
1231     do {
1232       if (PacketsWaiting()) {
1233         iq++; // DEBUGGING
1234         *receivedFinish = processMessages();
1235       }
1236       now = msTime();
1237     } while (!*receivedFinish || now<next_fish_to_send_at);
1238     // JB: This means the fish could become obsolete, if we receive
1239     // work. Better check for work again? 
1240     // last line: while (!receivedFinish || !haveWork || now<...)
1241     // next line: if (receivedFinish || haveWork )
1242
1243     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1244       return rtsFalse;  // NB: this will leave scheduler loop
1245                         // immediately after return!
1246                           
1247     IF_PAR_DEBUG(fish, // verbose,
1248                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1249
1250   }
1251
1252     // JB: IMHO, this should all be hidden inside sendFish(...)
1253     /* pe = choosePE(); 
1254        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1255                 NEW_FISH_HUNGER);
1256
1257     // Global statistics: count no. of fishes
1258     if (RtsFlags.ParFlags.ParStats.Global &&
1259          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1260            globalParStats.tot_fish_mess++;
1261            }
1262     */ 
1263
1264   /* delayed fishes must have been sent by now! */
1265   next_fish_to_send_at = 0;  
1266   }
1267       
1268   *receivedFinish = processMessages();
1269 # endif /* SPARKS */
1270
1271  return rtsFalse;
1272  /* NB: this function always returns rtsFalse, meaning the scheduler
1273     loop continues with the next iteration; 
1274     rationale: 
1275       return code means success in finding work; we enter this function
1276       if there is no local work, thus have to send a fish which takes
1277       time until it arrives with work; in the meantime we should process
1278       messages in the main loop;
1279  */
1280 }
1281 #endif // PARALLEL_HASKELL
1282
1283 /* ----------------------------------------------------------------------------
1284  * PAR/GRAN: Report stats & debugging info(?)
1285  * ------------------------------------------------------------------------- */
1286
1287 #if defined(PAR) || defined(GRAN)
1288 static void
1289 scheduleGranParReport(void)
1290 {
1291   ASSERT(run_queue_hd != END_TSO_QUEUE);
1292
1293   /* Take a thread from the run queue, if we have work */
1294   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1295
1296     /* If this TSO has got its outport closed in the meantime, 
1297      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1298      * It has to be marked as TH_DEAD for this purpose.
1299      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1300
1301 JB: TODO: investigate wether state change field could be nuked
1302      entirely and replaced by the normal tso state (whatnext
1303      field). All we want to do is to kill tsos from outside.
1304      */
1305
1306     /* ToDo: write something to the log-file
1307     if (RTSflags.ParFlags.granSimStats && !sameThread)
1308         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1309
1310     CurrentTSO = t;
1311     */
1312     /* the spark pool for the current PE */
1313     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1314
1315     IF_DEBUG(scheduler, 
1316              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1317                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1318
1319     IF_PAR_DEBUG(fish,
1320              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1321                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1322
1323     if (RtsFlags.ParFlags.ParStats.Full && 
1324         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1325         (emitSchedule || // forced emit
1326          (t && LastTSO && t->id != LastTSO->id))) {
1327       /* 
1328          we are running a different TSO, so write a schedule event to log file
1329          NB: If we use fair scheduling we also have to write  a deschedule 
1330              event for LastTSO; with unfair scheduling we know that the
1331              previous tso has blocked whenever we switch to another tso, so
1332              we don't need it in GUM for now
1333       */
1334       IF_PAR_DEBUG(fish, // schedule,
1335                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1336
1337       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1338                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1339       emitSchedule = rtsFalse;
1340     }
1341 }     
1342 #endif
1343
1344 /* ----------------------------------------------------------------------------
1345  * After running a thread...
1346  * ASSUMES: sched_mutex
1347  * ------------------------------------------------------------------------- */
1348
1349 static void
1350 schedulePostRunThread(void)
1351 {
1352 #if defined(PAR)
1353     /* HACK 675: if the last thread didn't yield, make sure to print a 
1354        SCHEDULE event to the log file when StgRunning the next thread, even
1355        if it is the same one as before */
1356     LastTSO = t; 
1357     TimeOfLastYield = CURRENT_TIME;
1358 #endif
1359
1360   /* some statistics gathering in the parallel case */
1361
1362 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1363   switch (ret) {
1364     case HeapOverflow:
1365 # if defined(GRAN)
1366       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1367       globalGranStats.tot_heapover++;
1368 # elif defined(PAR)
1369       globalParStats.tot_heapover++;
1370 # endif
1371       break;
1372
1373      case StackOverflow:
1374 # if defined(GRAN)
1375       IF_DEBUG(gran, 
1376                DumpGranEvent(GR_DESCHEDULE, t));
1377       globalGranStats.tot_stackover++;
1378 # elif defined(PAR)
1379       // IF_DEBUG(par, 
1380       // DumpGranEvent(GR_DESCHEDULE, t);
1381       globalParStats.tot_stackover++;
1382 # endif
1383       break;
1384
1385     case ThreadYielding:
1386 # if defined(GRAN)
1387       IF_DEBUG(gran, 
1388                DumpGranEvent(GR_DESCHEDULE, t));
1389       globalGranStats.tot_yields++;
1390 # elif defined(PAR)
1391       // IF_DEBUG(par, 
1392       // DumpGranEvent(GR_DESCHEDULE, t);
1393       globalParStats.tot_yields++;
1394 # endif
1395       break; 
1396
1397     case ThreadBlocked:
1398 # if defined(GRAN)
1399       IF_DEBUG(scheduler,
1400                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1401                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1402                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1403                if (t->block_info.closure!=(StgClosure*)NULL)
1404                  print_bq(t->block_info.closure);
1405                debugBelch("\n"));
1406
1407       // ??? needed; should emit block before
1408       IF_DEBUG(gran, 
1409                DumpGranEvent(GR_DESCHEDULE, t)); 
1410       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1411       /*
1412         ngoq Dogh!
1413       ASSERT(procStatus[CurrentProc]==Busy || 
1414               ((procStatus[CurrentProc]==Fetching) && 
1415               (t->block_info.closure!=(StgClosure*)NULL)));
1416       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1417           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1418             procStatus[CurrentProc]==Fetching)) 
1419         procStatus[CurrentProc] = Idle;
1420       */
1421 # elif defined(PAR)
1422 //++PAR++  blockThread() writes the event (change?)
1423 # endif
1424     break;
1425
1426   case ThreadFinished:
1427     break;
1428
1429   default:
1430     barf("parGlobalStats: unknown return code");
1431     break;
1432     }
1433 #endif
1434 }
1435
1436 /* -----------------------------------------------------------------------------
1437  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1438  * ASSUMES: sched_mutex
1439  * -------------------------------------------------------------------------- */
1440
1441 static rtsBool
1442 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1443 {
1444     // did the task ask for a large block?
1445     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1446         // if so, get one and push it on the front of the nursery.
1447         bdescr *bd;
1448         lnat blocks;
1449         
1450         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1451         
1452         IF_DEBUG(scheduler,
1453                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1454                             (long)t->id, whatNext_strs[t->what_next], blocks));
1455         
1456         // don't do this if it would push us over the
1457         // alloc_blocks_lim limit; we'll GC first.
1458         if (alloc_blocks + blocks < alloc_blocks_lim) {
1459             
1460             alloc_blocks += blocks;
1461             bd = allocGroup( blocks );
1462             
1463             // link the new group into the list
1464             bd->link = cap->r.rCurrentNursery;
1465             bd->u.back = cap->r.rCurrentNursery->u.back;
1466             if (cap->r.rCurrentNursery->u.back != NULL) {
1467                 cap->r.rCurrentNursery->u.back->link = bd;
1468             } else {
1469 #if !defined(SMP)
1470                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1471                        g0s0->blocks == cap->r.rNursery);
1472                 g0s0->blocks = bd;
1473 #endif
1474                 cap->r.rNursery = bd;
1475             }             
1476             cap->r.rCurrentNursery->u.back = bd;
1477             
1478             // initialise it as a nursery block.  We initialise the
1479             // step, gen_no, and flags field of *every* sub-block in
1480             // this large block, because this is easier than making
1481             // sure that we always find the block head of a large
1482             // block whenever we call Bdescr() (eg. evacuate() and
1483             // isAlive() in the GC would both have to do this, at
1484             // least).
1485             { 
1486                 bdescr *x;
1487                 for (x = bd; x < bd + blocks; x++) {
1488                     x->step = g0s0;
1489                     x->gen_no = 0;
1490                     x->flags = 0;
1491                 }
1492             }
1493             
1494 #if !defined(SMP)
1495             // don't forget to update the block count in g0s0.
1496             g0s0->n_blocks += blocks;
1497
1498             // This assert can be a killer if the app is doing lots
1499             // of large block allocations.
1500             ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1501 #endif
1502             
1503             // now update the nursery to point to the new block
1504             cap->r.rCurrentNursery = bd;
1505             
1506             // we might be unlucky and have another thread get on the
1507             // run queue before us and steal the large block, but in that
1508             // case the thread will just end up requesting another large
1509             // block.
1510             PUSH_ON_RUN_QUEUE(t);
1511             return rtsFalse;  /* not actually GC'ing */
1512         }
1513     }
1514     
1515     /* make all the running tasks block on a condition variable,
1516      * maybe set context_switch and wait till they all pile in,
1517      * then have them wait on a GC condition variable.
1518      */
1519     IF_DEBUG(scheduler,
1520              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1521                         (long)t->id, whatNext_strs[t->what_next]));
1522     threadPaused(t);
1523 #if defined(GRAN)
1524     ASSERT(!is_on_queue(t,CurrentProc));
1525 #elif defined(PARALLEL_HASKELL)
1526     /* Currently we emit a DESCHEDULE event before GC in GUM.
1527        ToDo: either add separate event to distinguish SYSTEM time from rest
1528        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1529     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1530         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1531                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1532         emitSchedule = rtsTrue;
1533     }
1534 #endif
1535       
1536     PUSH_ON_RUN_QUEUE(t);
1537     return rtsTrue;
1538     /* actual GC is done at the end of the while loop in schedule() */
1539 }
1540
1541 /* -----------------------------------------------------------------------------
1542  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1543  * ASSUMES: sched_mutex
1544  * -------------------------------------------------------------------------- */
1545
1546 static void
1547 scheduleHandleStackOverflow( StgTSO *t)
1548 {
1549     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1550                                   (long)t->id, whatNext_strs[t->what_next]));
1551     /* just adjust the stack for this thread, then pop it back
1552      * on the run queue.
1553      */
1554     threadPaused(t);
1555     { 
1556         /* enlarge the stack */
1557         StgTSO *new_t = threadStackOverflow(t);
1558         
1559         /* This TSO has moved, so update any pointers to it from the
1560          * main thread stack.  It better not be on any other queues...
1561          * (it shouldn't be).
1562          */
1563         if (t->main != NULL) {
1564             t->main->tso = new_t;
1565         }
1566         PUSH_ON_RUN_QUEUE(new_t);
1567     }
1568 }
1569
1570 /* -----------------------------------------------------------------------------
1571  * Handle a thread that returned to the scheduler with ThreadYielding
1572  * ASSUMES: sched_mutex
1573  * -------------------------------------------------------------------------- */
1574
1575 static rtsBool
1576 scheduleHandleYield( StgTSO *t, nat prev_what_next )
1577 {
1578     // Reset the context switch flag.  We don't do this just before
1579     // running the thread, because that would mean we would lose ticks
1580     // during GC, which can lead to unfair scheduling (a thread hogs
1581     // the CPU because the tick always arrives during GC).  This way
1582     // penalises threads that do a lot of allocation, but that seems
1583     // better than the alternative.
1584     context_switch = 0;
1585     
1586     /* put the thread back on the run queue.  Then, if we're ready to
1587      * GC, check whether this is the last task to stop.  If so, wake
1588      * up the GC thread.  getThread will block during a GC until the
1589      * GC is finished.
1590      */
1591     IF_DEBUG(scheduler,
1592              if (t->what_next != prev_what_next) {
1593                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1594                             (long)t->id, whatNext_strs[t->what_next]);
1595              } else {
1596                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1597                             (long)t->id, whatNext_strs[t->what_next]);
1598              }
1599         );
1600     
1601     IF_DEBUG(sanity,
1602              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1603              checkTSO(t));
1604     ASSERT(t->link == END_TSO_QUEUE);
1605     
1606     // Shortcut if we're just switching evaluators: don't bother
1607     // doing stack squeezing (which can be expensive), just run the
1608     // thread.
1609     if (t->what_next != prev_what_next) {
1610         return rtsTrue;
1611     }
1612     
1613     threadPaused(t);
1614     
1615 #if defined(GRAN)
1616     ASSERT(!is_on_queue(t,CurrentProc));
1617       
1618     IF_DEBUG(sanity,
1619              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1620              checkThreadQsSanity(rtsTrue));
1621
1622 #endif
1623
1624     addToRunQueue(t);
1625
1626 #if defined(GRAN)
1627     /* add a ContinueThread event to actually process the thread */
1628     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1629               ContinueThread,
1630               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1631     IF_GRAN_DEBUG(bq, 
1632                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1633                   G_EVENTQ(0);
1634                   G_CURR_THREADQ(0));
1635 #endif
1636     return rtsFalse;
1637 }
1638
1639 /* -----------------------------------------------------------------------------
1640  * Handle a thread that returned to the scheduler with ThreadBlocked
1641  * ASSUMES: sched_mutex
1642  * -------------------------------------------------------------------------- */
1643
1644 static void
1645 scheduleHandleThreadBlocked( StgTSO *t
1646 #if !defined(GRAN) && !defined(DEBUG)
1647     STG_UNUSED
1648 #endif
1649     )
1650 {
1651 #if defined(GRAN)
1652     IF_DEBUG(scheduler,
1653              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1654                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1655              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1656     
1657     // ??? needed; should emit block before
1658     IF_DEBUG(gran, 
1659              DumpGranEvent(GR_DESCHEDULE, t)); 
1660     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1661     /*
1662       ngoq Dogh!
1663       ASSERT(procStatus[CurrentProc]==Busy || 
1664       ((procStatus[CurrentProc]==Fetching) && 
1665       (t->block_info.closure!=(StgClosure*)NULL)));
1666       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1667       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1668       procStatus[CurrentProc]==Fetching)) 
1669       procStatus[CurrentProc] = Idle;
1670     */
1671 #elif defined(PAR)
1672     IF_DEBUG(scheduler,
1673              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1674                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1675     IF_PAR_DEBUG(bq,
1676                  
1677                  if (t->block_info.closure!=(StgClosure*)NULL) 
1678                  print_bq(t->block_info.closure));
1679     
1680     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1681     blockThread(t);
1682     
1683     /* whatever we schedule next, we must log that schedule */
1684     emitSchedule = rtsTrue;
1685     
1686 #else /* !GRAN */
1687       /* don't need to do anything.  Either the thread is blocked on
1688        * I/O, in which case we'll have called addToBlockedQueue
1689        * previously, or it's blocked on an MVar or Blackhole, in which
1690        * case it'll be on the relevant queue already.
1691        */
1692     ASSERT(t->why_blocked != NotBlocked);
1693     IF_DEBUG(scheduler,
1694              debugBelch("--<< thread %d (%s) stopped: ", 
1695                         t->id, whatNext_strs[t->what_next]);
1696              printThreadBlockage(t);
1697              debugBelch("\n"));
1698     
1699     /* Only for dumping event to log file 
1700        ToDo: do I need this in GranSim, too?
1701        blockThread(t);
1702     */
1703 #endif
1704 }
1705
1706 /* -----------------------------------------------------------------------------
1707  * Handle a thread that returned to the scheduler with ThreadFinished
1708  * ASSUMES: sched_mutex
1709  * -------------------------------------------------------------------------- */
1710
1711 static rtsBool
1712 scheduleHandleThreadFinished( StgMainThread *mainThread
1713                               USED_WHEN_RTS_SUPPORTS_THREADS,
1714                               Capability *cap,
1715                               StgTSO *t )
1716 {
1717     /* Need to check whether this was a main thread, and if so,
1718      * return with the return value.
1719      *
1720      * We also end up here if the thread kills itself with an
1721      * uncaught exception, see Exception.cmm.
1722      */
1723     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1724                                   t->id, whatNext_strs[t->what_next]));
1725
1726 #if defined(GRAN)
1727       endThread(t, CurrentProc); // clean-up the thread
1728 #elif defined(PARALLEL_HASKELL)
1729       /* For now all are advisory -- HWL */
1730       //if(t->priority==AdvisoryPriority) ??
1731       advisory_thread_count--; // JB: Caution with this counter, buggy!
1732       
1733 # if defined(DIST)
1734       if(t->dist.priority==RevalPriority)
1735         FinishReval(t);
1736 # endif
1737     
1738 # if defined(EDENOLD)
1739       // the thread could still have an outport... (BUG)
1740       if (t->eden.outport != -1) {
1741       // delete the outport for the tso which has finished...
1742         IF_PAR_DEBUG(eden_ports,
1743                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1744                               t->eden.outport, t->id));
1745         deleteOPT(t);
1746       }
1747       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1748       if (t->eden.epid != -1) {
1749         IF_PAR_DEBUG(eden_ports,
1750                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1751                            t->id, t->eden.epid));
1752         removeTSOfromProcess(t);
1753       }
1754 # endif 
1755
1756 # if defined(PAR)
1757       if (RtsFlags.ParFlags.ParStats.Full &&
1758           !RtsFlags.ParFlags.ParStats.Suppressed) 
1759         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1760
1761       //  t->par only contains statistics: left out for now...
1762       IF_PAR_DEBUG(fish,
1763                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1764                               t->id,t,t->par.sparkname));
1765 # endif
1766 #endif // PARALLEL_HASKELL
1767
1768       //
1769       // Check whether the thread that just completed was a main
1770       // thread, and if so return with the result.  
1771       //
1772       // There is an assumption here that all thread completion goes
1773       // through this point; we need to make sure that if a thread
1774       // ends up in the ThreadKilled state, that it stays on the run
1775       // queue so it can be dealt with here.
1776       //
1777       if (
1778 #if defined(RTS_SUPPORTS_THREADS)
1779           mainThread != NULL
1780 #else
1781           mainThread->tso == t
1782 #endif
1783           )
1784       {
1785           // We are a bound thread: this must be our thread that just
1786           // completed.
1787           ASSERT(mainThread->tso == t);
1788
1789           if (t->what_next == ThreadComplete) {
1790               if (mainThread->ret) {
1791                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1792                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1793               }
1794               mainThread->stat = Success;
1795           } else {
1796               if (mainThread->ret) {
1797                   *(mainThread->ret) = NULL;
1798               }
1799               if (interrupted) {
1800                   mainThread->stat = Interrupted;
1801               } else {
1802                   mainThread->stat = Killed;
1803               }
1804           }
1805 #ifdef DEBUG
1806           removeThreadLabel((StgWord)mainThread->tso->id);
1807 #endif
1808           if (mainThread->prev == NULL) {
1809               main_threads = mainThread->link;
1810           } else {
1811               mainThread->prev->link = mainThread->link;
1812           }
1813           if (mainThread->link != NULL) {
1814               mainThread->link->prev = NULL;
1815           }
1816           releaseCapability(cap);
1817           return rtsTrue; // tells schedule() to return
1818       }
1819
1820 #ifdef RTS_SUPPORTS_THREADS
1821       ASSERT(t->main == NULL);
1822 #else
1823       if (t->main != NULL) {
1824           // Must be a main thread that is not the topmost one.  Leave
1825           // it on the run queue until the stack has unwound to the
1826           // point where we can deal with this.  Leaving it on the run
1827           // queue also ensures that the garbage collector knows about
1828           // this thread and its return value (it gets dropped from the
1829           // all_threads list so there's no other way to find it).
1830           APPEND_TO_RUN_QUEUE(t);
1831       }
1832 #endif
1833       return rtsFalse;
1834 }
1835
1836 /* -----------------------------------------------------------------------------
1837  * Perform a heap census, if PROFILING
1838  * -------------------------------------------------------------------------- */
1839
1840 static rtsBool
1841 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1842 {
1843 #if defined(PROFILING)
1844     // When we have +RTS -i0 and we're heap profiling, do a census at
1845     // every GC.  This lets us get repeatable runs for debugging.
1846     if (performHeapProfile ||
1847         (RtsFlags.ProfFlags.profileInterval==0 &&
1848          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1849         GarbageCollect(GetRoots, rtsTrue);
1850         heapCensus();
1851         performHeapProfile = rtsFalse;
1852         return rtsTrue;  // true <=> we already GC'd
1853     }
1854 #endif
1855     return rtsFalse;
1856 }
1857
1858 /* -----------------------------------------------------------------------------
1859  * Perform a garbage collection if necessary
1860  * ASSUMES: sched_mutex
1861  * -------------------------------------------------------------------------- */
1862
1863 static void
1864 scheduleDoGC( Capability *cap STG_UNUSED )
1865 {
1866     StgTSO *t;
1867 #ifdef SMP
1868     int n_capabilities = RtsFlags.ParFlags.nNodes - 1; 
1869            // subtract one because we're already holding one.
1870     Capability *caps[n_capabilities];
1871 #endif
1872
1873 #ifdef SMP
1874     // In order to GC, there must be no threads running Haskell code.
1875     // Therefore, the GC thread needs to hold *all* the capabilities,
1876     // and release them after the GC has completed.  
1877     //
1878     // This seems to be the simplest way: previous attempts involved
1879     // making all the threads with capabilities give up their
1880     // capabilities and sleep except for the *last* one, which
1881     // actually did the GC.  But it's quite hard to arrange for all
1882     // the other tasks to sleep and stay asleep.
1883     //
1884         
1885     caps[n_capabilities] = cap;
1886     while (n_capabilities > 0) {
1887         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
1888         waitForReturnCapability(&sched_mutex, &cap);
1889         n_capabilities--;
1890         caps[n_capabilities] = cap;
1891     }
1892 #endif
1893
1894     /* Kick any transactions which are invalid back to their
1895      * atomically frames.  When next scheduled they will try to
1896      * commit, this commit will fail and they will retry.
1897      */
1898     for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
1899         if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1900             if (!stmValidateTransaction (t -> trec)) {
1901                 IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1902                 
1903                 // strip the stack back to the ATOMICALLY_FRAME, aborting
1904                 // the (nested) transaction, and saving the stack of any
1905                 // partially-evaluated thunks on the heap.
1906                 raiseAsync_(t, NULL, rtsTrue);
1907                 
1908 #ifdef REG_R1
1909                 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1910 #endif
1911             }
1912         }
1913     }
1914     
1915     // so this happens periodically:
1916     scheduleCheckBlackHoles();
1917     
1918     /* everybody back, start the GC.
1919      * Could do it in this thread, or signal a condition var
1920      * to do it in another thread.  Either way, we need to
1921      * broadcast on gc_pending_cond afterward.
1922      */
1923 #if defined(RTS_SUPPORTS_THREADS)
1924     IF_DEBUG(scheduler,sched_belch("doing GC"));
1925 #endif
1926     GarbageCollect(GetRoots,rtsFalse);
1927     
1928 #if defined(SMP)
1929     {
1930         // release our stash of capabilities.
1931         nat i;
1932         for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) {
1933             releaseCapability(caps[i]);
1934         }
1935     }
1936 #endif
1937
1938 #if defined(GRAN)
1939     /* add a ContinueThread event to continue execution of current thread */
1940     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1941               ContinueThread,
1942               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1943     IF_GRAN_DEBUG(bq, 
1944                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1945                   G_EVENTQ(0);
1946                   G_CURR_THREADQ(0));
1947 #endif /* GRAN */
1948 }
1949
1950 /* ---------------------------------------------------------------------------
1951  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1952  * used by Control.Concurrent for error checking.
1953  * ------------------------------------------------------------------------- */
1954  
1955 StgBool
1956 rtsSupportsBoundThreads(void)
1957 {
1958 #ifdef THREADED_RTS
1959   return rtsTrue;
1960 #else
1961   return rtsFalse;
1962 #endif
1963 }
1964
1965 /* ---------------------------------------------------------------------------
1966  * isThreadBound(tso): check whether tso is bound to an OS thread.
1967  * ------------------------------------------------------------------------- */
1968  
1969 StgBool
1970 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1971 {
1972 #ifdef THREADED_RTS
1973   return (tso->main != NULL);
1974 #endif
1975   return rtsFalse;
1976 }
1977
1978 /* ---------------------------------------------------------------------------
1979  * Singleton fork(). Do not copy any running threads.
1980  * ------------------------------------------------------------------------- */
1981
1982 #ifndef mingw32_HOST_OS
1983 #define FORKPROCESS_PRIMOP_SUPPORTED
1984 #endif
1985
1986 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1987 static void 
1988 deleteThreadImmediately(StgTSO *tso);
1989 #endif
1990 StgInt
1991 forkProcess(HsStablePtr *entry
1992 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1993             STG_UNUSED
1994 #endif
1995            )
1996 {
1997 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1998   pid_t pid;
1999   StgTSO* t,*next;
2000   StgMainThread *m;
2001   SchedulerStatus rc;
2002
2003   IF_DEBUG(scheduler,sched_belch("forking!"));
2004   rts_lock(); // This not only acquires sched_mutex, it also
2005               // makes sure that no other threads are running
2006
2007   pid = fork();
2008
2009   if (pid) { /* parent */
2010
2011   /* just return the pid */
2012     rts_unlock();
2013     return pid;
2014     
2015   } else { /* child */
2016     
2017     
2018       // delete all threads
2019     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
2020     
2021     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2022       next = t->link;
2023
2024         // don't allow threads to catch the ThreadKilled exception
2025       deleteThreadImmediately(t);
2026     }
2027     
2028       // wipe the main thread list
2029     while((m = main_threads) != NULL) {
2030       main_threads = m->link;
2031 # ifdef THREADED_RTS
2032       closeCondition(&m->bound_thread_cond);
2033 # endif
2034       stgFree(m);
2035     }
2036     
2037     rc = rts_evalStableIO(entry, NULL);  // run the action
2038     rts_checkSchedStatus("forkProcess",rc);
2039     
2040     rts_unlock();
2041     
2042     hs_exit();                      // clean up and exit
2043     stg_exit(0);
2044   }
2045 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2046   barf("forkProcess#: primop not supported, sorry!\n");
2047   return -1;
2048 #endif
2049 }
2050
2051 /* ---------------------------------------------------------------------------
2052  * deleteAllThreads():  kill all the live threads.
2053  *
2054  * This is used when we catch a user interrupt (^C), before performing
2055  * any necessary cleanups and running finalizers.
2056  *
2057  * Locks: sched_mutex held.
2058  * ------------------------------------------------------------------------- */
2059    
2060 void
2061 deleteAllThreads ( void )
2062 {
2063   StgTSO* t, *next;
2064   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
2065   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2066       next = t->global_link;
2067       deleteThread(t);
2068   }      
2069
2070   // The run queue now contains a bunch of ThreadKilled threads.  We
2071   // must not throw these away: the main thread(s) will be in there
2072   // somewhere, and the main scheduler loop has to deal with it.
2073   // Also, the run queue is the only thing keeping these threads from
2074   // being GC'd, and we don't want the "main thread has been GC'd" panic.
2075
2076   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2077   ASSERT(blackhole_queue == END_TSO_QUEUE);
2078   ASSERT(sleeping_queue == END_TSO_QUEUE);
2079 }
2080
2081 /* startThread and  insertThread are now in GranSim.c -- HWL */
2082
2083
2084 /* ---------------------------------------------------------------------------
2085  * Suspending & resuming Haskell threads.
2086  * 
2087  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2088  * its capability before calling the C function.  This allows another
2089  * task to pick up the capability and carry on running Haskell
2090  * threads.  It also means that if the C call blocks, it won't lock
2091  * the whole system.
2092  *
2093  * The Haskell thread making the C call is put to sleep for the
2094  * duration of the call, on the susepended_ccalling_threads queue.  We
2095  * give out a token to the task, which it can use to resume the thread
2096  * on return from the C function.
2097  * ------------------------------------------------------------------------- */
2098    
2099 StgInt
2100 suspendThread( StgRegTable *reg )
2101 {
2102   nat tok;
2103   Capability *cap;
2104   int saved_errno = errno;
2105
2106   /* assume that *reg is a pointer to the StgRegTable part
2107    * of a Capability.
2108    */
2109   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
2110
2111   ACQUIRE_LOCK(&sched_mutex);
2112
2113   IF_DEBUG(scheduler,
2114            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
2115
2116   // XXX this might not be necessary --SDM
2117   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
2118
2119   threadPaused(cap->r.rCurrentTSO);
2120   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
2121   suspended_ccalling_threads = cap->r.rCurrentTSO;
2122
2123   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
2124       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
2125       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
2126   } else {
2127       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
2128   }
2129
2130   /* Use the thread ID as the token; it should be unique */
2131   tok = cap->r.rCurrentTSO->id;
2132
2133   /* Hand back capability */
2134   releaseCapability(cap);
2135   
2136 #if defined(RTS_SUPPORTS_THREADS)
2137   /* Preparing to leave the RTS, so ensure there's a native thread/task
2138      waiting to take over.
2139   */
2140   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
2141 #endif
2142
2143   in_haskell = rtsFalse;
2144   RELEASE_LOCK(&sched_mutex);
2145   
2146   errno = saved_errno;
2147   return tok; 
2148 }
2149
2150 StgRegTable *
2151 resumeThread( StgInt tok )
2152 {
2153   StgTSO *tso, **prev;
2154   Capability *cap;
2155   int saved_errno = errno;
2156
2157 #if defined(RTS_SUPPORTS_THREADS)
2158   /* Wait for permission to re-enter the RTS with the result. */
2159   ACQUIRE_LOCK(&sched_mutex);
2160   waitForReturnCapability(&sched_mutex, &cap);
2161
2162   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
2163 #else
2164   grabCapability(&cap);
2165 #endif
2166
2167   /* Remove the thread off of the suspended list */
2168   prev = &suspended_ccalling_threads;
2169   for (tso = suspended_ccalling_threads; 
2170        tso != END_TSO_QUEUE; 
2171        prev = &tso->link, tso = tso->link) {
2172     if (tso->id == (StgThreadID)tok) {
2173       *prev = tso->link;
2174       break;
2175     }
2176   }
2177   if (tso == END_TSO_QUEUE) {
2178     barf("resumeThread: thread not found");
2179   }
2180   tso->link = END_TSO_QUEUE;
2181   
2182   if(tso->why_blocked == BlockedOnCCall) {
2183       awakenBlockedQueueNoLock(tso->blocked_exceptions);
2184       tso->blocked_exceptions = NULL;
2185   }
2186   
2187   /* Reset blocking status */
2188   tso->why_blocked  = NotBlocked;
2189
2190   cap->r.rCurrentTSO = tso;
2191   in_haskell = rtsTrue;
2192   RELEASE_LOCK(&sched_mutex);
2193   errno = saved_errno;
2194   return &cap->r;
2195 }
2196
2197 /* ---------------------------------------------------------------------------
2198  * Comparing Thread ids.
2199  *
2200  * This is used from STG land in the implementation of the
2201  * instances of Eq/Ord for ThreadIds.
2202  * ------------------------------------------------------------------------ */
2203
2204 int
2205 cmp_thread(StgPtr tso1, StgPtr tso2) 
2206
2207   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2208   StgThreadID id2 = ((StgTSO *)tso2)->id;
2209  
2210   if (id1 < id2) return (-1);
2211   if (id1 > id2) return 1;
2212   return 0;
2213 }
2214
2215 /* ---------------------------------------------------------------------------
2216  * Fetching the ThreadID from an StgTSO.
2217  *
2218  * This is used in the implementation of Show for ThreadIds.
2219  * ------------------------------------------------------------------------ */
2220 int
2221 rts_getThreadId(StgPtr tso) 
2222 {
2223   return ((StgTSO *)tso)->id;
2224 }
2225
2226 #ifdef DEBUG
2227 void
2228 labelThread(StgPtr tso, char *label)
2229 {
2230   int len;
2231   void *buf;
2232
2233   /* Caveat: Once set, you can only set the thread name to "" */
2234   len = strlen(label)+1;
2235   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2236   strncpy(buf,label,len);
2237   /* Update will free the old memory for us */
2238   updateThreadLabel(((StgTSO *)tso)->id,buf);
2239 }
2240 #endif /* DEBUG */
2241
2242 /* ---------------------------------------------------------------------------
2243    Create a new thread.
2244
2245    The new thread starts with the given stack size.  Before the
2246    scheduler can run, however, this thread needs to have a closure
2247    (and possibly some arguments) pushed on its stack.  See
2248    pushClosure() in Schedule.h.
2249
2250    createGenThread() and createIOThread() (in SchedAPI.h) are
2251    convenient packaged versions of this function.
2252
2253    currently pri (priority) is only used in a GRAN setup -- HWL
2254    ------------------------------------------------------------------------ */
2255 #if defined(GRAN)
2256 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2257 StgTSO *
2258 createThread(nat size, StgInt pri)
2259 #else
2260 StgTSO *
2261 createThread(nat size)
2262 #endif
2263 {
2264
2265     StgTSO *tso;
2266     nat stack_size;
2267
2268     /* First check whether we should create a thread at all */
2269 #if defined(PARALLEL_HASKELL)
2270   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2271   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2272     threadsIgnored++;
2273     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2274           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2275     return END_TSO_QUEUE;
2276   }
2277   threadsCreated++;
2278 #endif
2279
2280 #if defined(GRAN)
2281   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2282 #endif
2283
2284   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2285
2286   /* catch ridiculously small stack sizes */
2287   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2288     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2289   }
2290
2291   stack_size = size - TSO_STRUCT_SIZEW;
2292
2293   tso = (StgTSO *)allocate(size);
2294   TICK_ALLOC_TSO(stack_size, 0);
2295
2296   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2297 #if defined(GRAN)
2298   SET_GRAN_HDR(tso, ThisPE);
2299 #endif
2300
2301   // Always start with the compiled code evaluator
2302   tso->what_next = ThreadRunGHC;
2303
2304   tso->id = next_thread_id++; 
2305   tso->why_blocked  = NotBlocked;
2306   tso->blocked_exceptions = NULL;
2307
2308   tso->saved_errno = 0;
2309   tso->main = NULL;
2310   
2311   tso->stack_size   = stack_size;
2312   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2313                               - TSO_STRUCT_SIZEW;
2314   tso->sp           = (P_)&(tso->stack) + stack_size;
2315
2316   tso->trec = NO_TREC;
2317
2318 #ifdef PROFILING
2319   tso->prof.CCCS = CCS_MAIN;
2320 #endif
2321
2322   /* put a stop frame on the stack */
2323   tso->sp -= sizeofW(StgStopFrame);
2324   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2325   tso->link = END_TSO_QUEUE;
2326
2327   // ToDo: check this
2328 #if defined(GRAN)
2329   /* uses more flexible routine in GranSim */
2330   insertThread(tso, CurrentProc);
2331 #else
2332   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2333    * from its creation
2334    */
2335 #endif
2336
2337 #if defined(GRAN) 
2338   if (RtsFlags.GranFlags.GranSimStats.Full) 
2339     DumpGranEvent(GR_START,tso);
2340 #elif defined(PARALLEL_HASKELL)
2341   if (RtsFlags.ParFlags.ParStats.Full) 
2342     DumpGranEvent(GR_STARTQ,tso);
2343   /* HACk to avoid SCHEDULE 
2344      LastTSO = tso; */
2345 #endif
2346
2347   /* Link the new thread on the global thread list.
2348    */
2349   tso->global_link = all_threads;
2350   all_threads = tso;
2351
2352 #if defined(DIST)
2353   tso->dist.priority = MandatoryPriority; //by default that is...
2354 #endif
2355
2356 #if defined(GRAN)
2357   tso->gran.pri = pri;
2358 # if defined(DEBUG)
2359   tso->gran.magic = TSO_MAGIC; // debugging only
2360 # endif
2361   tso->gran.sparkname   = 0;
2362   tso->gran.startedat   = CURRENT_TIME; 
2363   tso->gran.exported    = 0;
2364   tso->gran.basicblocks = 0;
2365   tso->gran.allocs      = 0;
2366   tso->gran.exectime    = 0;
2367   tso->gran.fetchtime   = 0;
2368   tso->gran.fetchcount  = 0;
2369   tso->gran.blocktime   = 0;
2370   tso->gran.blockcount  = 0;
2371   tso->gran.blockedat   = 0;
2372   tso->gran.globalsparks = 0;
2373   tso->gran.localsparks  = 0;
2374   if (RtsFlags.GranFlags.Light)
2375     tso->gran.clock  = Now; /* local clock */
2376   else
2377     tso->gran.clock  = 0;
2378
2379   IF_DEBUG(gran,printTSO(tso));
2380 #elif defined(PARALLEL_HASKELL)
2381 # if defined(DEBUG)
2382   tso->par.magic = TSO_MAGIC; // debugging only
2383 # endif
2384   tso->par.sparkname   = 0;
2385   tso->par.startedat   = CURRENT_TIME; 
2386   tso->par.exported    = 0;
2387   tso->par.basicblocks = 0;
2388   tso->par.allocs      = 0;
2389   tso->par.exectime    = 0;
2390   tso->par.fetchtime   = 0;
2391   tso->par.fetchcount  = 0;
2392   tso->par.blocktime   = 0;
2393   tso->par.blockcount  = 0;
2394   tso->par.blockedat   = 0;
2395   tso->par.globalsparks = 0;
2396   tso->par.localsparks  = 0;
2397 #endif
2398
2399 #if defined(GRAN)
2400   globalGranStats.tot_threads_created++;
2401   globalGranStats.threads_created_on_PE[CurrentProc]++;
2402   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2403   globalGranStats.tot_sq_probes++;
2404 #elif defined(PARALLEL_HASKELL)
2405   // collect parallel global statistics (currently done together with GC stats)
2406   if (RtsFlags.ParFlags.ParStats.Global &&
2407       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2408     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2409     globalParStats.tot_threads_created++;
2410   }
2411 #endif 
2412
2413 #if defined(GRAN)
2414   IF_GRAN_DEBUG(pri,
2415                 sched_belch("==__ schedule: Created TSO %d (%p);",
2416                       CurrentProc, tso, tso->id));
2417 #elif defined(PARALLEL_HASKELL)
2418   IF_PAR_DEBUG(verbose,
2419                sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2420                            (long)tso->id, tso, advisory_thread_count));
2421 #else
2422   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2423                                  (long)tso->id, (long)tso->stack_size));
2424 #endif    
2425   return tso;
2426 }
2427
2428 #if defined(PAR)
2429 /* RFP:
2430    all parallel thread creation calls should fall through the following routine.
2431 */
2432 StgTSO *
2433 createThreadFromSpark(rtsSpark spark) 
2434 { StgTSO *tso;
2435   ASSERT(spark != (rtsSpark)NULL);
2436 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2437   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2438   { threadsIgnored++;
2439     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2440           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2441     return END_TSO_QUEUE;
2442   }
2443   else
2444   { threadsCreated++;
2445     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2446     if (tso==END_TSO_QUEUE)     
2447       barf("createSparkThread: Cannot create TSO");
2448 #if defined(DIST)
2449     tso->priority = AdvisoryPriority;
2450 #endif
2451     pushClosure(tso,spark);
2452     addToRunQueue(tso);
2453     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2454   }
2455   return tso;
2456 }
2457 #endif
2458
2459 /*
2460   Turn a spark into a thread.
2461   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2462 */
2463 #if 0
2464 StgTSO *
2465 activateSpark (rtsSpark spark) 
2466 {
2467   StgTSO *tso;
2468
2469   tso = createSparkThread(spark);
2470   if (RtsFlags.ParFlags.ParStats.Full) {   
2471     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2472       IF_PAR_DEBUG(verbose,
2473                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2474                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2475   }
2476   // ToDo: fwd info on local/global spark to thread -- HWL
2477   // tso->gran.exported =  spark->exported;
2478   // tso->gran.locked =   !spark->global;
2479   // tso->gran.sparkname = spark->name;
2480
2481   return tso;
2482 }
2483 #endif
2484
2485 /* ---------------------------------------------------------------------------
2486  * scheduleThread()
2487  *
2488  * scheduleThread puts a thread on the head of the runnable queue.
2489  * This will usually be done immediately after a thread is created.
2490  * The caller of scheduleThread must create the thread using e.g.
2491  * createThread and push an appropriate closure
2492  * on this thread's stack before the scheduler is invoked.
2493  * ------------------------------------------------------------------------ */
2494
2495 static void
2496 scheduleThread_(StgTSO *tso)
2497 {
2498   // The thread goes at the *end* of the run-queue, to avoid possible
2499   // starvation of any threads already on the queue.
2500   APPEND_TO_RUN_QUEUE(tso);
2501   threadRunnable();
2502 }
2503
2504 void
2505 scheduleThread(StgTSO* tso)
2506 {
2507   ACQUIRE_LOCK(&sched_mutex);
2508   scheduleThread_(tso);
2509   RELEASE_LOCK(&sched_mutex);
2510 }
2511
2512 #if defined(RTS_SUPPORTS_THREADS)
2513 static Condition bound_cond_cache;
2514 static int bound_cond_cache_full = 0;
2515 #endif
2516
2517
2518 SchedulerStatus
2519 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
2520                    Capability *initialCapability)
2521 {
2522     // Precondition: sched_mutex must be held
2523     StgMainThread *m;
2524
2525     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2526     m->tso = tso;
2527     tso->main = m;
2528     m->ret = ret;
2529     m->stat = NoStatus;
2530     m->link = main_threads;
2531     m->prev = NULL;
2532     if (main_threads != NULL) {
2533         main_threads->prev = m;
2534     }
2535     main_threads = m;
2536
2537 #if defined(RTS_SUPPORTS_THREADS)
2538     // Allocating a new condition for each thread is expensive, so we
2539     // cache one.  This is a pretty feeble hack, but it helps speed up
2540     // consecutive call-ins quite a bit.
2541     if (bound_cond_cache_full) {
2542         m->bound_thread_cond = bound_cond_cache;
2543         bound_cond_cache_full = 0;
2544     } else {
2545         initCondition(&m->bound_thread_cond);
2546     }
2547 #endif
2548
2549     /* Put the thread on the main-threads list prior to scheduling the TSO.
2550        Failure to do so introduces a race condition in the MT case (as
2551        identified by Wolfgang Thaller), whereby the new task/OS thread 
2552        created by scheduleThread_() would complete prior to the thread
2553        that spawned it managed to put 'itself' on the main-threads list.
2554        The upshot of it all being that the worker thread wouldn't get to
2555        signal the completion of the its work item for the main thread to
2556        see (==> it got stuck waiting.)    -- sof 6/02.
2557     */
2558     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2559     
2560     APPEND_TO_RUN_QUEUE(tso);
2561     // NB. Don't call threadRunnable() here, because the thread is
2562     // bound and only runnable by *this* OS thread, so waking up other
2563     // workers will just slow things down.
2564
2565     return waitThread_(m, initialCapability);
2566 }
2567
2568 /* ---------------------------------------------------------------------------
2569  * initScheduler()
2570  *
2571  * Initialise the scheduler.  This resets all the queues - if the
2572  * queues contained any threads, they'll be garbage collected at the
2573  * next pass.
2574  *
2575  * ------------------------------------------------------------------------ */
2576
2577 void 
2578 initScheduler(void)
2579 {
2580 #if defined(GRAN)
2581   nat i;
2582
2583   for (i=0; i<=MAX_PROC; i++) {
2584     run_queue_hds[i]      = END_TSO_QUEUE;
2585     run_queue_tls[i]      = END_TSO_QUEUE;
2586     blocked_queue_hds[i]  = END_TSO_QUEUE;
2587     blocked_queue_tls[i]  = END_TSO_QUEUE;
2588     ccalling_threadss[i]  = END_TSO_QUEUE;
2589     blackhole_queue[i]    = END_TSO_QUEUE;
2590     sleeping_queue        = END_TSO_QUEUE;
2591   }
2592 #else
2593   run_queue_hd      = END_TSO_QUEUE;
2594   run_queue_tl      = END_TSO_QUEUE;
2595   blocked_queue_hd  = END_TSO_QUEUE;
2596   blocked_queue_tl  = END_TSO_QUEUE;
2597   blackhole_queue   = END_TSO_QUEUE;
2598   sleeping_queue    = END_TSO_QUEUE;
2599 #endif 
2600
2601   suspended_ccalling_threads  = END_TSO_QUEUE;
2602
2603   main_threads = NULL;
2604   all_threads  = END_TSO_QUEUE;
2605
2606   context_switch = 0;
2607   interrupted    = 0;
2608
2609   RtsFlags.ConcFlags.ctxtSwitchTicks =
2610       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2611       
2612 #if defined(RTS_SUPPORTS_THREADS)
2613   /* Initialise the mutex and condition variables used by
2614    * the scheduler. */
2615   initMutex(&sched_mutex);
2616   initMutex(&term_mutex);
2617 #endif
2618   
2619   ACQUIRE_LOCK(&sched_mutex);
2620
2621   /* A capability holds the state a native thread needs in
2622    * order to execute STG code. At least one capability is
2623    * floating around (only SMP builds have more than one).
2624    */
2625   initCapabilities();
2626   
2627 #if defined(RTS_SUPPORTS_THREADS)
2628   initTaskManager();
2629 #endif
2630
2631 #if defined(SMP)
2632   /* eagerly start some extra workers */
2633   startTasks(RtsFlags.ParFlags.nNodes, taskStart);
2634 #endif
2635
2636 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2637   initSparkPools();
2638 #endif
2639
2640   RELEASE_LOCK(&sched_mutex);
2641 }
2642
2643 void
2644 exitScheduler( void )
2645 {
2646     interrupted = rtsTrue;
2647     shutting_down_scheduler = rtsTrue;
2648 #if defined(RTS_SUPPORTS_THREADS)
2649     if (threadIsTask(osThreadId())) { taskStop(); }
2650     stopTaskManager();
2651 #endif
2652 }
2653
2654 /* ----------------------------------------------------------------------------
2655    Managing the per-task allocation areas.
2656    
2657    Each capability comes with an allocation area.  These are
2658    fixed-length block lists into which allocation can be done.
2659
2660    ToDo: no support for two-space collection at the moment???
2661    ------------------------------------------------------------------------- */
2662
2663 static SchedulerStatus
2664 waitThread_(StgMainThread* m, Capability *initialCapability)
2665 {
2666   SchedulerStatus stat;
2667
2668   // Precondition: sched_mutex must be held.
2669   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2670
2671 #if defined(GRAN)
2672   /* GranSim specific init */
2673   CurrentTSO = m->tso;                // the TSO to run
2674   procStatus[MainProc] = Busy;        // status of main PE
2675   CurrentProc = MainProc;             // PE to run it on
2676   schedule(m,initialCapability);
2677 #else
2678   schedule(m,initialCapability);
2679   ASSERT(m->stat != NoStatus);
2680 #endif
2681
2682   stat = m->stat;
2683
2684 #if defined(RTS_SUPPORTS_THREADS)
2685   // Free the condition variable, returning it to the cache if possible.
2686   if (!bound_cond_cache_full) {
2687       bound_cond_cache = m->bound_thread_cond;
2688       bound_cond_cache_full = 1;
2689   } else {
2690       closeCondition(&m->bound_thread_cond);
2691   }
2692 #endif
2693
2694   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2695   stgFree(m);
2696
2697   // Postcondition: sched_mutex still held
2698   return stat;
2699 }
2700
2701 /* ---------------------------------------------------------------------------
2702    Where are the roots that we know about?
2703
2704         - all the threads on the runnable queue
2705         - all the threads on the blocked queue
2706         - all the threads on the sleeping queue
2707         - all the thread currently executing a _ccall_GC
2708         - all the "main threads"
2709      
2710    ------------------------------------------------------------------------ */
2711
2712 /* This has to be protected either by the scheduler monitor, or by the
2713         garbage collection monitor (probably the latter).
2714         KH @ 25/10/99
2715 */
2716
2717 void
2718 GetRoots( evac_fn evac )
2719 {
2720 #if defined(GRAN)
2721   {
2722     nat i;
2723     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2724       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2725           evac((StgClosure **)&run_queue_hds[i]);
2726       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2727           evac((StgClosure **)&run_queue_tls[i]);
2728       
2729       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2730           evac((StgClosure **)&blocked_queue_hds[i]);
2731       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2732           evac((StgClosure **)&blocked_queue_tls[i]);
2733       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2734           evac((StgClosure **)&ccalling_threads[i]);
2735     }
2736   }
2737
2738   markEventQueue();
2739
2740 #else /* !GRAN */
2741   if (run_queue_hd != END_TSO_QUEUE) {
2742       ASSERT(run_queue_tl != END_TSO_QUEUE);
2743       evac((StgClosure **)&run_queue_hd);
2744       evac((StgClosure **)&run_queue_tl);
2745   }
2746   
2747   if (blocked_queue_hd != END_TSO_QUEUE) {
2748       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2749       evac((StgClosure **)&blocked_queue_hd);
2750       evac((StgClosure **)&blocked_queue_tl);
2751   }
2752   
2753   if (sleeping_queue != END_TSO_QUEUE) {
2754       evac((StgClosure **)&sleeping_queue);
2755   }
2756 #endif 
2757
2758   if (blackhole_queue != END_TSO_QUEUE) {
2759       evac((StgClosure **)&blackhole_queue);
2760   }
2761
2762   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2763       evac((StgClosure **)&suspended_ccalling_threads);
2764   }
2765
2766 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2767   markSparkQueue(evac);
2768 #endif
2769
2770 #if defined(RTS_USER_SIGNALS)
2771   // mark the signal handlers (signals should be already blocked)
2772   markSignalHandlers(evac);
2773 #endif
2774 }
2775
2776 /* -----------------------------------------------------------------------------
2777    performGC
2778
2779    This is the interface to the garbage collector from Haskell land.
2780    We provide this so that external C code can allocate and garbage
2781    collect when called from Haskell via _ccall_GC.
2782
2783    It might be useful to provide an interface whereby the programmer
2784    can specify more roots (ToDo).
2785    
2786    This needs to be protected by the GC condition variable above.  KH.
2787    -------------------------------------------------------------------------- */
2788
2789 static void (*extra_roots)(evac_fn);
2790
2791 void
2792 performGC(void)
2793 {
2794   /* Obligated to hold this lock upon entry */
2795   ACQUIRE_LOCK(&sched_mutex);
2796   GarbageCollect(GetRoots,rtsFalse);
2797   RELEASE_LOCK(&sched_mutex);
2798 }
2799
2800 void
2801 performMajorGC(void)
2802 {
2803   ACQUIRE_LOCK(&sched_mutex);
2804   GarbageCollect(GetRoots,rtsTrue);
2805   RELEASE_LOCK(&sched_mutex);
2806 }
2807
2808 static void
2809 AllRoots(evac_fn evac)
2810 {
2811     GetRoots(evac);             // the scheduler's roots
2812     extra_roots(evac);          // the user's roots
2813 }
2814
2815 void
2816 performGCWithRoots(void (*get_roots)(evac_fn))
2817 {
2818   ACQUIRE_LOCK(&sched_mutex);
2819   extra_roots = get_roots;
2820   GarbageCollect(AllRoots,rtsFalse);
2821   RELEASE_LOCK(&sched_mutex);
2822 }
2823
2824 /* -----------------------------------------------------------------------------
2825    Stack overflow
2826
2827    If the thread has reached its maximum stack size, then raise the
2828    StackOverflow exception in the offending thread.  Otherwise
2829    relocate the TSO into a larger chunk of memory and adjust its stack
2830    size appropriately.
2831    -------------------------------------------------------------------------- */
2832
2833 static StgTSO *
2834 threadStackOverflow(StgTSO *tso)
2835 {
2836   nat new_stack_size, stack_words;
2837   lnat new_tso_size;
2838   StgPtr new_sp;
2839   StgTSO *dest;
2840
2841   IF_DEBUG(sanity,checkTSO(tso));
2842   if (tso->stack_size >= tso->max_stack_size) {
2843
2844     IF_DEBUG(gc,
2845              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2846                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2847              /* If we're debugging, just print out the top of the stack */
2848              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2849                                               tso->sp+64)));
2850
2851     /* Send this thread the StackOverflow exception */
2852     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2853     return tso;
2854   }
2855
2856   /* Try to double the current stack size.  If that takes us over the
2857    * maximum stack size for this thread, then use the maximum instead.
2858    * Finally round up so the TSO ends up as a whole number of blocks.
2859    */
2860   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2861   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2862                                        TSO_STRUCT_SIZE)/sizeof(W_);
2863   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2864   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2865
2866   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2867
2868   dest = (StgTSO *)allocate(new_tso_size);
2869   TICK_ALLOC_TSO(new_stack_size,0);
2870
2871   /* copy the TSO block and the old stack into the new area */
2872   memcpy(dest,tso,TSO_STRUCT_SIZE);
2873   stack_words = tso->stack + tso->stack_size - tso->sp;
2874   new_sp = (P_)dest + new_tso_size - stack_words;
2875   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2876
2877   /* relocate the stack pointers... */
2878   dest->sp         = new_sp;
2879   dest->stack_size = new_stack_size;
2880         
2881   /* Mark the old TSO as relocated.  We have to check for relocated
2882    * TSOs in the garbage collector and any primops that deal with TSOs.
2883    *
2884    * It's important to set the sp value to just beyond the end
2885    * of the stack, so we don't attempt to scavenge any part of the
2886    * dead TSO's stack.
2887    */
2888   tso->what_next = ThreadRelocated;
2889   tso->link = dest;
2890   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2891   tso->why_blocked = NotBlocked;
2892
2893   IF_PAR_DEBUG(verbose,
2894                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2895                      tso->id, tso, tso->stack_size);
2896                /* If we're debugging, just print out the top of the stack */
2897                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2898                                                 tso->sp+64)));
2899   
2900   IF_DEBUG(sanity,checkTSO(tso));
2901 #if 0
2902   IF_DEBUG(scheduler,printTSO(dest));
2903 #endif
2904
2905   return dest;
2906 }
2907
2908 /* ---------------------------------------------------------------------------
2909    Wake up a queue that was blocked on some resource.
2910    ------------------------------------------------------------------------ */
2911
2912 #if defined(GRAN)
2913 STATIC_INLINE void
2914 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2915 {
2916 }
2917 #elif defined(PARALLEL_HASKELL)
2918 STATIC_INLINE void
2919 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2920 {
2921   /* write RESUME events to log file and
2922      update blocked and fetch time (depending on type of the orig closure) */
2923   if (RtsFlags.ParFlags.ParStats.Full) {
2924     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2925                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2926                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2927     if (EMPTY_RUN_QUEUE())
2928       emitSchedule = rtsTrue;
2929
2930     switch (get_itbl(node)->type) {
2931         case FETCH_ME_BQ:
2932           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2933           break;
2934         case RBH:
2935         case FETCH_ME:
2936         case BLACKHOLE_BQ:
2937           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2938           break;
2939 #ifdef DIST
2940         case MVAR:
2941           break;
2942 #endif    
2943         default:
2944           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2945         }
2946       }
2947 }
2948 #endif
2949
2950 #if defined(GRAN)
2951 static StgBlockingQueueElement *
2952 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2953 {
2954     StgTSO *tso;
2955     PEs node_loc, tso_loc;
2956
2957     node_loc = where_is(node); // should be lifted out of loop
2958     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2959     tso_loc = where_is((StgClosure *)tso);
2960     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2961       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2962       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2963       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2964       // insertThread(tso, node_loc);
2965       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2966                 ResumeThread,
2967                 tso, node, (rtsSpark*)NULL);
2968       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2969       // len_local++;
2970       // len++;
2971     } else { // TSO is remote (actually should be FMBQ)
2972       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2973                                   RtsFlags.GranFlags.Costs.gunblocktime +
2974                                   RtsFlags.GranFlags.Costs.latency;
2975       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2976                 UnblockThread,
2977                 tso, node, (rtsSpark*)NULL);
2978       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2979       // len++;
2980     }
2981     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2982     IF_GRAN_DEBUG(bq,
2983                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2984                           (node_loc==tso_loc ? "Local" : "Global"), 
2985                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2986     tso->block_info.closure = NULL;
2987     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2988                              tso->id, tso));
2989 }
2990 #elif defined(PARALLEL_HASKELL)
2991 static StgBlockingQueueElement *
2992 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2993 {
2994     StgBlockingQueueElement *next;
2995
2996     switch (get_itbl(bqe)->type) {
2997     case TSO:
2998       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2999       /* if it's a TSO just push it onto the run_queue */
3000       next = bqe->link;
3001       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3002       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3003       threadRunnable();
3004       unblockCount(bqe, node);
3005       /* reset blocking status after dumping event */
3006       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3007       break;
3008
3009     case BLOCKED_FETCH:
3010       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3011       next = bqe->link;
3012       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3013       PendingFetches = (StgBlockedFetch *)bqe;
3014       break;
3015
3016 # if defined(DEBUG)
3017       /* can ignore this case in a non-debugging setup; 
3018          see comments on RBHSave closures above */
3019     case CONSTR:
3020       /* check that the closure is an RBHSave closure */
3021       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3022              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3023              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3024       break;
3025
3026     default:
3027       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3028            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3029            (StgClosure *)bqe);
3030 # endif
3031     }
3032   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3033   return next;
3034 }
3035
3036 #else /* !GRAN && !PARALLEL_HASKELL */
3037 static StgTSO *
3038 unblockOneLocked(StgTSO *tso)
3039 {
3040   StgTSO *next;
3041
3042   ASSERT(get_itbl(tso)->type == TSO);
3043   ASSERT(tso->why_blocked != NotBlocked);
3044   tso->why_blocked = NotBlocked;
3045   next = tso->link;
3046   tso->link = END_TSO_QUEUE;
3047   APPEND_TO_RUN_QUEUE(tso);
3048   threadRunnable();
3049   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3050   return next;
3051 }
3052 #endif
3053
3054 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3055 INLINE_ME StgBlockingQueueElement *
3056 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3057 {
3058   ACQUIRE_LOCK(&sched_mutex);
3059   bqe = unblockOneLocked(bqe, node);
3060   RELEASE_LOCK(&sched_mutex);
3061   return bqe;
3062 }
3063 #else
3064 INLINE_ME StgTSO *
3065 unblockOne(StgTSO *tso)
3066 {
3067   ACQUIRE_LOCK(&sched_mutex);
3068   tso = unblockOneLocked(tso);
3069   RELEASE_LOCK(&sched_mutex);
3070   return tso;
3071 }
3072 #endif
3073
3074 #if defined(GRAN)
3075 void 
3076 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3077 {
3078   StgBlockingQueueElement *bqe;
3079   PEs node_loc;
3080   nat len = 0; 
3081
3082   IF_GRAN_DEBUG(bq, 
3083                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3084                       node, CurrentProc, CurrentTime[CurrentProc], 
3085                       CurrentTSO->id, CurrentTSO));
3086
3087   node_loc = where_is(node);
3088
3089   ASSERT(q == END_BQ_QUEUE ||
3090          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3091          get_itbl(q)->type == CONSTR); // closure (type constructor)
3092   ASSERT(is_unique(node));
3093
3094   /* FAKE FETCH: magically copy the node to the tso's proc;
3095      no Fetch necessary because in reality the node should not have been 
3096      moved to the other PE in the first place
3097   */
3098   if (CurrentProc!=node_loc) {
3099     IF_GRAN_DEBUG(bq, 
3100                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3101                         node, node_loc, CurrentProc, CurrentTSO->id, 
3102                         // CurrentTSO, where_is(CurrentTSO),
3103                         node->header.gran.procs));
3104     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3105     IF_GRAN_DEBUG(bq, 
3106                   debugBelch("## new bitmask of node %p is %#x\n",
3107                         node, node->header.gran.procs));
3108     if (RtsFlags.GranFlags.GranSimStats.Global) {
3109       globalGranStats.tot_fake_fetches++;
3110     }
3111   }
3112
3113   bqe = q;
3114   // ToDo: check: ASSERT(CurrentProc==node_loc);
3115   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3116     //next = bqe->link;
3117     /* 
3118        bqe points to the current element in the queue
3119        next points to the next element in the queue
3120     */
3121     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3122     //tso_loc = where_is(tso);
3123     len++;
3124     bqe = unblockOneLocked(bqe, node);
3125   }
3126
3127   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3128      the closure to make room for the anchor of the BQ */
3129   if (bqe!=END_BQ_QUEUE) {
3130     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3131     /*
3132     ASSERT((info_ptr==&RBH_Save_0_info) ||
3133            (info_ptr==&RBH_Save_1_info) ||
3134            (info_ptr==&RBH_Save_2_info));
3135     */
3136     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3137     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3138     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3139
3140     IF_GRAN_DEBUG(bq,
3141                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3142                         node, info_type(node)));
3143   }
3144
3145   /* statistics gathering */
3146   if (RtsFlags.GranFlags.GranSimStats.Global) {
3147     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3148     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3149     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3150     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3151   }
3152   IF_GRAN_DEBUG(bq,
3153                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3154                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3155 }
3156 #elif defined(PARALLEL_HASKELL)
3157 void 
3158 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3159 {
3160   StgBlockingQueueElement *bqe;
3161
3162   ACQUIRE_LOCK(&sched_mutex);
3163
3164   IF_PAR_DEBUG(verbose, 
3165                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3166                      node, mytid));
3167 #ifdef DIST  
3168   //RFP
3169   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3170     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3171     return;
3172   }
3173 #endif
3174   
3175   ASSERT(q == END_BQ_QUEUE ||
3176          get_itbl(q)->type == TSO ||           
3177          get_itbl(q)->type == BLOCKED_FETCH || 
3178          get_itbl(q)->type == CONSTR); 
3179
3180   bqe = q;
3181   while (get_itbl(bqe)->type==TSO || 
3182          get_itbl(bqe)->type==BLOCKED_FETCH) {
3183     bqe = unblockOneLocked(bqe, node);
3184   }
3185   RELEASE_LOCK(&sched_mutex);
3186 }
3187
3188 #else   /* !GRAN && !PARALLEL_HASKELL */
3189
3190 void
3191 awakenBlockedQueueNoLock(StgTSO *tso)
3192 {
3193   while (tso != END_TSO_QUEUE) {
3194     tso = unblockOneLocked(tso);
3195   }
3196 }
3197
3198 void
3199 awakenBlockedQueue(StgTSO *tso)
3200 {
3201   ACQUIRE_LOCK(&sched_mutex);
3202   while (tso != END_TSO_QUEUE) {
3203     tso = unblockOneLocked(tso);
3204   }
3205   RELEASE_LOCK(&sched_mutex);
3206 }
3207 #endif
3208
3209 /* ---------------------------------------------------------------------------
3210    Interrupt execution
3211    - usually called inside a signal handler so it mustn't do anything fancy.   
3212    ------------------------------------------------------------------------ */
3213
3214 void
3215 interruptStgRts(void)
3216 {
3217     interrupted    = 1;
3218     context_switch = 1;
3219 }
3220
3221 /* -----------------------------------------------------------------------------
3222    Unblock a thread
3223
3224    This is for use when we raise an exception in another thread, which
3225    may be blocked.
3226    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3227    -------------------------------------------------------------------------- */
3228
3229 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3230 /*
3231   NB: only the type of the blocking queue is different in GranSim and GUM
3232       the operations on the queue-elements are the same
3233       long live polymorphism!
3234
3235   Locks: sched_mutex is held upon entry and exit.
3236
3237 */
3238 static void
3239 unblockThread(StgTSO *tso)
3240 {
3241   StgBlockingQueueElement *t, **last;
3242
3243   switch (tso->why_blocked) {
3244
3245   case NotBlocked:
3246     return;  /* not blocked */
3247
3248   case BlockedOnSTM:
3249     // Be careful: nothing to do here!  We tell the scheduler that the thread
3250     // is runnable and we leave it to the stack-walking code to abort the 
3251     // transaction while unwinding the stack.  We should perhaps have a debugging
3252     // test to make sure that this really happens and that the 'zombie' transaction
3253     // does not get committed.
3254     goto done;
3255
3256   case BlockedOnMVar:
3257     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3258     {
3259       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3260       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3261
3262       last = (StgBlockingQueueElement **)&mvar->head;
3263       for (t = (StgBlockingQueueElement *)mvar->head; 
3264            t != END_BQ_QUEUE; 
3265            last = &t->link, last_tso = t, t = t->link) {
3266         if (t == (StgBlockingQueueElement *)tso) {
3267           *last = (StgBlockingQueueElement *)tso->link;
3268           if (mvar->tail == tso) {
3269             mvar->tail = (StgTSO *)last_tso;
3270           }
3271           goto done;
3272         }
3273       }
3274       barf("unblockThread (MVAR): TSO not found");
3275     }
3276
3277   case BlockedOnBlackHole:
3278     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3279     {
3280       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3281
3282       last = &bq->blocking_queue;
3283       for (t = bq->blocking_queue; 
3284            t != END_BQ_QUEUE; 
3285            last = &t->link, t = t->link) {
3286         if (t == (StgBlockingQueueElement *)tso) {
3287           *last = (StgBlockingQueueElement *)tso->link;
3288           goto done;
3289         }
3290       }
3291       barf("unblockThread (BLACKHOLE): TSO not found");
3292     }
3293
3294   case BlockedOnException:
3295     {
3296       StgTSO *target  = tso->block_info.tso;
3297
3298       ASSERT(get_itbl(target)->type == TSO);
3299
3300       if (target->what_next == ThreadRelocated) {
3301           target = target->link;
3302           ASSERT(get_itbl(target)->type == TSO);
3303       }
3304
3305       ASSERT(target->blocked_exceptions != NULL);
3306
3307       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3308       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3309            t != END_BQ_QUEUE; 
3310            last = &t->link, t = t->link) {
3311         ASSERT(get_itbl(t)->type == TSO);
3312         if (t == (StgBlockingQueueElement *)tso) {
3313           *last = (StgBlockingQueueElement *)tso->link;
3314           goto done;
3315         }
3316       }
3317       barf("unblockThread (Exception): TSO not found");
3318     }
3319
3320   case BlockedOnRead:
3321   case BlockedOnWrite:
3322 #if defined(mingw32_HOST_OS)
3323   case BlockedOnDoProc:
3324 #endif
3325     {
3326       /* take TSO off blocked_queue */
3327       StgBlockingQueueElement *prev = NULL;
3328       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3329            prev = t, t = t->link) {
3330         if (t == (StgBlockingQueueElement *)tso) {
3331           if (prev == NULL) {
3332             blocked_queue_hd = (StgTSO *)t->link;
3333             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3334               blocked_queue_tl = END_TSO_QUEUE;
3335             }
3336           } else {
3337             prev->link = t->link;
3338             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3339               blocked_queue_tl = (StgTSO *)prev;
3340             }
3341           }
3342           goto done;
3343         }
3344       }
3345       barf("unblockThread (I/O): TSO not found");
3346     }
3347
3348   case BlockedOnDelay:
3349     {
3350       /* take TSO off sleeping_queue */
3351       StgBlockingQueueElement *prev = NULL;
3352       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3353            prev = t, t = t->link) {
3354         if (t == (StgBlockingQueueElement *)tso) {
3355           if (prev == NULL) {
3356             sleeping_queue = (StgTSO *)t->link;
3357           } else {
3358             prev->link = t->link;
3359           }
3360           goto done;
3361         }
3362       }
3363       barf("unblockThread (delay): TSO not found");
3364     }
3365
3366   default:
3367     barf("unblockThread");
3368   }
3369
3370  done:
3371   tso->link = END_TSO_QUEUE;
3372   tso->why_blocked = NotBlocked;
3373   tso->block_info.closure = NULL;
3374   PUSH_ON_RUN_QUEUE(tso);
3375 }
3376 #else
3377 static void
3378 unblockThread(StgTSO *tso)
3379 {
3380   StgTSO *t, **last;
3381   
3382   /* To avoid locking unnecessarily. */
3383   if (tso->why_blocked == NotBlocked) {
3384     return;
3385   }
3386
3387   switch (tso->why_blocked) {
3388
3389   case BlockedOnSTM:
3390     // Be careful: nothing to do here!  We tell the scheduler that the thread
3391     // is runnable and we leave it to the stack-walking code to abort the 
3392     // transaction while unwinding the stack.  We should perhaps have a debugging
3393     // test to make sure that this really happens and that the 'zombie' transaction
3394     // does not get committed.
3395     goto done;
3396
3397   case BlockedOnMVar:
3398     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3399     {
3400       StgTSO *last_tso = END_TSO_QUEUE;
3401       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3402
3403       last = &mvar->head;
3404       for (t = mvar->head; t != END_TSO_QUEUE; 
3405            last = &t->link, last_tso = t, t = t->link) {
3406         if (t == tso) {
3407           *last = tso->link;
3408           if (mvar->tail == tso) {
3409             mvar->tail = last_tso;
3410           }
3411           goto done;
3412         }
3413       }
3414       barf("unblockThread (MVAR): TSO not found");
3415     }
3416
3417   case BlockedOnBlackHole:
3418     {
3419       last = &blackhole_queue;
3420       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3421            last = &t->link, t = t->link) {
3422         if (t == tso) {
3423           *last = tso->link;
3424           goto done;
3425         }
3426       }
3427       barf("unblockThread (BLACKHOLE): TSO not found");
3428     }
3429
3430   case BlockedOnException:
3431     {
3432       StgTSO *target  = tso->block_info.tso;
3433
3434       ASSERT(get_itbl(target)->type == TSO);
3435
3436       while (target->what_next == ThreadRelocated) {
3437           target = target->link;
3438           ASSERT(get_itbl(target)->type == TSO);
3439       }
3440       
3441       ASSERT(target->blocked_exceptions != NULL);
3442
3443       last = &target->blocked_exceptions;
3444       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3445            last = &t->link, t = t->link) {
3446         ASSERT(get_itbl(t)->type == TSO);
3447         if (t == tso) {
3448           *last = tso->link;
3449           goto done;
3450         }
3451       }
3452       barf("unblockThread (Exception): TSO not found");
3453     }
3454
3455   case BlockedOnRead:
3456   case BlockedOnWrite:
3457 #if defined(mingw32_HOST_OS)
3458   case BlockedOnDoProc:
3459 #endif
3460     {
3461       StgTSO *prev = NULL;
3462       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3463            prev = t, t = t->link) {
3464         if (t == tso) {
3465           if (prev == NULL) {
3466             blocked_queue_hd = t->link;
3467             if (blocked_queue_tl == t) {
3468               blocked_queue_tl = END_TSO_QUEUE;
3469             }
3470           } else {
3471             prev->link = t->link;
3472             if (blocked_queue_tl == t) {
3473               blocked_queue_tl = prev;
3474             }
3475           }
3476           goto done;
3477         }
3478       }
3479       barf("unblockThread (I/O): TSO not found");
3480     }
3481
3482   case BlockedOnDelay:
3483     {
3484       StgTSO *prev = NULL;
3485       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3486            prev = t, t = t->link) {
3487         if (t == tso) {
3488           if (prev == NULL) {
3489             sleeping_queue = t->link;
3490           } else {
3491             prev->link = t->link;
3492           }
3493           goto done;
3494         }
3495       }
3496       barf("unblockThread (delay): TSO not found");
3497     }
3498
3499   default:
3500     barf("unblockThread");
3501   }
3502
3503  done:
3504   tso->link = END_TSO_QUEUE;
3505   tso->why_blocked = NotBlocked;
3506   tso->block_info.closure = NULL;
3507   APPEND_TO_RUN_QUEUE(tso);
3508 }
3509 #endif
3510
3511 /* -----------------------------------------------------------------------------
3512  * checkBlackHoles()
3513  *
3514  * Check the blackhole_queue for threads that can be woken up.  We do
3515  * this periodically: before every GC, and whenever the run queue is
3516  * empty.
3517  *
3518  * An elegant solution might be to just wake up all the blocked
3519  * threads with awakenBlockedQueue occasionally: they'll go back to
3520  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3521  * doesn't give us a way to tell whether we've actually managed to
3522  * wake up any threads, so we would be busy-waiting.
3523  *
3524  * -------------------------------------------------------------------------- */
3525
3526 static rtsBool
3527 checkBlackHoles( void )
3528 {
3529     StgTSO **prev, *t;
3530     rtsBool any_woke_up = rtsFalse;
3531     StgHalfWord type;
3532
3533     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3534
3535     // ASSUMES: sched_mutex
3536     prev = &blackhole_queue;
3537     t = blackhole_queue;
3538     while (t != END_TSO_QUEUE) {
3539         ASSERT(t->why_blocked == BlockedOnBlackHole);
3540         type = get_itbl(t->block_info.closure)->type;
3541         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3542             t = unblockOneLocked(t);
3543             *prev = t;
3544             any_woke_up = rtsTrue;
3545         } else {
3546             prev = &t->link;
3547             t = t->link;
3548         }
3549     }
3550
3551     return any_woke_up;
3552 }
3553
3554 /* -----------------------------------------------------------------------------
3555  * raiseAsync()
3556  *
3557  * The following function implements the magic for raising an
3558  * asynchronous exception in an existing thread.
3559  *
3560  * We first remove the thread from any queue on which it might be
3561  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3562  *
3563  * We strip the stack down to the innermost CATCH_FRAME, building
3564  * thunks in the heap for all the active computations, so they can 
3565  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3566  * an application of the handler to the exception, and push it on
3567  * the top of the stack.
3568  * 
3569  * How exactly do we save all the active computations?  We create an
3570  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3571  * AP_STACKs pushes everything from the corresponding update frame
3572  * upwards onto the stack.  (Actually, it pushes everything up to the
3573  * next update frame plus a pointer to the next AP_STACK object.
3574  * Entering the next AP_STACK object pushes more onto the stack until we
3575  * reach the last AP_STACK object - at which point the stack should look
3576  * exactly as it did when we killed the TSO and we can continue
3577  * execution by entering the closure on top of the stack.
3578  *
3579  * We can also kill a thread entirely - this happens if either (a) the 
3580  * exception passed to raiseAsync is NULL, or (b) there's no
3581  * CATCH_FRAME on the stack.  In either case, we strip the entire
3582  * stack and replace the thread with a zombie.
3583  *
3584  * Locks: sched_mutex held upon entry nor exit.
3585  *
3586  * -------------------------------------------------------------------------- */
3587  
3588 void 
3589 deleteThread(StgTSO *tso)
3590 {
3591   if (tso->why_blocked != BlockedOnCCall &&
3592       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3593       raiseAsync(tso,NULL);
3594   }
3595 }
3596
3597 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3598 static void 
3599 deleteThreadImmediately(StgTSO *tso)
3600 { // for forkProcess only:
3601   // delete thread without giving it a chance to catch the KillThread exception
3602
3603   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3604       return;
3605   }
3606
3607   if (tso->why_blocked != BlockedOnCCall &&
3608       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3609     unblockThread(tso);
3610   }
3611
3612   tso->what_next = ThreadKilled;
3613 }
3614 #endif
3615
3616 void
3617 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3618 {
3619   /* When raising async exs from contexts where sched_mutex isn't held;
3620      use raiseAsyncWithLock(). */
3621   ACQUIRE_LOCK(&sched_mutex);
3622   raiseAsync(tso,exception);
3623   RELEASE_LOCK(&sched_mutex);
3624 }
3625
3626 void
3627 raiseAsync(StgTSO *tso, StgClosure *exception)
3628 {
3629     raiseAsync_(tso, exception, rtsFalse);
3630 }
3631
3632 static void
3633 raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
3634 {
3635     StgRetInfoTable *info;
3636     StgPtr sp;
3637   
3638     // Thread already dead?
3639     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3640         return;
3641     }
3642
3643     IF_DEBUG(scheduler, 
3644              sched_belch("raising exception in thread %ld.", (long)tso->id));
3645     
3646     // Remove it from any blocking queues
3647     unblockThread(tso);
3648
3649     sp = tso->sp;
3650     
3651     // The stack freezing code assumes there's a closure pointer on
3652     // the top of the stack, so we have to arrange that this is the case...
3653     //
3654     if (sp[0] == (W_)&stg_enter_info) {
3655         sp++;
3656     } else {
3657         sp--;
3658         sp[0] = (W_)&stg_dummy_ret_closure;
3659     }
3660
3661     while (1) {
3662         nat i;
3663
3664         // 1. Let the top of the stack be the "current closure"
3665         //
3666         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3667         // CATCH_FRAME.
3668         //
3669         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3670         // current closure applied to the chunk of stack up to (but not
3671         // including) the update frame.  This closure becomes the "current
3672         // closure".  Go back to step 2.
3673         //
3674         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3675         // top of the stack applied to the exception.
3676         // 
3677         // 5. If it's a STOP_FRAME, then kill the thread.
3678         // 
3679         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3680         // transaction
3681        
3682         
3683         StgPtr frame;
3684         
3685         frame = sp + 1;
3686         info = get_ret_itbl((StgClosure *)frame);
3687         
3688         while (info->i.type != UPDATE_FRAME
3689                && (info->i.type != CATCH_FRAME || exception == NULL)
3690                && info->i.type != STOP_FRAME
3691                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3692         {
3693             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3694               // IF we find an ATOMICALLY_FRAME then we abort the
3695               // current transaction and propagate the exception.  In
3696               // this case (unlike ordinary exceptions) we do not care
3697               // whether the transaction is valid or not because its
3698               // possible validity cannot have caused the exception
3699               // and will not be visible after the abort.
3700               IF_DEBUG(stm,
3701                        debugBelch("Found atomically block delivering async exception\n"));
3702               stmAbortTransaction(tso -> trec);
3703               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3704             }
3705             frame += stack_frame_sizeW((StgClosure *)frame);
3706             info = get_ret_itbl((StgClosure *)frame);
3707         }
3708         
3709         switch (info->i.type) {
3710             
3711         case ATOMICALLY_FRAME:
3712             ASSERT(stop_at_atomically);
3713             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3714             stmCondemnTransaction(tso -> trec);
3715 #ifdef REG_R1
3716             tso->sp = frame;
3717 #else
3718             // R1 is not a register: the return convention for IO in
3719             // this case puts the return value on the stack, so we
3720             // need to set up the stack to return to the atomically
3721             // frame properly...
3722             tso->sp = frame - 2;
3723             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3724             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3725 #endif
3726             tso->what_next = ThreadRunGHC;
3727             return;
3728
3729         case CATCH_FRAME:
3730             // If we find a CATCH_FRAME, and we've got an exception to raise,
3731             // then build the THUNK raise(exception), and leave it on
3732             // top of the CATCH_FRAME ready to enter.
3733             //
3734         {
3735 #ifdef PROFILING
3736             StgCatchFrame *cf = (StgCatchFrame *)frame;
3737 #endif
3738             StgClosure *raise;
3739             
3740             // we've got an exception to raise, so let's pass it to the
3741             // handler in this frame.
3742             //
3743             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3744             TICK_ALLOC_SE_THK(1,0);
3745             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3746             raise->payload[0] = exception;
3747             
3748             // throw away the stack from Sp up to the CATCH_FRAME.
3749             //
3750             sp = frame - 1;
3751             
3752             /* Ensure that async excpetions are blocked now, so we don't get
3753              * a surprise exception before we get around to executing the
3754              * handler.
3755              */
3756             if (tso->blocked_exceptions == NULL) {
3757                 tso->blocked_exceptions = END_TSO_QUEUE;
3758             }
3759             
3760             /* Put the newly-built THUNK on top of the stack, ready to execute
3761              * when the thread restarts.
3762              */
3763             sp[0] = (W_)raise;
3764             sp[-1] = (W_)&stg_enter_info;
3765             tso->sp = sp-1;
3766             tso->what_next = ThreadRunGHC;
3767             IF_DEBUG(sanity, checkTSO(tso));
3768             return;
3769         }
3770         
3771         case UPDATE_FRAME:
3772         {
3773             StgAP_STACK * ap;
3774             nat words;
3775             
3776             // First build an AP_STACK consisting of the stack chunk above the
3777             // current update frame, with the top word on the stack as the
3778             // fun field.
3779             //
3780             words = frame - sp - 1;
3781             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3782             
3783             ap->size = words;
3784             ap->fun  = (StgClosure *)sp[0];
3785             sp++;
3786             for(i=0; i < (nat)words; ++i) {
3787                 ap->payload[i] = (StgClosure *)*sp++;
3788             }
3789             
3790             SET_HDR(ap,&stg_AP_STACK_info,
3791                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3792             TICK_ALLOC_UP_THK(words+1,0);
3793             
3794             IF_DEBUG(scheduler,
3795                      debugBelch("sched: Updating ");
3796                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3797                      debugBelch(" with ");
3798                      printObj((StgClosure *)ap);
3799                 );
3800
3801             // Replace the updatee with an indirection - happily
3802             // this will also wake up any threads currently
3803             // waiting on the result.
3804             //
3805             // Warning: if we're in a loop, more than one update frame on
3806             // the stack may point to the same object.  Be careful not to
3807             // overwrite an IND_OLDGEN in this case, because we'll screw
3808             // up the mutable lists.  To be on the safe side, don't
3809             // overwrite any kind of indirection at all.  See also
3810             // threadSqueezeStack in GC.c, where we have to make a similar
3811             // check.
3812             //
3813             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3814                 // revert the black hole
3815                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3816                                (StgClosure *)ap);
3817             }
3818             sp += sizeofW(StgUpdateFrame) - 1;
3819             sp[0] = (W_)ap; // push onto stack
3820             break;
3821         }
3822         
3823         case STOP_FRAME:
3824             // We've stripped the entire stack, the thread is now dead.
3825             sp += sizeofW(StgStopFrame);
3826             tso->what_next = ThreadKilled;
3827             tso->sp = sp;
3828             return;
3829             
3830         default:
3831             barf("raiseAsync");
3832         }
3833     }
3834     barf("raiseAsync");
3835 }
3836
3837 /* -----------------------------------------------------------------------------
3838    raiseExceptionHelper
3839    
3840    This function is called by the raise# primitve, just so that we can
3841    move some of the tricky bits of raising an exception from C-- into
3842    C.  Who knows, it might be a useful re-useable thing here too.
3843    -------------------------------------------------------------------------- */
3844
3845 StgWord
3846 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3847 {
3848     StgClosure *raise_closure = NULL;
3849     StgPtr p, next;
3850     StgRetInfoTable *info;
3851     //
3852     // This closure represents the expression 'raise# E' where E
3853     // is the exception raise.  It is used to overwrite all the
3854     // thunks which are currently under evaluataion.
3855     //
3856
3857     //    
3858     // LDV profiling: stg_raise_info has THUNK as its closure
3859     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3860     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3861     // 1 does not cause any problem unless profiling is performed.
3862     // However, when LDV profiling goes on, we need to linearly scan
3863     // small object pool, where raise_closure is stored, so we should
3864     // use MIN_UPD_SIZE.
3865     //
3866     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3867     //                                 sizeofW(StgClosure)+1);
3868     //
3869
3870     //
3871     // Walk up the stack, looking for the catch frame.  On the way,
3872     // we update any closures pointed to from update frames with the
3873     // raise closure that we just built.
3874     //
3875     p = tso->sp;
3876     while(1) {
3877         info = get_ret_itbl((StgClosure *)p);
3878         next = p + stack_frame_sizeW((StgClosure *)p);
3879         switch (info->i.type) {
3880             
3881         case UPDATE_FRAME:
3882             // Only create raise_closure if we need to.
3883             if (raise_closure == NULL) {
3884                 raise_closure = 
3885                     (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3886                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3887                 raise_closure->payload[0] = exception;
3888             }
3889             UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3890             p = next;
3891             continue;
3892
3893         case ATOMICALLY_FRAME:
3894             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3895             tso->sp = p;
3896             return ATOMICALLY_FRAME;
3897             
3898         case CATCH_FRAME:
3899             tso->sp = p;
3900             return CATCH_FRAME;
3901
3902         case CATCH_STM_FRAME:
3903             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3904             tso->sp = p;
3905             return CATCH_STM_FRAME;
3906             
3907         case STOP_FRAME:
3908             tso->sp = p;
3909             return STOP_FRAME;
3910
3911         case CATCH_RETRY_FRAME:
3912         default:
3913             p = next; 
3914             continue;
3915         }
3916     }
3917 }
3918
3919
3920 /* -----------------------------------------------------------------------------
3921    findRetryFrameHelper
3922
3923    This function is called by the retry# primitive.  It traverses the stack
3924    leaving tso->sp referring to the frame which should handle the retry.  
3925
3926    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3927    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3928
3929    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3930    despite the similar implementation.
3931
3932    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3933    not be created within memory transactions.
3934    -------------------------------------------------------------------------- */
3935
3936 StgWord
3937 findRetryFrameHelper (StgTSO *tso)
3938 {
3939   StgPtr           p, next;
3940   StgRetInfoTable *info;
3941
3942   p = tso -> sp;
3943   while (1) {
3944     info = get_ret_itbl((StgClosure *)p);
3945     next = p + stack_frame_sizeW((StgClosure *)p);
3946     switch (info->i.type) {
3947       
3948     case ATOMICALLY_FRAME:
3949       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3950       tso->sp = p;
3951       return ATOMICALLY_FRAME;
3952       
3953     case CATCH_RETRY_FRAME:
3954       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3955       tso->sp = p;
3956       return CATCH_RETRY_FRAME;
3957       
3958     case CATCH_STM_FRAME:
3959     default:
3960       ASSERT(info->i.type != CATCH_FRAME);
3961       ASSERT(info->i.type != STOP_FRAME);
3962       p = next; 
3963       continue;
3964     }
3965   }
3966 }
3967
3968 /* -----------------------------------------------------------------------------
3969    resurrectThreads is called after garbage collection on the list of
3970    threads found to be garbage.  Each of these threads will be woken
3971    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3972    on an MVar, or NonTermination if the thread was blocked on a Black
3973    Hole.
3974
3975    Locks: sched_mutex isn't held upon entry nor exit.
3976    -------------------------------------------------------------------------- */
3977
3978 void
3979 resurrectThreads( StgTSO *threads )
3980 {
3981   StgTSO *tso, *next;
3982
3983   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3984     next = tso->global_link;
3985     tso->global_link = all_threads;
3986     all_threads = tso;
3987     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3988
3989     switch (tso->why_blocked) {
3990     case BlockedOnMVar:
3991     case BlockedOnException:
3992       /* Called by GC - sched_mutex lock is currently held. */
3993       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3994       break;
3995     case BlockedOnBlackHole:
3996       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3997       break;
3998     case BlockedOnSTM:
3999       raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
4000       break;
4001     case NotBlocked:
4002       /* This might happen if the thread was blocked on a black hole
4003        * belonging to a thread that we've just woken up (raiseAsync
4004        * can wake up threads, remember...).
4005        */
4006       continue;
4007     default:
4008       barf("resurrectThreads: thread blocked in a strange way");
4009     }
4010   }
4011 }
4012
4013 /* ----------------------------------------------------------------------------
4014  * Debugging: why is a thread blocked
4015  * [Also provides useful information when debugging threaded programs
4016  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4017    ------------------------------------------------------------------------- */
4018
4019 static void
4020 printThreadBlockage(StgTSO *tso)
4021 {
4022   switch (tso->why_blocked) {
4023   case BlockedOnRead:
4024     debugBelch("is blocked on read from fd %ld", tso->block_info.fd);
4025     break;
4026   case BlockedOnWrite:
4027     debugBelch("is blocked on write to fd %ld", tso->block_info.fd);
4028     break;
4029 #if defined(mingw32_HOST_OS)
4030     case BlockedOnDoProc:
4031     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4032     break;
4033 #endif
4034   case BlockedOnDelay:
4035     debugBelch("is blocked until %ld", tso->block_info.target);
4036     break;
4037   case BlockedOnMVar:
4038     debugBelch("is blocked on an MVar");
4039     break;
4040   case BlockedOnException:
4041     debugBelch("is blocked on delivering an exception to thread %d",
4042             tso->block_info.tso->id);
4043     break;
4044   case BlockedOnBlackHole:
4045     debugBelch("is blocked on a black hole");
4046     break;
4047   case NotBlocked:
4048     debugBelch("is not blocked");
4049     break;
4050 #if defined(PARALLEL_HASKELL)
4051   case BlockedOnGA:
4052     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4053             tso->block_info.closure, info_type(tso->block_info.closure));
4054     break;
4055   case BlockedOnGA_NoSend:
4056     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4057             tso->block_info.closure, info_type(tso->block_info.closure));
4058     break;
4059 #endif
4060   case BlockedOnCCall:
4061     debugBelch("is blocked on an external call");
4062     break;
4063   case BlockedOnCCall_NoUnblockExc:
4064     debugBelch("is blocked on an external call (exceptions were already blocked)");
4065     break;
4066   case BlockedOnSTM:
4067     debugBelch("is blocked on an STM operation");
4068     break;
4069   default:
4070     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4071          tso->why_blocked, tso->id, tso);
4072   }
4073 }
4074
4075 static void
4076 printThreadStatus(StgTSO *tso)
4077 {
4078   switch (tso->what_next) {
4079   case ThreadKilled:
4080     debugBelch("has been killed");
4081     break;
4082   case ThreadComplete:
4083     debugBelch("has completed");
4084     break;
4085   default:
4086     printThreadBlockage(tso);
4087   }
4088 }
4089
4090 void
4091 printAllThreads(void)
4092 {
4093   StgTSO *t;
4094
4095 # if defined(GRAN)
4096   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4097   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4098                        time_string, rtsFalse/*no commas!*/);
4099
4100   debugBelch("all threads at [%s]:\n", time_string);
4101 # elif defined(PARALLEL_HASKELL)
4102   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4103   ullong_format_string(CURRENT_TIME,
4104                        time_string, rtsFalse/*no commas!*/);
4105
4106   debugBelch("all threads at [%s]:\n", time_string);
4107 # else
4108   debugBelch("all threads:\n");
4109 # endif
4110
4111   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
4112     debugBelch("\tthread %d @ %p ", t->id, (void *)t);
4113 #if defined(DEBUG)
4114     {
4115       void *label = lookupThreadLabel(t->id);
4116       if (label) debugBelch("[\"%s\"] ",(char *)label);
4117     }
4118 #endif
4119     printThreadStatus(t);
4120     debugBelch("\n");
4121   }
4122 }
4123     
4124 #ifdef DEBUG
4125
4126 /* 
4127    Print a whole blocking queue attached to node (debugging only).
4128 */
4129 # if defined(PARALLEL_HASKELL)
4130 void 
4131 print_bq (StgClosure *node)
4132 {
4133   StgBlockingQueueElement *bqe;
4134   StgTSO *tso;
4135   rtsBool end;
4136
4137   debugBelch("## BQ of closure %p (%s): ",
4138           node, info_type(node));
4139
4140   /* should cover all closures that may have a blocking queue */
4141   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4142          get_itbl(node)->type == FETCH_ME_BQ ||
4143          get_itbl(node)->type == RBH ||
4144          get_itbl(node)->type == MVAR);
4145     
4146   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4147
4148   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4149 }
4150
4151 /* 
4152    Print a whole blocking queue starting with the element bqe.
4153 */
4154 void 
4155 print_bqe (StgBlockingQueueElement *bqe)
4156 {
4157   rtsBool end;
4158
4159   /* 
4160      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4161   */
4162   for (end = (bqe==END_BQ_QUEUE);
4163        !end; // iterate until bqe points to a CONSTR
4164        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4165        bqe = end ? END_BQ_QUEUE : bqe->link) {
4166     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4167     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4168     /* types of closures that may appear in a blocking queue */
4169     ASSERT(get_itbl(bqe)->type == TSO ||           
4170            get_itbl(bqe)->type == BLOCKED_FETCH || 
4171            get_itbl(bqe)->type == CONSTR); 
4172     /* only BQs of an RBH end with an RBH_Save closure */
4173     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4174
4175     switch (get_itbl(bqe)->type) {
4176     case TSO:
4177       debugBelch(" TSO %u (%x),",
4178               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4179       break;
4180     case BLOCKED_FETCH:
4181       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4182               ((StgBlockedFetch *)bqe)->node, 
4183               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4184               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4185               ((StgBlockedFetch *)bqe)->ga.weight);
4186       break;
4187     case CONSTR:
4188       debugBelch(" %s (IP %p),",
4189               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4190                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4191                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4192                "RBH_Save_?"), get_itbl(bqe));
4193       break;
4194     default:
4195       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4196            info_type((StgClosure *)bqe)); // , node, info_type(node));
4197       break;
4198     }
4199   } /* for */
4200   debugBelch("\n");
4201 }
4202 # elif defined(GRAN)
4203 void 
4204 print_bq (StgClosure *node)
4205 {
4206   StgBlockingQueueElement *bqe;
4207   PEs node_loc, tso_loc;
4208   rtsBool end;
4209
4210   /* should cover all closures that may have a blocking queue */
4211   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4212          get_itbl(node)->type == FETCH_ME_BQ ||
4213          get_itbl(node)->type == RBH);
4214     
4215   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4216   node_loc = where_is(node);
4217
4218   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4219           node, info_type(node), node_loc);
4220
4221   /* 
4222      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4223   */
4224   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4225        !end; // iterate until bqe points to a CONSTR
4226        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4227     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4228     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4229     /* types of closures that may appear in a blocking queue */
4230     ASSERT(get_itbl(bqe)->type == TSO ||           
4231            get_itbl(bqe)->type == CONSTR); 
4232     /* only BQs of an RBH end with an RBH_Save closure */
4233     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4234
4235     tso_loc = where_is((StgClosure *)bqe);
4236     switch (get_itbl(bqe)->type) {
4237     case TSO:
4238       debugBelch(" TSO %d (%p) on [PE %d],",
4239               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4240       break;
4241     case CONSTR:
4242       debugBelch(" %s (IP %p),",
4243               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4244                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4245                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4246                "RBH_Save_?"), get_itbl(bqe));
4247       break;
4248     default:
4249       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4250            info_type((StgClosure *)bqe), node, info_type(node));
4251       break;
4252     }
4253   } /* for */
4254   debugBelch("\n");
4255 }
4256 # endif
4257
4258 #if defined(PARALLEL_HASKELL)
4259 static nat
4260 run_queue_len(void)
4261 {
4262   nat i;
4263   StgTSO *tso;
4264
4265   for (i=0, tso=run_queue_hd; 
4266        tso != END_TSO_QUEUE;
4267        i++, tso=tso->link)
4268     /* nothing */
4269
4270   return i;
4271 }
4272 #endif
4273
4274 void
4275 sched_belch(char *s, ...)
4276 {
4277   va_list ap;
4278   va_start(ap,s);
4279 #ifdef RTS_SUPPORTS_THREADS
4280   debugBelch("sched (task %p): ", osThreadId());
4281 #elif defined(PARALLEL_HASKELL)
4282   debugBelch("== ");
4283 #else
4284   debugBelch("sched: ");
4285 #endif
4286   vdebugBelch(s, ap);
4287   debugBelch("\n");
4288   va_end(ap);
4289 }
4290
4291 #endif /* DEBUG */