977a2aa1e5c5dc740bc9715d0ea8761014f43404
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PARALLEL_HASKELL          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "OSThreads.h"
46 #include "Storage.h"
47 #include "StgRun.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "STM.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #include "LdvProfile.h"
63 #include "Updates.h"
64 #ifdef PROFILING
65 #include "Proftimer.h"
66 #include "ProfHeap.h"
67 #endif
68 #if defined(GRAN) || defined(PARALLEL_HASKELL)
69 # include "GranSimRts.h"
70 # include "GranSim.h"
71 # include "ParallelRts.h"
72 # include "Parallel.h"
73 # include "ParallelDebug.h"
74 # include "FetchMe.h"
75 # include "HLC.h"
76 #endif
77 #include "Sparks.h"
78 #include "Capability.h"
79 #include  "Task.h"
80
81 #ifdef HAVE_SYS_TYPES_H
82 #include <sys/types.h>
83 #endif
84 #ifdef HAVE_UNISTD_H
85 #include <unistd.h>
86 #endif
87
88 #include <string.h>
89 #include <stdlib.h>
90 #include <stdarg.h>
91
92 #ifdef HAVE_ERRNO_H
93 #include <errno.h>
94 #endif
95
96 // Turn off inlining when debugging - it obfuscates things
97 #ifdef DEBUG
98 # undef  STATIC_INLINE
99 # define STATIC_INLINE static
100 #endif
101
102 #ifdef THREADED_RTS
103 #define USED_IN_THREADED_RTS
104 #else
105 #define USED_IN_THREADED_RTS STG_UNUSED
106 #endif
107
108 #ifdef RTS_SUPPORTS_THREADS
109 #define USED_WHEN_RTS_SUPPORTS_THREADS
110 #else
111 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
112 #endif
113
114 /* Main thread queue.
115  * Locks required: sched_mutex.
116  */
117 StgMainThread *main_threads = NULL;
118
119 #if defined(GRAN)
120
121 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
122 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
123
124 /* 
125    In GranSim we have a runnable and a blocked queue for each processor.
126    In order to minimise code changes new arrays run_queue_hds/tls
127    are created. run_queue_hd is then a short cut (macro) for
128    run_queue_hds[CurrentProc] (see GranSim.h).
129    -- HWL
130 */
131 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
132 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
133 StgTSO *ccalling_threadss[MAX_PROC];
134 /* We use the same global list of threads (all_threads) in GranSim as in
135    the std RTS (i.e. we are cheating). However, we don't use this list in
136    the GranSim specific code at the moment (so we are only potentially
137    cheating).  */
138
139 #else /* !GRAN */
140
141 /* Thread queues.
142  * Locks required: sched_mutex.
143  */
144 StgTSO *run_queue_hd = NULL;
145 StgTSO *run_queue_tl = NULL;
146 StgTSO *blocked_queue_hd = NULL;
147 StgTSO *blocked_queue_tl = NULL;
148 StgTSO *blackhole_queue = NULL;
149 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
150
151 #endif
152
153 /* The blackhole_queue should be checked for threads to wake up.  See
154  * Schedule.h for more thorough comment.
155  */
156 rtsBool blackholes_need_checking = rtsFalse;
157
158 /* Linked list of all threads.
159  * Used for detecting garbage collected threads.
160  */
161 StgTSO *all_threads = NULL;
162
163 /* When a thread performs a safe C call (_ccall_GC, using old
164  * terminology), it gets put on the suspended_ccalling_threads
165  * list. Used by the garbage collector.
166  */
167 static StgTSO *suspended_ccalling_threads;
168
169 /* KH: The following two flags are shared memory locations.  There is no need
170        to lock them, since they are only unset at the end of a scheduler
171        operation.
172 */
173
174 /* flag set by signal handler to precipitate a context switch */
175 int context_switch = 0;
176
177 /* flag that tracks whether we have done any execution in this time slice. */
178 nat recent_activity = ACTIVITY_YES;
179
180 /* if this flag is set as well, give up execution */
181 rtsBool interrupted = rtsFalse;
182
183 /* Next thread ID to allocate.
184  * Locks required: thread_id_mutex
185  */
186 static StgThreadID next_thread_id = 1;
187
188 /*
189  * Pointers to the state of the current thread.
190  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
191  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
192  */
193  
194 /* The smallest stack size that makes any sense is:
195  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
196  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
197  *  + 1                       (the closure to enter)
198  *  + 1                       (stg_ap_v_ret)
199  *  + 1                       (spare slot req'd by stg_ap_v_ret)
200  *
201  * A thread with this stack will bomb immediately with a stack
202  * overflow, which will increase its stack size.  
203  */
204
205 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
206
207
208 #if defined(GRAN)
209 StgTSO *CurrentTSO;
210 #endif
211
212 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
213  *  exists - earlier gccs apparently didn't.
214  *  -= chak
215  */
216 StgTSO dummy_tso;
217
218 /*
219  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
220  * in an MT setting, needed to signal that a worker thread shouldn't hang around
221  * in the scheduler when it is out of work.
222  */
223 static rtsBool shutting_down_scheduler = rtsFalse;
224
225 #if defined(RTS_SUPPORTS_THREADS)
226 /* ToDo: carefully document the invariants that go together
227  *       with these synchronisation objects.
228  */
229 Mutex     sched_mutex       = INIT_MUTEX_VAR;
230 Mutex     term_mutex        = INIT_MUTEX_VAR;
231
232 #endif /* RTS_SUPPORTS_THREADS */
233
234 #if defined(PARALLEL_HASKELL)
235 StgTSO *LastTSO;
236 rtsTime TimeOfLastYield;
237 rtsBool emitSchedule = rtsTrue;
238 #endif
239
240 #if DEBUG
241 static char *whatNext_strs[] = {
242   "(unknown)",
243   "ThreadRunGHC",
244   "ThreadInterpret",
245   "ThreadKilled",
246   "ThreadRelocated",
247   "ThreadComplete"
248 };
249 #endif
250
251 /* -----------------------------------------------------------------------------
252  * static function prototypes
253  * -------------------------------------------------------------------------- */
254
255 #if defined(RTS_SUPPORTS_THREADS)
256 static void taskStart(void);
257 #endif
258
259 static void schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
260                       Capability *initialCapability );
261
262 //
263 // These function all encapsulate parts of the scheduler loop, and are
264 // abstracted only to make the structure and control flow of the
265 // scheduler clearer.
266 //
267 static void schedulePreLoop(void);
268 static void scheduleStartSignalHandlers(void);
269 static void scheduleCheckBlockedThreads(void);
270 static void scheduleCheckBlackHoles(void);
271 static void scheduleDetectDeadlock(void);
272 #if defined(GRAN)
273 static StgTSO *scheduleProcessEvent(rtsEvent *event);
274 #endif
275 #if defined(PARALLEL_HASKELL)
276 static StgTSO *scheduleSendPendingMessages(void);
277 static void scheduleActivateSpark(void);
278 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
279 #endif
280 #if defined(PAR) || defined(GRAN)
281 static void scheduleGranParReport(void);
282 #endif
283 static void schedulePostRunThread(void);
284 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
285 static void scheduleHandleStackOverflow( StgTSO *t);
286 static rtsBool scheduleHandleYield( StgTSO *t, nat prev_what_next );
287 static void scheduleHandleThreadBlocked( StgTSO *t );
288 static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread, 
289                                              Capability *cap, StgTSO *t );
290 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
291 static void scheduleDoGC(Capability *cap);
292
293 static void unblockThread(StgTSO *tso);
294 static rtsBool checkBlackHoles(void);
295 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
296                                    Capability *initialCapability
297                                    );
298 static void scheduleThread_ (StgTSO* tso);
299 static void AllRoots(evac_fn evac);
300
301 static StgTSO *threadStackOverflow(StgTSO *tso);
302
303 static void raiseAsync_(StgTSO *tso, StgClosure *exception, 
304                         rtsBool stop_at_atomically);
305
306 static void printThreadBlockage(StgTSO *tso);
307 static void printThreadStatus(StgTSO *tso);
308
309 #if defined(PARALLEL_HASKELL)
310 StgTSO * createSparkThread(rtsSpark spark);
311 StgTSO * activateSpark (rtsSpark spark);  
312 #endif
313
314 /* ----------------------------------------------------------------------------
315  * Starting Tasks
316  * ------------------------------------------------------------------------- */
317
318 #if defined(RTS_SUPPORTS_THREADS)
319 static nat startingWorkerThread = 0;
320
321 static void
322 taskStart(void)
323 {
324   ACQUIRE_LOCK(&sched_mutex);
325   startingWorkerThread--;
326   schedule(NULL,NULL);
327   taskStop();
328   RELEASE_LOCK(&sched_mutex);
329 }
330
331 void
332 startSchedulerTaskIfNecessary(void)
333 {
334     if ( !EMPTY_RUN_QUEUE()
335          && !shutting_down_scheduler // not if we're shutting down
336          && startingWorkerThread==0)
337     {
338         // we don't want to start another worker thread
339         // just because the last one hasn't yet reached the
340         // "waiting for capability" state
341         startingWorkerThread++;
342         if (!maybeStartNewWorker(taskStart)) {
343             startingWorkerThread--;
344         }
345     }
346 }
347 #endif
348
349 /* -----------------------------------------------------------------------------
350  * Putting a thread on the run queue: different scheduling policies
351  * -------------------------------------------------------------------------- */
352
353 STATIC_INLINE void
354 addToRunQueue( StgTSO *t )
355 {
356 #if defined(PARALLEL_HASKELL)
357     if (RtsFlags.ParFlags.doFairScheduling) { 
358         // this does round-robin scheduling; good for concurrency
359         APPEND_TO_RUN_QUEUE(t);
360     } else {
361         // this does unfair scheduling; good for parallelism
362         PUSH_ON_RUN_QUEUE(t);
363     }
364 #else
365     // this does round-robin scheduling; good for concurrency
366     APPEND_TO_RUN_QUEUE(t);
367 #endif
368 }
369     
370 /* ---------------------------------------------------------------------------
371    Main scheduling loop.
372
373    We use round-robin scheduling, each thread returning to the
374    scheduler loop when one of these conditions is detected:
375
376       * out of heap space
377       * timer expires (thread yields)
378       * thread blocks
379       * thread ends
380       * stack overflow
381
382    Locking notes:  we acquire the scheduler lock once at the beginning
383    of the scheduler loop, and release it when
384     
385       * running a thread, or
386       * waiting for work, or
387       * waiting for a GC to complete.
388
389    GRAN version:
390      In a GranSim setup this loop iterates over the global event queue.
391      This revolves around the global event queue, which determines what 
392      to do next. Therefore, it's more complicated than either the 
393      concurrent or the parallel (GUM) setup.
394
395    GUM version:
396      GUM iterates over incoming messages.
397      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
398      and sends out a fish whenever it has nothing to do; in-between
399      doing the actual reductions (shared code below) it processes the
400      incoming messages and deals with delayed operations 
401      (see PendingFetches).
402      This is not the ugliest code you could imagine, but it's bloody close.
403
404    ------------------------------------------------------------------------ */
405
406 static void
407 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
408           Capability *initialCapability )
409 {
410   StgTSO *t;
411   Capability *cap;
412   StgThreadReturnCode ret;
413 #if defined(GRAN)
414   rtsEvent *event;
415 #elif defined(PARALLEL_HASKELL)
416   StgTSO *tso;
417   GlobalTaskId pe;
418   rtsBool receivedFinish = rtsFalse;
419 # if defined(DEBUG)
420   nat tp_size, sp_size; // stats only
421 # endif
422 #endif
423   nat prev_what_next;
424   rtsBool ready_to_gc;
425   
426   // Pre-condition: sched_mutex is held.
427   // We might have a capability, passed in as initialCapability.
428   cap = initialCapability;
429
430 #if !defined(RTS_SUPPORTS_THREADS)
431   // simply initialise it in the non-threaded case
432   grabCapability(&cap);
433 #endif
434
435   IF_DEBUG(scheduler,
436            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
437                        mainThread, initialCapability);
438       );
439
440   schedulePreLoop();
441
442   // -----------------------------------------------------------
443   // Scheduler loop starts here:
444
445 #if defined(PARALLEL_HASKELL)
446 #define TERMINATION_CONDITION        (!receivedFinish)
447 #elif defined(GRAN)
448 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
449 #else
450 #define TERMINATION_CONDITION        rtsTrue
451 #endif
452
453   while (TERMINATION_CONDITION) {
454
455 #if defined(GRAN)
456       /* Choose the processor with the next event */
457       CurrentProc = event->proc;
458       CurrentTSO = event->tso;
459 #endif
460
461       IF_DEBUG(scheduler, printAllThreads());
462
463 #if defined(RTS_SUPPORTS_THREADS)
464       // Yield the capability to higher-priority tasks if necessary.
465       //
466       if (cap != NULL) {
467           yieldCapability(&cap);
468       }
469
470       // If we do not currently hold a capability, we wait for one
471       //
472       if (cap == NULL) {
473           waitForCapability(&sched_mutex, &cap,
474                             mainThread ? &mainThread->bound_thread_cond : NULL);
475       }
476
477       // We now have a capability...
478 #endif
479       
480 #if 0 /* extra sanity checking */
481       { 
482           StgMainThread *m;
483           for (m = main_threads; m != NULL; m = m->link) {
484               ASSERT(get_itbl(m->tso)->type == TSO);
485           }
486       }
487 #endif
488
489     // Check whether we have re-entered the RTS from Haskell without
490     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
491     // call).
492     if (cap->r.rInHaskell) {
493           errorBelch("schedule: re-entered unsafely.\n"
494                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
495           stg_exit(1);
496     }
497
498     //
499     // Test for interruption.  If interrupted==rtsTrue, then either
500     // we received a keyboard interrupt (^C), or the scheduler is
501     // trying to shut down all the tasks (shutting_down_scheduler) in
502     // the threaded RTS.
503     //
504     if (interrupted) {
505         if (shutting_down_scheduler) {
506             IF_DEBUG(scheduler, sched_belch("shutting down"));
507             releaseCapability(cap);
508             if (mainThread) {
509                 mainThread->stat = Interrupted;
510                 mainThread->ret  = NULL;
511             }
512             return;
513         } else {
514             IF_DEBUG(scheduler, sched_belch("interrupted"));
515             deleteAllThreads();
516         }
517     }
518
519 #if defined(not_yet) && defined(SMP)
520     //
521     // Top up the run queue from our spark pool.  We try to make the
522     // number of threads in the run queue equal to the number of
523     // free capabilities.
524     //
525     {
526         StgClosure *spark;
527         if (EMPTY_RUN_QUEUE()) {
528             spark = findSpark(rtsFalse);
529             if (spark == NULL) {
530                 break; /* no more sparks in the pool */
531             } else {
532                 createSparkThread(spark);         
533                 IF_DEBUG(scheduler,
534                          sched_belch("==^^ turning spark of closure %p into a thread",
535                                      (StgClosure *)spark));
536             }
537         }
538     }
539 #endif // SMP
540
541     scheduleStartSignalHandlers();
542
543     // Only check the black holes here if we've nothing else to do.
544     // During normal execution, the black hole list only gets checked
545     // at GC time, to avoid repeatedly traversing this possibly long
546     // list each time around the scheduler.
547     if (EMPTY_RUN_QUEUE()) { scheduleCheckBlackHoles(); }
548
549     scheduleCheckBlockedThreads();
550
551     scheduleDetectDeadlock();
552
553     // Normally, the only way we can get here with no threads to
554     // run is if a keyboard interrupt received during 
555     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
556     // Additionally, it is not fatal for the
557     // threaded RTS to reach here with no threads to run.
558     //
559     // win32: might be here due to awaitEvent() being abandoned
560     // as a result of a console event having been delivered.
561     if ( EMPTY_RUN_QUEUE() ) {
562 #if !defined(RTS_SUPPORTS_THREADS) && !defined(mingw32_HOST_OS)
563         ASSERT(interrupted);
564 #endif
565         continue; // nothing to do
566     }
567
568 #if defined(PARALLEL_HASKELL)
569     scheduleSendPendingMessages();
570     if (EMPTY_RUN_QUEUE() && scheduleActivateSpark()) 
571         continue;
572
573 #if defined(SPARKS)
574     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
575 #endif
576
577     /* If we still have no work we need to send a FISH to get a spark
578        from another PE */
579     if (EMPTY_RUN_QUEUE()) {
580         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
581         ASSERT(rtsFalse); // should not happen at the moment
582     }
583     // from here: non-empty run queue.
584     //  TODO: merge above case with this, only one call processMessages() !
585     if (PacketsWaiting()) {  /* process incoming messages, if
586                                 any pending...  only in else
587                                 because getRemoteWork waits for
588                                 messages as well */
589         receivedFinish = processMessages();
590     }
591 #endif
592
593 #if defined(GRAN)
594     scheduleProcessEvent(event);
595 #endif
596
597     // 
598     // Get a thread to run
599     //
600     ASSERT(run_queue_hd != END_TSO_QUEUE);
601     POP_RUN_QUEUE(t);
602
603 #if defined(GRAN) || defined(PAR)
604     scheduleGranParReport(); // some kind of debuging output
605 #else
606     // Sanity check the thread we're about to run.  This can be
607     // expensive if there is lots of thread switching going on...
608     IF_DEBUG(sanity,checkTSO(t));
609 #endif
610
611 #if defined(RTS_SUPPORTS_THREADS)
612     // Check whether we can run this thread in the current task.
613     // If not, we have to pass our capability to the right task.
614     {
615       StgMainThread *m = t->main;
616       
617       if(m)
618       {
619         if(m == mainThread)
620         {
621           IF_DEBUG(scheduler,
622             sched_belch("### Running thread %d in bound thread", t->id));
623           // yes, the Haskell thread is bound to the current native thread
624         }
625         else
626         {
627           IF_DEBUG(scheduler,
628             sched_belch("### thread %d bound to another OS thread", t->id));
629           // no, bound to a different Haskell thread: pass to that thread
630           PUSH_ON_RUN_QUEUE(t);
631           passCapability(&m->bound_thread_cond);
632           continue;
633         }
634       }
635       else
636       {
637         if(mainThread != NULL)
638         // The thread we want to run is bound.
639         {
640           IF_DEBUG(scheduler,
641             sched_belch("### this OS thread cannot run thread %d", t->id));
642           // no, the current native thread is bound to a different
643           // Haskell thread, so pass it to any worker thread
644           PUSH_ON_RUN_QUEUE(t);
645           passCapabilityToWorker();
646           continue; 
647         }
648       }
649     }
650 #endif
651
652     cap->r.rCurrentTSO = t;
653     
654     /* context switches are now initiated by the timer signal, unless
655      * the user specified "context switch as often as possible", with
656      * +RTS -C0
657      */
658     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
659          && (run_queue_hd != END_TSO_QUEUE
660              || blocked_queue_hd != END_TSO_QUEUE
661              || sleeping_queue != END_TSO_QUEUE)))
662         context_switch = 1;
663
664 run_thread:
665
666     RELEASE_LOCK(&sched_mutex);
667
668     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
669                               (long)t->id, whatNext_strs[t->what_next]));
670
671 #if defined(PROFILING)
672     startHeapProfTimer();
673 #endif
674
675     // ----------------------------------------------------------------------
676     // Run the current thread 
677
678     prev_what_next = t->what_next;
679
680     errno = t->saved_errno;
681     cap->r.rInHaskell = rtsTrue;
682
683     recent_activity = ACTIVITY_YES;
684
685     switch (prev_what_next) {
686
687     case ThreadKilled:
688     case ThreadComplete:
689         /* Thread already finished, return to scheduler. */
690         ret = ThreadFinished;
691         break;
692
693     case ThreadRunGHC:
694         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
695         break;
696
697     case ThreadInterpret:
698         ret = interpretBCO(cap);
699         break;
700
701     default:
702       barf("schedule: invalid what_next field");
703     }
704
705 #if defined(SMP)
706     // in SMP mode, we might return with a different capability than
707     // we started with, if the Haskell thread made a foreign call.  So
708     // let's find out what our current Capability is:
709     cap = myCapability();
710 #endif
711
712     // We have run some Haskell code: there might be blackhole-blocked
713     // threads to wake up now.
714     if ( blackhole_queue != END_TSO_QUEUE ) {
715         blackholes_need_checking = rtsTrue;
716     }
717
718     cap->r.rInHaskell = rtsFalse;
719
720     // The TSO might have moved, eg. if it re-entered the RTS and a GC
721     // happened.  So find the new location:
722     t = cap->r.rCurrentTSO;
723
724     // And save the current errno in this thread.
725     t->saved_errno = errno;
726
727     // ----------------------------------------------------------------------
728     
729     /* Costs for the scheduler are assigned to CCS_SYSTEM */
730 #if defined(PROFILING)
731     stopHeapProfTimer();
732     CCCS = CCS_SYSTEM;
733 #endif
734     
735     ACQUIRE_LOCK(&sched_mutex);
736     
737 #if defined(RTS_SUPPORTS_THREADS)
738     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
739 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
740     IF_DEBUG(scheduler,debugBelch("sched: "););
741 #endif
742     
743     schedulePostRunThread();
744
745     ready_to_gc = rtsFalse;
746
747     switch (ret) {
748     case HeapOverflow:
749         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
750         break;
751
752     case StackOverflow:
753         scheduleHandleStackOverflow(t);
754         break;
755
756     case ThreadYielding:
757         if (scheduleHandleYield(t, prev_what_next)) {
758             // shortcut for switching between compiler/interpreter:
759             goto run_thread; 
760         }
761         break;
762
763     case ThreadBlocked:
764         scheduleHandleThreadBlocked(t);
765         threadPaused(t);
766         break;
767
768     case ThreadFinished:
769         if (scheduleHandleThreadFinished(mainThread, cap, t)) return;;
770         break;
771
772     default:
773       barf("schedule: invalid thread return code %d", (int)ret);
774     }
775
776     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
777     if (ready_to_gc) { scheduleDoGC(cap); }
778   } /* end of while() */
779
780   IF_PAR_DEBUG(verbose,
781                debugBelch("== Leaving schedule() after having received Finish\n"));
782 }
783
784 /* ----------------------------------------------------------------------------
785  * Setting up the scheduler loop
786  * ASSUMES: sched_mutex
787  * ------------------------------------------------------------------------- */
788
789 static void
790 schedulePreLoop(void)
791 {
792 #if defined(GRAN) 
793     /* set up first event to get things going */
794     /* ToDo: assign costs for system setup and init MainTSO ! */
795     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
796               ContinueThread, 
797               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
798     
799     IF_DEBUG(gran,
800              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
801                         CurrentTSO);
802              G_TSO(CurrentTSO, 5));
803     
804     if (RtsFlags.GranFlags.Light) {
805         /* Save current time; GranSim Light only */
806         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
807     }      
808 #endif
809 }
810
811 /* ----------------------------------------------------------------------------
812  * Start any pending signal handlers
813  * ASSUMES: sched_mutex
814  * ------------------------------------------------------------------------- */
815
816 static void
817 scheduleStartSignalHandlers(void)
818 {
819 #if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
820     if (signals_pending()) {
821       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
822       startSignalHandlers();
823       ACQUIRE_LOCK(&sched_mutex);
824     }
825 #endif
826 }
827
828 /* ----------------------------------------------------------------------------
829  * Check for blocked threads that can be woken up.
830  * ASSUMES: sched_mutex
831  * ------------------------------------------------------------------------- */
832
833 static void
834 scheduleCheckBlockedThreads(void)
835 {
836     //
837     // Check whether any waiting threads need to be woken up.  If the
838     // run queue is empty, and there are no other tasks running, we
839     // can wait indefinitely for something to happen.
840     //
841     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
842     {
843 #if defined(RTS_SUPPORTS_THREADS)
844         // We shouldn't be here...
845         barf("schedule: awaitEvent() in threaded RTS");
846 #endif
847         awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
848     }
849 }
850
851
852 /* ----------------------------------------------------------------------------
853  * Check for threads blocked on BLACKHOLEs that can be woken up
854  * ASSUMES: sched_mutex
855  * ------------------------------------------------------------------------- */
856 static void
857 scheduleCheckBlackHoles( void )
858 {
859     if ( blackholes_need_checking )
860     {
861         checkBlackHoles();
862         blackholes_need_checking = rtsFalse;
863     }
864 }
865
866 /* ----------------------------------------------------------------------------
867  * Detect deadlock conditions and attempt to resolve them.
868  * ASSUMES: sched_mutex
869  * ------------------------------------------------------------------------- */
870
871 static void
872 scheduleDetectDeadlock(void)
873 {
874
875 #if defined(PARALLEL_HASKELL)
876     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
877     return;
878 #endif
879
880     /* 
881      * Detect deadlock: when we have no threads to run, there are no
882      * threads blocked, waiting for I/O, or sleeping, and all the
883      * other tasks are waiting for work, we must have a deadlock of
884      * some description.
885      */
886     if ( EMPTY_THREAD_QUEUES() )
887     {
888 #if defined(RTS_SUPPORTS_THREADS)
889         /* 
890          * In the threaded RTS, we only check for deadlock if there
891          * has been no activity in a complete timeslice.  This means
892          * we won't eagerly start a full GC just because we don't have
893          * any threads to run currently.
894          */
895         if (recent_activity != ACTIVITY_INACTIVE) return;
896 #endif
897
898         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
899
900         // Garbage collection can release some new threads due to
901         // either (a) finalizers or (b) threads resurrected because
902         // they are unreachable and will therefore be sent an
903         // exception.  Any threads thus released will be immediately
904         // runnable.
905         GarbageCollect(GetRoots,rtsTrue);
906         recent_activity = ACTIVITY_DONE_GC;
907         if ( !EMPTY_RUN_QUEUE() ) return;
908
909 #if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
910         /* If we have user-installed signal handlers, then wait
911          * for signals to arrive rather then bombing out with a
912          * deadlock.
913          */
914         if ( anyUserHandlers() ) {
915             IF_DEBUG(scheduler, 
916                      sched_belch("still deadlocked, waiting for signals..."));
917
918             awaitUserSignals();
919
920             if (signals_pending()) {
921                 RELEASE_LOCK(&sched_mutex);
922                 startSignalHandlers();
923                 ACQUIRE_LOCK(&sched_mutex);
924             }
925
926             // either we have threads to run, or we were interrupted:
927             ASSERT(!EMPTY_RUN_QUEUE() || interrupted);
928         }
929 #endif
930
931 #if !defined(RTS_SUPPORTS_THREADS)
932         /* Probably a real deadlock.  Send the current main thread the
933          * Deadlock exception (or in the SMP build, send *all* main
934          * threads the deadlock exception, since none of them can make
935          * progress).
936          */
937         {
938             StgMainThread *m;
939             m = main_threads;
940             switch (m->tso->why_blocked) {
941             case BlockedOnSTM:
942             case BlockedOnBlackHole:
943             case BlockedOnException:
944             case BlockedOnMVar:
945                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
946                 return;
947             default:
948                 barf("deadlock: main thread blocked in a strange way");
949             }
950         }
951 #endif
952     }
953 }
954
955 /* ----------------------------------------------------------------------------
956  * Process an event (GRAN only)
957  * ------------------------------------------------------------------------- */
958
959 #if defined(GRAN)
960 static StgTSO *
961 scheduleProcessEvent(rtsEvent *event)
962 {
963     StgTSO *t;
964
965     if (RtsFlags.GranFlags.Light)
966       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
967
968     /* adjust time based on time-stamp */
969     if (event->time > CurrentTime[CurrentProc] &&
970         event->evttype != ContinueThread)
971       CurrentTime[CurrentProc] = event->time;
972     
973     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
974     if (!RtsFlags.GranFlags.Light)
975       handleIdlePEs();
976
977     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
978
979     /* main event dispatcher in GranSim */
980     switch (event->evttype) {
981       /* Should just be continuing execution */
982     case ContinueThread:
983       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
984       /* ToDo: check assertion
985       ASSERT(run_queue_hd != (StgTSO*)NULL &&
986              run_queue_hd != END_TSO_QUEUE);
987       */
988       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
989       if (!RtsFlags.GranFlags.DoAsyncFetch &&
990           procStatus[CurrentProc]==Fetching) {
991         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
992               CurrentTSO->id, CurrentTSO, CurrentProc);
993         goto next_thread;
994       } 
995       /* Ignore ContinueThreads for completed threads */
996       if (CurrentTSO->what_next == ThreadComplete) {
997         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
998               CurrentTSO->id, CurrentTSO, CurrentProc);
999         goto next_thread;
1000       } 
1001       /* Ignore ContinueThreads for threads that are being migrated */
1002       if (PROCS(CurrentTSO)==Nowhere) { 
1003         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1004               CurrentTSO->id, CurrentTSO, CurrentProc);
1005         goto next_thread;
1006       }
1007       /* The thread should be at the beginning of the run queue */
1008       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1009         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1010               CurrentTSO->id, CurrentTSO, CurrentProc);
1011         break; // run the thread anyway
1012       }
1013       /*
1014       new_event(proc, proc, CurrentTime[proc],
1015                 FindWork,
1016                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1017       goto next_thread; 
1018       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1019       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1020
1021     case FetchNode:
1022       do_the_fetchnode(event);
1023       goto next_thread;             /* handle next event in event queue  */
1024       
1025     case GlobalBlock:
1026       do_the_globalblock(event);
1027       goto next_thread;             /* handle next event in event queue  */
1028       
1029     case FetchReply:
1030       do_the_fetchreply(event);
1031       goto next_thread;             /* handle next event in event queue  */
1032       
1033     case UnblockThread:   /* Move from the blocked queue to the tail of */
1034       do_the_unblock(event);
1035       goto next_thread;             /* handle next event in event queue  */
1036       
1037     case ResumeThread:  /* Move from the blocked queue to the tail of */
1038       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1039       event->tso->gran.blocktime += 
1040         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1041       do_the_startthread(event);
1042       goto next_thread;             /* handle next event in event queue  */
1043       
1044     case StartThread:
1045       do_the_startthread(event);
1046       goto next_thread;             /* handle next event in event queue  */
1047       
1048     case MoveThread:
1049       do_the_movethread(event);
1050       goto next_thread;             /* handle next event in event queue  */
1051       
1052     case MoveSpark:
1053       do_the_movespark(event);
1054       goto next_thread;             /* handle next event in event queue  */
1055       
1056     case FindWork:
1057       do_the_findwork(event);
1058       goto next_thread;             /* handle next event in event queue  */
1059       
1060     default:
1061       barf("Illegal event type %u\n", event->evttype);
1062     }  /* switch */
1063     
1064     /* This point was scheduler_loop in the old RTS */
1065
1066     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1067
1068     TimeOfLastEvent = CurrentTime[CurrentProc];
1069     TimeOfNextEvent = get_time_of_next_event();
1070     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1071     // CurrentTSO = ThreadQueueHd;
1072
1073     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1074                          TimeOfNextEvent));
1075
1076     if (RtsFlags.GranFlags.Light) 
1077       GranSimLight_leave_system(event, &ActiveTSO); 
1078
1079     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1080
1081     IF_DEBUG(gran, 
1082              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1083
1084     /* in a GranSim setup the TSO stays on the run queue */
1085     t = CurrentTSO;
1086     /* Take a thread from the run queue. */
1087     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1088
1089     IF_DEBUG(gran, 
1090              debugBelch("GRAN: About to run current thread, which is\n");
1091              G_TSO(t,5));
1092
1093     context_switch = 0; // turned on via GranYield, checking events and time slice
1094
1095     IF_DEBUG(gran, 
1096              DumpGranEvent(GR_SCHEDULE, t));
1097
1098     procStatus[CurrentProc] = Busy;
1099 }
1100 #endif // GRAN
1101
1102 /* ----------------------------------------------------------------------------
1103  * Send pending messages (PARALLEL_HASKELL only)
1104  * ------------------------------------------------------------------------- */
1105
1106 #if defined(PARALLEL_HASKELL)
1107 static StgTSO *
1108 scheduleSendPendingMessages(void)
1109 {
1110     StgSparkPool *pool;
1111     rtsSpark spark;
1112     StgTSO *t;
1113
1114 # if defined(PAR) // global Mem.Mgmt., omit for now
1115     if (PendingFetches != END_BF_QUEUE) {
1116         processFetches();
1117     }
1118 # endif
1119     
1120     if (RtsFlags.ParFlags.BufferTime) {
1121         // if we use message buffering, we must send away all message
1122         // packets which have become too old...
1123         sendOldBuffers(); 
1124     }
1125 }
1126 #endif
1127
1128 /* ----------------------------------------------------------------------------
1129  * Activate spark threads (PARALLEL_HASKELL only)
1130  * ------------------------------------------------------------------------- */
1131
1132 #if defined(PARALLEL_HASKELL)
1133 static void
1134 scheduleActivateSpark(void)
1135 {
1136 #if defined(SPARKS)
1137   ASSERT(EMPTY_RUN_QUEUE());
1138 /* We get here if the run queue is empty and want some work.
1139    We try to turn a spark into a thread, and add it to the run queue,
1140    from where it will be picked up in the next iteration of the scheduler
1141    loop.
1142 */
1143
1144       /* :-[  no local threads => look out for local sparks */
1145       /* the spark pool for the current PE */
1146       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1147       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1148           pool->hd < pool->tl) {
1149         /* 
1150          * ToDo: add GC code check that we really have enough heap afterwards!!
1151          * Old comment:
1152          * If we're here (no runnable threads) and we have pending
1153          * sparks, we must have a space problem.  Get enough space
1154          * to turn one of those pending sparks into a
1155          * thread... 
1156          */
1157
1158         spark = findSpark(rtsFalse);            /* get a spark */
1159         if (spark != (rtsSpark) NULL) {
1160           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1161           IF_PAR_DEBUG(fish, // schedule,
1162                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1163                              tso->id, tso, advisory_thread_count));
1164
1165           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1166             IF_PAR_DEBUG(fish, // schedule,
1167                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1168                             spark));
1169             return rtsFalse; /* failed to generate a thread */
1170           }                  /* otherwise fall through & pick-up new tso */
1171         } else {
1172           IF_PAR_DEBUG(fish, // schedule,
1173                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1174                              spark_queue_len(pool)));
1175           return rtsFalse;  /* failed to generate a thread */
1176         }
1177         return rtsTrue;  /* success in generating a thread */
1178   } else { /* no more threads permitted or pool empty */
1179     return rtsFalse;  /* failed to generateThread */
1180   }
1181 #else
1182   tso = NULL; // avoid compiler warning only
1183   return rtsFalse;  /* dummy in non-PAR setup */
1184 #endif // SPARKS
1185 }
1186 #endif // PARALLEL_HASKELL
1187
1188 /* ----------------------------------------------------------------------------
1189  * Get work from a remote node (PARALLEL_HASKELL only)
1190  * ------------------------------------------------------------------------- */
1191     
1192 #if defined(PARALLEL_HASKELL)
1193 static rtsBool
1194 scheduleGetRemoteWork(rtsBool *receivedFinish)
1195 {
1196   ASSERT(EMPTY_RUN_QUEUE());
1197
1198   if (RtsFlags.ParFlags.BufferTime) {
1199         IF_PAR_DEBUG(verbose, 
1200                 debugBelch("...send all pending data,"));
1201         {
1202           nat i;
1203           for (i=1; i<=nPEs; i++)
1204             sendImmediately(i); // send all messages away immediately
1205         }
1206   }
1207 # ifndef SPARKS
1208         //++EDEN++ idle() , i.e. send all buffers, wait for work
1209         // suppress fishing in EDEN... just look for incoming messages
1210         // (blocking receive)
1211   IF_PAR_DEBUG(verbose, 
1212                debugBelch("...wait for incoming messages...\n"));
1213   *receivedFinish = processMessages(); // blocking receive...
1214
1215         // and reenter scheduling loop after having received something
1216         // (return rtsFalse below)
1217
1218 # else /* activate SPARKS machinery */
1219 /* We get here, if we have no work, tried to activate a local spark, but still
1220    have no work. We try to get a remote spark, by sending a FISH message.
1221    Thread migration should be added here, and triggered when a sequence of 
1222    fishes returns without work. */
1223         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1224
1225       /* =8-[  no local sparks => look for work on other PEs */
1226         /*
1227          * We really have absolutely no work.  Send out a fish
1228          * (there may be some out there already), and wait for
1229          * something to arrive.  We clearly can't run any threads
1230          * until a SCHEDULE or RESUME arrives, and so that's what
1231          * we're hoping to see.  (Of course, we still have to
1232          * respond to other types of messages.)
1233          */
1234         rtsTime now = msTime() /*CURRENT_TIME*/;
1235         IF_PAR_DEBUG(verbose, 
1236                      debugBelch("--  now=%ld\n", now));
1237         IF_PAR_DEBUG(fish, // verbose,
1238              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1239                  (last_fish_arrived_at!=0 &&
1240                   last_fish_arrived_at+delay > now)) {
1241                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1242                      now, last_fish_arrived_at+delay, 
1243                      last_fish_arrived_at,
1244                      delay);
1245              });
1246   
1247         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1248             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1249           if (last_fish_arrived_at==0 ||
1250               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1251             /* outstandingFishes is set in sendFish, processFish;
1252                avoid flooding system with fishes via delay */
1253     next_fish_to_send_at = 0;  
1254   } else {
1255     /* ToDo: this should be done in the main scheduling loop to avoid the
1256              busy wait here; not so bad if fish delay is very small  */
1257     int iq = 0; // DEBUGGING -- HWL
1258     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1259     /* send a fish when ready, but process messages that arrive in the meantime */
1260     do {
1261       if (PacketsWaiting()) {
1262         iq++; // DEBUGGING
1263         *receivedFinish = processMessages();
1264       }
1265       now = msTime();
1266     } while (!*receivedFinish || now<next_fish_to_send_at);
1267     // JB: This means the fish could become obsolete, if we receive
1268     // work. Better check for work again? 
1269     // last line: while (!receivedFinish || !haveWork || now<...)
1270     // next line: if (receivedFinish || haveWork )
1271
1272     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1273       return rtsFalse;  // NB: this will leave scheduler loop
1274                         // immediately after return!
1275                           
1276     IF_PAR_DEBUG(fish, // verbose,
1277                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1278
1279   }
1280
1281     // JB: IMHO, this should all be hidden inside sendFish(...)
1282     /* pe = choosePE(); 
1283        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1284                 NEW_FISH_HUNGER);
1285
1286     // Global statistics: count no. of fishes
1287     if (RtsFlags.ParFlags.ParStats.Global &&
1288          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1289            globalParStats.tot_fish_mess++;
1290            }
1291     */ 
1292
1293   /* delayed fishes must have been sent by now! */
1294   next_fish_to_send_at = 0;  
1295   }
1296       
1297   *receivedFinish = processMessages();
1298 # endif /* SPARKS */
1299
1300  return rtsFalse;
1301  /* NB: this function always returns rtsFalse, meaning the scheduler
1302     loop continues with the next iteration; 
1303     rationale: 
1304       return code means success in finding work; we enter this function
1305       if there is no local work, thus have to send a fish which takes
1306       time until it arrives with work; in the meantime we should process
1307       messages in the main loop;
1308  */
1309 }
1310 #endif // PARALLEL_HASKELL
1311
1312 /* ----------------------------------------------------------------------------
1313  * PAR/GRAN: Report stats & debugging info(?)
1314  * ------------------------------------------------------------------------- */
1315
1316 #if defined(PAR) || defined(GRAN)
1317 static void
1318 scheduleGranParReport(void)
1319 {
1320   ASSERT(run_queue_hd != END_TSO_QUEUE);
1321
1322   /* Take a thread from the run queue, if we have work */
1323   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1324
1325     /* If this TSO has got its outport closed in the meantime, 
1326      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1327      * It has to be marked as TH_DEAD for this purpose.
1328      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1329
1330 JB: TODO: investigate wether state change field could be nuked
1331      entirely and replaced by the normal tso state (whatnext
1332      field). All we want to do is to kill tsos from outside.
1333      */
1334
1335     /* ToDo: write something to the log-file
1336     if (RTSflags.ParFlags.granSimStats && !sameThread)
1337         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1338
1339     CurrentTSO = t;
1340     */
1341     /* the spark pool for the current PE */
1342     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1343
1344     IF_DEBUG(scheduler, 
1345              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1346                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1347
1348     IF_PAR_DEBUG(fish,
1349              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1350                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1351
1352     if (RtsFlags.ParFlags.ParStats.Full && 
1353         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1354         (emitSchedule || // forced emit
1355          (t && LastTSO && t->id != LastTSO->id))) {
1356       /* 
1357          we are running a different TSO, so write a schedule event to log file
1358          NB: If we use fair scheduling we also have to write  a deschedule 
1359              event for LastTSO; with unfair scheduling we know that the
1360              previous tso has blocked whenever we switch to another tso, so
1361              we don't need it in GUM for now
1362       */
1363       IF_PAR_DEBUG(fish, // schedule,
1364                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1365
1366       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1367                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1368       emitSchedule = rtsFalse;
1369     }
1370 }     
1371 #endif
1372
1373 /* ----------------------------------------------------------------------------
1374  * After running a thread...
1375  * ASSUMES: sched_mutex
1376  * ------------------------------------------------------------------------- */
1377
1378 static void
1379 schedulePostRunThread(void)
1380 {
1381 #if defined(PAR)
1382     /* HACK 675: if the last thread didn't yield, make sure to print a 
1383        SCHEDULE event to the log file when StgRunning the next thread, even
1384        if it is the same one as before */
1385     LastTSO = t; 
1386     TimeOfLastYield = CURRENT_TIME;
1387 #endif
1388
1389   /* some statistics gathering in the parallel case */
1390
1391 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1392   switch (ret) {
1393     case HeapOverflow:
1394 # if defined(GRAN)
1395       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1396       globalGranStats.tot_heapover++;
1397 # elif defined(PAR)
1398       globalParStats.tot_heapover++;
1399 # endif
1400       break;
1401
1402      case StackOverflow:
1403 # if defined(GRAN)
1404       IF_DEBUG(gran, 
1405                DumpGranEvent(GR_DESCHEDULE, t));
1406       globalGranStats.tot_stackover++;
1407 # elif defined(PAR)
1408       // IF_DEBUG(par, 
1409       // DumpGranEvent(GR_DESCHEDULE, t);
1410       globalParStats.tot_stackover++;
1411 # endif
1412       break;
1413
1414     case ThreadYielding:
1415 # if defined(GRAN)
1416       IF_DEBUG(gran, 
1417                DumpGranEvent(GR_DESCHEDULE, t));
1418       globalGranStats.tot_yields++;
1419 # elif defined(PAR)
1420       // IF_DEBUG(par, 
1421       // DumpGranEvent(GR_DESCHEDULE, t);
1422       globalParStats.tot_yields++;
1423 # endif
1424       break; 
1425
1426     case ThreadBlocked:
1427 # if defined(GRAN)
1428       IF_DEBUG(scheduler,
1429                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1430                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1431                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1432                if (t->block_info.closure!=(StgClosure*)NULL)
1433                  print_bq(t->block_info.closure);
1434                debugBelch("\n"));
1435
1436       // ??? needed; should emit block before
1437       IF_DEBUG(gran, 
1438                DumpGranEvent(GR_DESCHEDULE, t)); 
1439       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1440       /*
1441         ngoq Dogh!
1442       ASSERT(procStatus[CurrentProc]==Busy || 
1443               ((procStatus[CurrentProc]==Fetching) && 
1444               (t->block_info.closure!=(StgClosure*)NULL)));
1445       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1446           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1447             procStatus[CurrentProc]==Fetching)) 
1448         procStatus[CurrentProc] = Idle;
1449       */
1450 # elif defined(PAR)
1451 //++PAR++  blockThread() writes the event (change?)
1452 # endif
1453     break;
1454
1455   case ThreadFinished:
1456     break;
1457
1458   default:
1459     barf("parGlobalStats: unknown return code");
1460     break;
1461     }
1462 #endif
1463 }
1464
1465 /* -----------------------------------------------------------------------------
1466  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1467  * ASSUMES: sched_mutex
1468  * -------------------------------------------------------------------------- */
1469
1470 static rtsBool
1471 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1472 {
1473     // did the task ask for a large block?
1474     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1475         // if so, get one and push it on the front of the nursery.
1476         bdescr *bd;
1477         lnat blocks;
1478         
1479         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1480         
1481         IF_DEBUG(scheduler,
1482                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1483                             (long)t->id, whatNext_strs[t->what_next], blocks));
1484         
1485         // don't do this if the nursery is (nearly) full, we'll GC first.
1486         if (cap->r.rCurrentNursery->link != NULL ||
1487             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1488                                                // if the nursery has only one block.
1489             
1490             bd = allocGroup( blocks );
1491             cap->r.rNursery->n_blocks += blocks;
1492             
1493             // link the new group into the list
1494             bd->link = cap->r.rCurrentNursery;
1495             bd->u.back = cap->r.rCurrentNursery->u.back;
1496             if (cap->r.rCurrentNursery->u.back != NULL) {
1497                 cap->r.rCurrentNursery->u.back->link = bd;
1498             } else {
1499 #if !defined(SMP)
1500                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1501                        g0s0 == cap->r.rNursery);
1502 #endif
1503                 cap->r.rNursery->blocks = bd;
1504             }             
1505             cap->r.rCurrentNursery->u.back = bd;
1506             
1507             // initialise it as a nursery block.  We initialise the
1508             // step, gen_no, and flags field of *every* sub-block in
1509             // this large block, because this is easier than making
1510             // sure that we always find the block head of a large
1511             // block whenever we call Bdescr() (eg. evacuate() and
1512             // isAlive() in the GC would both have to do this, at
1513             // least).
1514             { 
1515                 bdescr *x;
1516                 for (x = bd; x < bd + blocks; x++) {
1517                     x->step = cap->r.rNursery;
1518                     x->gen_no = 0;
1519                     x->flags = 0;
1520                 }
1521             }
1522             
1523             // This assert can be a killer if the app is doing lots
1524             // of large block allocations.
1525             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1526             
1527             // now update the nursery to point to the new block
1528             cap->r.rCurrentNursery = bd;
1529             
1530             // we might be unlucky and have another thread get on the
1531             // run queue before us and steal the large block, but in that
1532             // case the thread will just end up requesting another large
1533             // block.
1534             PUSH_ON_RUN_QUEUE(t);
1535             return rtsFalse;  /* not actually GC'ing */
1536         }
1537     }
1538     
1539     IF_DEBUG(scheduler,
1540              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1541                         (long)t->id, whatNext_strs[t->what_next]));
1542     threadPaused(t);
1543 #if defined(GRAN)
1544     ASSERT(!is_on_queue(t,CurrentProc));
1545 #elif defined(PARALLEL_HASKELL)
1546     /* Currently we emit a DESCHEDULE event before GC in GUM.
1547        ToDo: either add separate event to distinguish SYSTEM time from rest
1548        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1549     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1550         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1551                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1552         emitSchedule = rtsTrue;
1553     }
1554 #endif
1555       
1556     PUSH_ON_RUN_QUEUE(t);
1557     return rtsTrue;
1558     /* actual GC is done at the end of the while loop in schedule() */
1559 }
1560
1561 /* -----------------------------------------------------------------------------
1562  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1563  * ASSUMES: sched_mutex
1564  * -------------------------------------------------------------------------- */
1565
1566 static void
1567 scheduleHandleStackOverflow( StgTSO *t)
1568 {
1569     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1570                                   (long)t->id, whatNext_strs[t->what_next]));
1571     /* just adjust the stack for this thread, then pop it back
1572      * on the run queue.
1573      */
1574     threadPaused(t);
1575     { 
1576         /* enlarge the stack */
1577         StgTSO *new_t = threadStackOverflow(t);
1578         
1579         /* This TSO has moved, so update any pointers to it from the
1580          * main thread stack.  It better not be on any other queues...
1581          * (it shouldn't be).
1582          */
1583         if (t->main != NULL) {
1584             t->main->tso = new_t;
1585         }
1586         PUSH_ON_RUN_QUEUE(new_t);
1587     }
1588 }
1589
1590 /* -----------------------------------------------------------------------------
1591  * Handle a thread that returned to the scheduler with ThreadYielding
1592  * ASSUMES: sched_mutex
1593  * -------------------------------------------------------------------------- */
1594
1595 static rtsBool
1596 scheduleHandleYield( StgTSO *t, nat prev_what_next )
1597 {
1598     // Reset the context switch flag.  We don't do this just before
1599     // running the thread, because that would mean we would lose ticks
1600     // during GC, which can lead to unfair scheduling (a thread hogs
1601     // the CPU because the tick always arrives during GC).  This way
1602     // penalises threads that do a lot of allocation, but that seems
1603     // better than the alternative.
1604     context_switch = 0;
1605     
1606     /* put the thread back on the run queue.  Then, if we're ready to
1607      * GC, check whether this is the last task to stop.  If so, wake
1608      * up the GC thread.  getThread will block during a GC until the
1609      * GC is finished.
1610      */
1611     IF_DEBUG(scheduler,
1612              if (t->what_next != prev_what_next) {
1613                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1614                             (long)t->id, whatNext_strs[t->what_next]);
1615              } else {
1616                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1617                             (long)t->id, whatNext_strs[t->what_next]);
1618              }
1619         );
1620     
1621     IF_DEBUG(sanity,
1622              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1623              checkTSO(t));
1624     ASSERT(t->link == END_TSO_QUEUE);
1625     
1626     // Shortcut if we're just switching evaluators: don't bother
1627     // doing stack squeezing (which can be expensive), just run the
1628     // thread.
1629     if (t->what_next != prev_what_next) {
1630         return rtsTrue;
1631     }
1632     
1633     threadPaused(t);
1634     
1635 #if defined(GRAN)
1636     ASSERT(!is_on_queue(t,CurrentProc));
1637       
1638     IF_DEBUG(sanity,
1639              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1640              checkThreadQsSanity(rtsTrue));
1641
1642 #endif
1643
1644     addToRunQueue(t);
1645
1646 #if defined(GRAN)
1647     /* add a ContinueThread event to actually process the thread */
1648     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1649               ContinueThread,
1650               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1651     IF_GRAN_DEBUG(bq, 
1652                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1653                   G_EVENTQ(0);
1654                   G_CURR_THREADQ(0));
1655 #endif
1656     return rtsFalse;
1657 }
1658
1659 /* -----------------------------------------------------------------------------
1660  * Handle a thread that returned to the scheduler with ThreadBlocked
1661  * ASSUMES: sched_mutex
1662  * -------------------------------------------------------------------------- */
1663
1664 static void
1665 scheduleHandleThreadBlocked( StgTSO *t
1666 #if !defined(GRAN) && !defined(DEBUG)
1667     STG_UNUSED
1668 #endif
1669     )
1670 {
1671 #if defined(GRAN)
1672     IF_DEBUG(scheduler,
1673              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1674                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1675              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1676     
1677     // ??? needed; should emit block before
1678     IF_DEBUG(gran, 
1679              DumpGranEvent(GR_DESCHEDULE, t)); 
1680     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1681     /*
1682       ngoq Dogh!
1683       ASSERT(procStatus[CurrentProc]==Busy || 
1684       ((procStatus[CurrentProc]==Fetching) && 
1685       (t->block_info.closure!=(StgClosure*)NULL)));
1686       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1687       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1688       procStatus[CurrentProc]==Fetching)) 
1689       procStatus[CurrentProc] = Idle;
1690     */
1691 #elif defined(PAR)
1692     IF_DEBUG(scheduler,
1693              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1694                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1695     IF_PAR_DEBUG(bq,
1696                  
1697                  if (t->block_info.closure!=(StgClosure*)NULL) 
1698                  print_bq(t->block_info.closure));
1699     
1700     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1701     blockThread(t);
1702     
1703     /* whatever we schedule next, we must log that schedule */
1704     emitSchedule = rtsTrue;
1705     
1706 #else /* !GRAN */
1707       /* don't need to do anything.  Either the thread is blocked on
1708        * I/O, in which case we'll have called addToBlockedQueue
1709        * previously, or it's blocked on an MVar or Blackhole, in which
1710        * case it'll be on the relevant queue already.
1711        */
1712     ASSERT(t->why_blocked != NotBlocked);
1713     IF_DEBUG(scheduler,
1714              debugBelch("--<< thread %d (%s) stopped: ", 
1715                         t->id, whatNext_strs[t->what_next]);
1716              printThreadBlockage(t);
1717              debugBelch("\n"));
1718     
1719     /* Only for dumping event to log file 
1720        ToDo: do I need this in GranSim, too?
1721        blockThread(t);
1722     */
1723 #endif
1724 }
1725
1726 /* -----------------------------------------------------------------------------
1727  * Handle a thread that returned to the scheduler with ThreadFinished
1728  * ASSUMES: sched_mutex
1729  * -------------------------------------------------------------------------- */
1730
1731 static rtsBool
1732 scheduleHandleThreadFinished( StgMainThread *mainThread
1733                               USED_WHEN_RTS_SUPPORTS_THREADS,
1734                               Capability *cap,
1735                               StgTSO *t )
1736 {
1737     /* Need to check whether this was a main thread, and if so,
1738      * return with the return value.
1739      *
1740      * We also end up here if the thread kills itself with an
1741      * uncaught exception, see Exception.cmm.
1742      */
1743     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1744                                   t->id, whatNext_strs[t->what_next]));
1745
1746 #if defined(GRAN)
1747       endThread(t, CurrentProc); // clean-up the thread
1748 #elif defined(PARALLEL_HASKELL)
1749       /* For now all are advisory -- HWL */
1750       //if(t->priority==AdvisoryPriority) ??
1751       advisory_thread_count--; // JB: Caution with this counter, buggy!
1752       
1753 # if defined(DIST)
1754       if(t->dist.priority==RevalPriority)
1755         FinishReval(t);
1756 # endif
1757     
1758 # if defined(EDENOLD)
1759       // the thread could still have an outport... (BUG)
1760       if (t->eden.outport != -1) {
1761       // delete the outport for the tso which has finished...
1762         IF_PAR_DEBUG(eden_ports,
1763                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1764                               t->eden.outport, t->id));
1765         deleteOPT(t);
1766       }
1767       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1768       if (t->eden.epid != -1) {
1769         IF_PAR_DEBUG(eden_ports,
1770                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1771                            t->id, t->eden.epid));
1772         removeTSOfromProcess(t);
1773       }
1774 # endif 
1775
1776 # if defined(PAR)
1777       if (RtsFlags.ParFlags.ParStats.Full &&
1778           !RtsFlags.ParFlags.ParStats.Suppressed) 
1779         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1780
1781       //  t->par only contains statistics: left out for now...
1782       IF_PAR_DEBUG(fish,
1783                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1784                               t->id,t,t->par.sparkname));
1785 # endif
1786 #endif // PARALLEL_HASKELL
1787
1788       //
1789       // Check whether the thread that just completed was a main
1790       // thread, and if so return with the result.  
1791       //
1792       // There is an assumption here that all thread completion goes
1793       // through this point; we need to make sure that if a thread
1794       // ends up in the ThreadKilled state, that it stays on the run
1795       // queue so it can be dealt with here.
1796       //
1797       if (
1798 #if defined(RTS_SUPPORTS_THREADS)
1799           mainThread != NULL
1800 #else
1801           mainThread->tso == t
1802 #endif
1803           )
1804       {
1805           // We are a bound thread: this must be our thread that just
1806           // completed.
1807           ASSERT(mainThread->tso == t);
1808
1809           if (t->what_next == ThreadComplete) {
1810               if (mainThread->ret) {
1811                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1812                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1813               }
1814               mainThread->stat = Success;
1815           } else {
1816               if (mainThread->ret) {
1817                   *(mainThread->ret) = NULL;
1818               }
1819               if (interrupted) {
1820                   mainThread->stat = Interrupted;
1821               } else {
1822                   mainThread->stat = Killed;
1823               }
1824           }
1825 #ifdef DEBUG
1826           removeThreadLabel((StgWord)mainThread->tso->id);
1827 #endif
1828           if (mainThread->prev == NULL) {
1829               ASSERT(mainThread == main_threads);
1830               main_threads = mainThread->link;
1831           } else {
1832               mainThread->prev->link = mainThread->link;
1833           }
1834           if (mainThread->link != NULL) {
1835               mainThread->link->prev = mainThread->prev;
1836           }
1837           releaseCapability(cap);
1838           return rtsTrue; // tells schedule() to return
1839       }
1840
1841 #ifdef RTS_SUPPORTS_THREADS
1842       ASSERT(t->main == NULL);
1843 #else
1844       if (t->main != NULL) {
1845           // Must be a main thread that is not the topmost one.  Leave
1846           // it on the run queue until the stack has unwound to the
1847           // point where we can deal with this.  Leaving it on the run
1848           // queue also ensures that the garbage collector knows about
1849           // this thread and its return value (it gets dropped from the
1850           // all_threads list so there's no other way to find it).
1851           APPEND_TO_RUN_QUEUE(t);
1852       }
1853 #endif
1854       return rtsFalse;
1855 }
1856
1857 /* -----------------------------------------------------------------------------
1858  * Perform a heap census, if PROFILING
1859  * -------------------------------------------------------------------------- */
1860
1861 static rtsBool
1862 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1863 {
1864 #if defined(PROFILING)
1865     // When we have +RTS -i0 and we're heap profiling, do a census at
1866     // every GC.  This lets us get repeatable runs for debugging.
1867     if (performHeapProfile ||
1868         (RtsFlags.ProfFlags.profileInterval==0 &&
1869          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1870         GarbageCollect(GetRoots, rtsTrue);
1871         heapCensus();
1872         performHeapProfile = rtsFalse;
1873         return rtsTrue;  // true <=> we already GC'd
1874     }
1875 #endif
1876     return rtsFalse;
1877 }
1878
1879 /* -----------------------------------------------------------------------------
1880  * Perform a garbage collection if necessary
1881  * ASSUMES: sched_mutex
1882  * -------------------------------------------------------------------------- */
1883
1884 static void
1885 scheduleDoGC( Capability *cap STG_UNUSED )
1886 {
1887     StgTSO *t;
1888 #ifdef SMP
1889     static rtsBool waiting_for_gc;
1890     int n_capabilities = RtsFlags.ParFlags.nNodes - 1; 
1891            // subtract one because we're already holding one.
1892     Capability *caps[n_capabilities];
1893 #endif
1894
1895 #ifdef SMP
1896     // In order to GC, there must be no threads running Haskell code.
1897     // Therefore, the GC thread needs to hold *all* the capabilities,
1898     // and release them after the GC has completed.  
1899     //
1900     // This seems to be the simplest way: previous attempts involved
1901     // making all the threads with capabilities give up their
1902     // capabilities and sleep except for the *last* one, which
1903     // actually did the GC.  But it's quite hard to arrange for all
1904     // the other tasks to sleep and stay asleep.
1905     //
1906         
1907     // Someone else is already trying to GC
1908     if (waiting_for_gc) return;
1909     waiting_for_gc = rtsTrue;
1910
1911     caps[n_capabilities] = cap;
1912     while (n_capabilities > 0) {
1913         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
1914         waitForReturnCapability(&sched_mutex, &cap);
1915         n_capabilities--;
1916         caps[n_capabilities] = cap;
1917     }
1918
1919     waiting_for_gc = rtsFalse;
1920 #endif
1921
1922     /* Kick any transactions which are invalid back to their
1923      * atomically frames.  When next scheduled they will try to
1924      * commit, this commit will fail and they will retry.
1925      */
1926     for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
1927         if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1928             if (!stmValidateTransaction (t -> trec)) {
1929                 IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1930                 
1931                 // strip the stack back to the ATOMICALLY_FRAME, aborting
1932                 // the (nested) transaction, and saving the stack of any
1933                 // partially-evaluated thunks on the heap.
1934                 raiseAsync_(t, NULL, rtsTrue);
1935                 
1936 #ifdef REG_R1
1937                 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1938 #endif
1939             }
1940         }
1941     }
1942     
1943     // so this happens periodically:
1944     scheduleCheckBlackHoles();
1945     
1946     /* everybody back, start the GC.
1947      * Could do it in this thread, or signal a condition var
1948      * to do it in another thread.  Either way, we need to
1949      * broadcast on gc_pending_cond afterward.
1950      */
1951 #if defined(RTS_SUPPORTS_THREADS)
1952     IF_DEBUG(scheduler,sched_belch("doing GC"));
1953 #endif
1954     GarbageCollect(GetRoots,rtsFalse);
1955     
1956 #if defined(SMP)
1957     {
1958         // release our stash of capabilities.
1959         nat i;
1960         for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) {
1961             releaseCapability(caps[i]);
1962         }
1963     }
1964 #endif
1965
1966 #if defined(GRAN)
1967     /* add a ContinueThread event to continue execution of current thread */
1968     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1969               ContinueThread,
1970               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1971     IF_GRAN_DEBUG(bq, 
1972                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1973                   G_EVENTQ(0);
1974                   G_CURR_THREADQ(0));
1975 #endif /* GRAN */
1976 }
1977
1978 /* ---------------------------------------------------------------------------
1979  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1980  * used by Control.Concurrent for error checking.
1981  * ------------------------------------------------------------------------- */
1982  
1983 StgBool
1984 rtsSupportsBoundThreads(void)
1985 {
1986 #if defined(RTS_SUPPORTS_THREADS)
1987   return rtsTrue;
1988 #else
1989   return rtsFalse;
1990 #endif
1991 }
1992
1993 /* ---------------------------------------------------------------------------
1994  * isThreadBound(tso): check whether tso is bound to an OS thread.
1995  * ------------------------------------------------------------------------- */
1996  
1997 StgBool
1998 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1999 {
2000 #if defined(RTS_SUPPORTS_THREADS)
2001   return (tso->main != NULL);
2002 #endif
2003   return rtsFalse;
2004 }
2005
2006 /* ---------------------------------------------------------------------------
2007  * Singleton fork(). Do not copy any running threads.
2008  * ------------------------------------------------------------------------- */
2009
2010 #ifndef mingw32_HOST_OS
2011 #define FORKPROCESS_PRIMOP_SUPPORTED
2012 #endif
2013
2014 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2015 static void 
2016 deleteThreadImmediately(StgTSO *tso);
2017 #endif
2018 StgInt
2019 forkProcess(HsStablePtr *entry
2020 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2021             STG_UNUSED
2022 #endif
2023            )
2024 {
2025 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2026   pid_t pid;
2027   StgTSO* t,*next;
2028   StgMainThread *m;
2029   SchedulerStatus rc;
2030
2031   IF_DEBUG(scheduler,sched_belch("forking!"));
2032   rts_lock(); // This not only acquires sched_mutex, it also
2033               // makes sure that no other threads are running
2034
2035   pid = fork();
2036
2037   if (pid) { /* parent */
2038
2039   /* just return the pid */
2040     rts_unlock();
2041     return pid;
2042     
2043   } else { /* child */
2044     
2045     
2046       // delete all threads
2047     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
2048     
2049     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2050       next = t->link;
2051
2052         // don't allow threads to catch the ThreadKilled exception
2053       deleteThreadImmediately(t);
2054     }
2055     
2056       // wipe the main thread list
2057     while((m = main_threads) != NULL) {
2058       main_threads = m->link;
2059 # ifdef THREADED_RTS
2060       closeCondition(&m->bound_thread_cond);
2061 # endif
2062       stgFree(m);
2063     }
2064     
2065     rc = rts_evalStableIO(entry, NULL);  // run the action
2066     rts_checkSchedStatus("forkProcess",rc);
2067     
2068     rts_unlock();
2069     
2070     hs_exit();                      // clean up and exit
2071     stg_exit(0);
2072   }
2073 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2074   barf("forkProcess#: primop not supported, sorry!\n");
2075   return -1;
2076 #endif
2077 }
2078
2079 /* ---------------------------------------------------------------------------
2080  * deleteAllThreads():  kill all the live threads.
2081  *
2082  * This is used when we catch a user interrupt (^C), before performing
2083  * any necessary cleanups and running finalizers.
2084  *
2085  * Locks: sched_mutex held.
2086  * ------------------------------------------------------------------------- */
2087    
2088 void
2089 deleteAllThreads ( void )
2090 {
2091   StgTSO* t, *next;
2092   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
2093   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2094       if (t->what_next == ThreadRelocated) {
2095           next = t->link;
2096       } else {
2097           next = t->global_link;
2098           deleteThread(t);
2099       }
2100   }      
2101
2102   // The run queue now contains a bunch of ThreadKilled threads.  We
2103   // must not throw these away: the main thread(s) will be in there
2104   // somewhere, and the main scheduler loop has to deal with it.
2105   // Also, the run queue is the only thing keeping these threads from
2106   // being GC'd, and we don't want the "main thread has been GC'd" panic.
2107
2108   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2109   ASSERT(blackhole_queue == END_TSO_QUEUE);
2110   ASSERT(sleeping_queue == END_TSO_QUEUE);
2111 }
2112
2113 /* startThread and  insertThread are now in GranSim.c -- HWL */
2114
2115
2116 /* ---------------------------------------------------------------------------
2117  * Suspending & resuming Haskell threads.
2118  * 
2119  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2120  * its capability before calling the C function.  This allows another
2121  * task to pick up the capability and carry on running Haskell
2122  * threads.  It also means that if the C call blocks, it won't lock
2123  * the whole system.
2124  *
2125  * The Haskell thread making the C call is put to sleep for the
2126  * duration of the call, on the susepended_ccalling_threads queue.  We
2127  * give out a token to the task, which it can use to resume the thread
2128  * on return from the C function.
2129  * ------------------------------------------------------------------------- */
2130    
2131 StgInt
2132 suspendThread( StgRegTable *reg )
2133 {
2134   nat tok;
2135   Capability *cap;
2136   int saved_errno = errno;
2137
2138   /* assume that *reg is a pointer to the StgRegTable part
2139    * of a Capability.
2140    */
2141   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
2142
2143   ACQUIRE_LOCK(&sched_mutex);
2144
2145   IF_DEBUG(scheduler,
2146            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
2147
2148   // XXX this might not be necessary --SDM
2149   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
2150
2151   threadPaused(cap->r.rCurrentTSO);
2152   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
2153   suspended_ccalling_threads = cap->r.rCurrentTSO;
2154
2155   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
2156       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
2157       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
2158   } else {
2159       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
2160   }
2161
2162   /* Use the thread ID as the token; it should be unique */
2163   tok = cap->r.rCurrentTSO->id;
2164
2165   /* Hand back capability */
2166   cap->r.rInHaskell = rtsFalse;
2167   releaseCapability(cap);
2168   
2169 #if defined(RTS_SUPPORTS_THREADS)
2170   /* Preparing to leave the RTS, so ensure there's a native thread/task
2171      waiting to take over.
2172   */
2173   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
2174 #endif
2175
2176   RELEASE_LOCK(&sched_mutex);
2177   
2178   errno = saved_errno;
2179   return tok; 
2180 }
2181
2182 StgRegTable *
2183 resumeThread( StgInt tok )
2184 {
2185   StgTSO *tso, **prev;
2186   Capability *cap;
2187   int saved_errno = errno;
2188
2189 #if defined(RTS_SUPPORTS_THREADS)
2190   /* Wait for permission to re-enter the RTS with the result. */
2191   ACQUIRE_LOCK(&sched_mutex);
2192   waitForReturnCapability(&sched_mutex, &cap);
2193
2194   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
2195 #else
2196   grabCapability(&cap);
2197 #endif
2198
2199   /* Remove the thread off of the suspended list */
2200   prev = &suspended_ccalling_threads;
2201   for (tso = suspended_ccalling_threads; 
2202        tso != END_TSO_QUEUE; 
2203        prev = &tso->link, tso = tso->link) {
2204     if (tso->id == (StgThreadID)tok) {
2205       *prev = tso->link;
2206       break;
2207     }
2208   }
2209   if (tso == END_TSO_QUEUE) {
2210     barf("resumeThread: thread not found");
2211   }
2212   tso->link = END_TSO_QUEUE;
2213   
2214   if(tso->why_blocked == BlockedOnCCall) {
2215       awakenBlockedQueueNoLock(tso->blocked_exceptions);
2216       tso->blocked_exceptions = NULL;
2217   }
2218   
2219   /* Reset blocking status */
2220   tso->why_blocked  = NotBlocked;
2221
2222   cap->r.rCurrentTSO = tso;
2223   cap->r.rInHaskell = rtsTrue;
2224   RELEASE_LOCK(&sched_mutex);
2225   errno = saved_errno;
2226   return &cap->r;
2227 }
2228
2229 /* ---------------------------------------------------------------------------
2230  * Comparing Thread ids.
2231  *
2232  * This is used from STG land in the implementation of the
2233  * instances of Eq/Ord for ThreadIds.
2234  * ------------------------------------------------------------------------ */
2235
2236 int
2237 cmp_thread(StgPtr tso1, StgPtr tso2) 
2238
2239   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2240   StgThreadID id2 = ((StgTSO *)tso2)->id;
2241  
2242   if (id1 < id2) return (-1);
2243   if (id1 > id2) return 1;
2244   return 0;
2245 }
2246
2247 /* ---------------------------------------------------------------------------
2248  * Fetching the ThreadID from an StgTSO.
2249  *
2250  * This is used in the implementation of Show for ThreadIds.
2251  * ------------------------------------------------------------------------ */
2252 int
2253 rts_getThreadId(StgPtr tso) 
2254 {
2255   return ((StgTSO *)tso)->id;
2256 }
2257
2258 #ifdef DEBUG
2259 void
2260 labelThread(StgPtr tso, char *label)
2261 {
2262   int len;
2263   void *buf;
2264
2265   /* Caveat: Once set, you can only set the thread name to "" */
2266   len = strlen(label)+1;
2267   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2268   strncpy(buf,label,len);
2269   /* Update will free the old memory for us */
2270   updateThreadLabel(((StgTSO *)tso)->id,buf);
2271 }
2272 #endif /* DEBUG */
2273
2274 /* ---------------------------------------------------------------------------
2275    Create a new thread.
2276
2277    The new thread starts with the given stack size.  Before the
2278    scheduler can run, however, this thread needs to have a closure
2279    (and possibly some arguments) pushed on its stack.  See
2280    pushClosure() in Schedule.h.
2281
2282    createGenThread() and createIOThread() (in SchedAPI.h) are
2283    convenient packaged versions of this function.
2284
2285    currently pri (priority) is only used in a GRAN setup -- HWL
2286    ------------------------------------------------------------------------ */
2287 #if defined(GRAN)
2288 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2289 StgTSO *
2290 createThread(nat size, StgInt pri)
2291 #else
2292 StgTSO *
2293 createThread(nat size)
2294 #endif
2295 {
2296
2297     StgTSO *tso;
2298     nat stack_size;
2299
2300     /* First check whether we should create a thread at all */
2301 #if defined(PARALLEL_HASKELL)
2302   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2303   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2304     threadsIgnored++;
2305     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2306           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2307     return END_TSO_QUEUE;
2308   }
2309   threadsCreated++;
2310 #endif
2311
2312 #if defined(GRAN)
2313   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2314 #endif
2315
2316   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2317
2318   /* catch ridiculously small stack sizes */
2319   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2320     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2321   }
2322
2323   stack_size = size - TSO_STRUCT_SIZEW;
2324
2325   tso = (StgTSO *)allocate(size);
2326   TICK_ALLOC_TSO(stack_size, 0);
2327
2328   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2329 #if defined(GRAN)
2330   SET_GRAN_HDR(tso, ThisPE);
2331 #endif
2332
2333   // Always start with the compiled code evaluator
2334   tso->what_next = ThreadRunGHC;
2335
2336   tso->id = next_thread_id++; 
2337   tso->why_blocked  = NotBlocked;
2338   tso->blocked_exceptions = NULL;
2339
2340   tso->saved_errno = 0;
2341   tso->main = NULL;
2342   
2343   tso->stack_size   = stack_size;
2344   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2345                               - TSO_STRUCT_SIZEW;
2346   tso->sp           = (P_)&(tso->stack) + stack_size;
2347
2348   tso->trec = NO_TREC;
2349
2350 #ifdef PROFILING
2351   tso->prof.CCCS = CCS_MAIN;
2352 #endif
2353
2354   /* put a stop frame on the stack */
2355   tso->sp -= sizeofW(StgStopFrame);
2356   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2357   tso->link = END_TSO_QUEUE;
2358
2359   // ToDo: check this
2360 #if defined(GRAN)
2361   /* uses more flexible routine in GranSim */
2362   insertThread(tso, CurrentProc);
2363 #else
2364   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2365    * from its creation
2366    */
2367 #endif
2368
2369 #if defined(GRAN) 
2370   if (RtsFlags.GranFlags.GranSimStats.Full) 
2371     DumpGranEvent(GR_START,tso);
2372 #elif defined(PARALLEL_HASKELL)
2373   if (RtsFlags.ParFlags.ParStats.Full) 
2374     DumpGranEvent(GR_STARTQ,tso);
2375   /* HACk to avoid SCHEDULE 
2376      LastTSO = tso; */
2377 #endif
2378
2379   /* Link the new thread on the global thread list.
2380    */
2381   tso->global_link = all_threads;
2382   all_threads = tso;
2383
2384 #if defined(DIST)
2385   tso->dist.priority = MandatoryPriority; //by default that is...
2386 #endif
2387
2388 #if defined(GRAN)
2389   tso->gran.pri = pri;
2390 # if defined(DEBUG)
2391   tso->gran.magic = TSO_MAGIC; // debugging only
2392 # endif
2393   tso->gran.sparkname   = 0;
2394   tso->gran.startedat   = CURRENT_TIME; 
2395   tso->gran.exported    = 0;
2396   tso->gran.basicblocks = 0;
2397   tso->gran.allocs      = 0;
2398   tso->gran.exectime    = 0;
2399   tso->gran.fetchtime   = 0;
2400   tso->gran.fetchcount  = 0;
2401   tso->gran.blocktime   = 0;
2402   tso->gran.blockcount  = 0;
2403   tso->gran.blockedat   = 0;
2404   tso->gran.globalsparks = 0;
2405   tso->gran.localsparks  = 0;
2406   if (RtsFlags.GranFlags.Light)
2407     tso->gran.clock  = Now; /* local clock */
2408   else
2409     tso->gran.clock  = 0;
2410
2411   IF_DEBUG(gran,printTSO(tso));
2412 #elif defined(PARALLEL_HASKELL)
2413 # if defined(DEBUG)
2414   tso->par.magic = TSO_MAGIC; // debugging only
2415 # endif
2416   tso->par.sparkname   = 0;
2417   tso->par.startedat   = CURRENT_TIME; 
2418   tso->par.exported    = 0;
2419   tso->par.basicblocks = 0;
2420   tso->par.allocs      = 0;
2421   tso->par.exectime    = 0;
2422   tso->par.fetchtime   = 0;
2423   tso->par.fetchcount  = 0;
2424   tso->par.blocktime   = 0;
2425   tso->par.blockcount  = 0;
2426   tso->par.blockedat   = 0;
2427   tso->par.globalsparks = 0;
2428   tso->par.localsparks  = 0;
2429 #endif
2430
2431 #if defined(GRAN)
2432   globalGranStats.tot_threads_created++;
2433   globalGranStats.threads_created_on_PE[CurrentProc]++;
2434   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2435   globalGranStats.tot_sq_probes++;
2436 #elif defined(PARALLEL_HASKELL)
2437   // collect parallel global statistics (currently done together with GC stats)
2438   if (RtsFlags.ParFlags.ParStats.Global &&
2439       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2440     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2441     globalParStats.tot_threads_created++;
2442   }
2443 #endif 
2444
2445 #if defined(GRAN)
2446   IF_GRAN_DEBUG(pri,
2447                 sched_belch("==__ schedule: Created TSO %d (%p);",
2448                       CurrentProc, tso, tso->id));
2449 #elif defined(PARALLEL_HASKELL)
2450   IF_PAR_DEBUG(verbose,
2451                sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2452                            (long)tso->id, tso, advisory_thread_count));
2453 #else
2454   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2455                                  (long)tso->id, (long)tso->stack_size));
2456 #endif    
2457   return tso;
2458 }
2459
2460 #if defined(PAR)
2461 /* RFP:
2462    all parallel thread creation calls should fall through the following routine.
2463 */
2464 StgTSO *
2465 createThreadFromSpark(rtsSpark spark) 
2466 { StgTSO *tso;
2467   ASSERT(spark != (rtsSpark)NULL);
2468 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2469   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2470   { threadsIgnored++;
2471     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2472           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2473     return END_TSO_QUEUE;
2474   }
2475   else
2476   { threadsCreated++;
2477     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2478     if (tso==END_TSO_QUEUE)     
2479       barf("createSparkThread: Cannot create TSO");
2480 #if defined(DIST)
2481     tso->priority = AdvisoryPriority;
2482 #endif
2483     pushClosure(tso,spark);
2484     addToRunQueue(tso);
2485     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2486   }
2487   return tso;
2488 }
2489 #endif
2490
2491 /*
2492   Turn a spark into a thread.
2493   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2494 */
2495 #if 0
2496 StgTSO *
2497 activateSpark (rtsSpark spark) 
2498 {
2499   StgTSO *tso;
2500
2501   tso = createSparkThread(spark);
2502   if (RtsFlags.ParFlags.ParStats.Full) {   
2503     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2504       IF_PAR_DEBUG(verbose,
2505                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2506                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2507   }
2508   // ToDo: fwd info on local/global spark to thread -- HWL
2509   // tso->gran.exported =  spark->exported;
2510   // tso->gran.locked =   !spark->global;
2511   // tso->gran.sparkname = spark->name;
2512
2513   return tso;
2514 }
2515 #endif
2516
2517 /* ---------------------------------------------------------------------------
2518  * scheduleThread()
2519  *
2520  * scheduleThread puts a thread on the head of the runnable queue.
2521  * This will usually be done immediately after a thread is created.
2522  * The caller of scheduleThread must create the thread using e.g.
2523  * createThread and push an appropriate closure
2524  * on this thread's stack before the scheduler is invoked.
2525  * ------------------------------------------------------------------------ */
2526
2527 static void
2528 scheduleThread_(StgTSO *tso)
2529 {
2530   // The thread goes at the *end* of the run-queue, to avoid possible
2531   // starvation of any threads already on the queue.
2532   APPEND_TO_RUN_QUEUE(tso);
2533   threadRunnable();
2534 }
2535
2536 void
2537 scheduleThread(StgTSO* tso)
2538 {
2539   ACQUIRE_LOCK(&sched_mutex);
2540   scheduleThread_(tso);
2541   RELEASE_LOCK(&sched_mutex);
2542 }
2543
2544 #if defined(RTS_SUPPORTS_THREADS)
2545 static Condition bound_cond_cache;
2546 static int bound_cond_cache_full = 0;
2547 #endif
2548
2549
2550 SchedulerStatus
2551 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
2552                    Capability *initialCapability)
2553 {
2554     // Precondition: sched_mutex must be held
2555     StgMainThread *m;
2556
2557     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2558     m->tso = tso;
2559     tso->main = m;
2560     m->ret = ret;
2561     m->stat = NoStatus;
2562     m->link = main_threads;
2563     m->prev = NULL;
2564     if (main_threads != NULL) {
2565         main_threads->prev = m;
2566     }
2567     main_threads = m;
2568
2569 #if defined(RTS_SUPPORTS_THREADS)
2570     // Allocating a new condition for each thread is expensive, so we
2571     // cache one.  This is a pretty feeble hack, but it helps speed up
2572     // consecutive call-ins quite a bit.
2573     if (bound_cond_cache_full) {
2574         m->bound_thread_cond = bound_cond_cache;
2575         bound_cond_cache_full = 0;
2576     } else {
2577         initCondition(&m->bound_thread_cond);
2578     }
2579 #endif
2580
2581     /* Put the thread on the main-threads list prior to scheduling the TSO.
2582        Failure to do so introduces a race condition in the MT case (as
2583        identified by Wolfgang Thaller), whereby the new task/OS thread 
2584        created by scheduleThread_() would complete prior to the thread
2585        that spawned it managed to put 'itself' on the main-threads list.
2586        The upshot of it all being that the worker thread wouldn't get to
2587        signal the completion of the its work item for the main thread to
2588        see (==> it got stuck waiting.)    -- sof 6/02.
2589     */
2590     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2591     
2592     APPEND_TO_RUN_QUEUE(tso);
2593     // NB. Don't call threadRunnable() here, because the thread is
2594     // bound and only runnable by *this* OS thread, so waking up other
2595     // workers will just slow things down.
2596
2597     return waitThread_(m, initialCapability);
2598 }
2599
2600 /* ---------------------------------------------------------------------------
2601  * initScheduler()
2602  *
2603  * Initialise the scheduler.  This resets all the queues - if the
2604  * queues contained any threads, they'll be garbage collected at the
2605  * next pass.
2606  *
2607  * ------------------------------------------------------------------------ */
2608
2609 void 
2610 initScheduler(void)
2611 {
2612 #if defined(GRAN)
2613   nat i;
2614
2615   for (i=0; i<=MAX_PROC; i++) {
2616     run_queue_hds[i]      = END_TSO_QUEUE;
2617     run_queue_tls[i]      = END_TSO_QUEUE;
2618     blocked_queue_hds[i]  = END_TSO_QUEUE;
2619     blocked_queue_tls[i]  = END_TSO_QUEUE;
2620     ccalling_threadss[i]  = END_TSO_QUEUE;
2621     blackhole_queue[i]    = END_TSO_QUEUE;
2622     sleeping_queue        = END_TSO_QUEUE;
2623   }
2624 #else
2625   run_queue_hd      = END_TSO_QUEUE;
2626   run_queue_tl      = END_TSO_QUEUE;
2627   blocked_queue_hd  = END_TSO_QUEUE;
2628   blocked_queue_tl  = END_TSO_QUEUE;
2629   blackhole_queue   = END_TSO_QUEUE;
2630   sleeping_queue    = END_TSO_QUEUE;
2631 #endif 
2632
2633   suspended_ccalling_threads  = END_TSO_QUEUE;
2634
2635   main_threads = NULL;
2636   all_threads  = END_TSO_QUEUE;
2637
2638   context_switch = 0;
2639   interrupted    = 0;
2640
2641   RtsFlags.ConcFlags.ctxtSwitchTicks =
2642       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2643       
2644 #if defined(RTS_SUPPORTS_THREADS)
2645   /* Initialise the mutex and condition variables used by
2646    * the scheduler. */
2647   initMutex(&sched_mutex);
2648   initMutex(&term_mutex);
2649 #endif
2650   
2651   ACQUIRE_LOCK(&sched_mutex);
2652
2653   /* A capability holds the state a native thread needs in
2654    * order to execute STG code. At least one capability is
2655    * floating around (only SMP builds have more than one).
2656    */
2657   initCapabilities();
2658   
2659 #if defined(RTS_SUPPORTS_THREADS)
2660   initTaskManager();
2661 #endif
2662
2663 #if defined(SMP)
2664   /* eagerly start some extra workers */
2665   startingWorkerThread = RtsFlags.ParFlags.nNodes;
2666   startTasks(RtsFlags.ParFlags.nNodes, taskStart);
2667 #endif
2668
2669 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2670   initSparkPools();
2671 #endif
2672
2673   RELEASE_LOCK(&sched_mutex);
2674 }
2675
2676 void
2677 exitScheduler( void )
2678 {
2679     interrupted = rtsTrue;
2680     shutting_down_scheduler = rtsTrue;
2681 #if defined(RTS_SUPPORTS_THREADS)
2682     if (threadIsTask(osThreadId())) { taskStop(); }
2683     stopTaskManager();
2684 #endif
2685 }
2686
2687 /* ----------------------------------------------------------------------------
2688    Managing the per-task allocation areas.
2689    
2690    Each capability comes with an allocation area.  These are
2691    fixed-length block lists into which allocation can be done.
2692
2693    ToDo: no support for two-space collection at the moment???
2694    ------------------------------------------------------------------------- */
2695
2696 static SchedulerStatus
2697 waitThread_(StgMainThread* m, Capability *initialCapability)
2698 {
2699   SchedulerStatus stat;
2700
2701   // Precondition: sched_mutex must be held.
2702   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2703
2704 #if defined(GRAN)
2705   /* GranSim specific init */
2706   CurrentTSO = m->tso;                // the TSO to run
2707   procStatus[MainProc] = Busy;        // status of main PE
2708   CurrentProc = MainProc;             // PE to run it on
2709   schedule(m,initialCapability);
2710 #else
2711   schedule(m,initialCapability);
2712   ASSERT(m->stat != NoStatus);
2713 #endif
2714
2715   stat = m->stat;
2716
2717 #if defined(RTS_SUPPORTS_THREADS)
2718   // Free the condition variable, returning it to the cache if possible.
2719   if (!bound_cond_cache_full) {
2720       bound_cond_cache = m->bound_thread_cond;
2721       bound_cond_cache_full = 1;
2722   } else {
2723       closeCondition(&m->bound_thread_cond);
2724   }
2725 #endif
2726
2727   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2728   stgFree(m);
2729
2730   // Postcondition: sched_mutex still held
2731   return stat;
2732 }
2733
2734 /* ---------------------------------------------------------------------------
2735    Where are the roots that we know about?
2736
2737         - all the threads on the runnable queue
2738         - all the threads on the blocked queue
2739         - all the threads on the sleeping queue
2740         - all the thread currently executing a _ccall_GC
2741         - all the "main threads"
2742      
2743    ------------------------------------------------------------------------ */
2744
2745 /* This has to be protected either by the scheduler monitor, or by the
2746         garbage collection monitor (probably the latter).
2747         KH @ 25/10/99
2748 */
2749
2750 void
2751 GetRoots( evac_fn evac )
2752 {
2753 #if defined(GRAN)
2754   {
2755     nat i;
2756     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2757       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2758           evac((StgClosure **)&run_queue_hds[i]);
2759       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2760           evac((StgClosure **)&run_queue_tls[i]);
2761       
2762       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2763           evac((StgClosure **)&blocked_queue_hds[i]);
2764       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2765           evac((StgClosure **)&blocked_queue_tls[i]);
2766       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2767           evac((StgClosure **)&ccalling_threads[i]);
2768     }
2769   }
2770
2771   markEventQueue();
2772
2773 #else /* !GRAN */
2774   if (run_queue_hd != END_TSO_QUEUE) {
2775       ASSERT(run_queue_tl != END_TSO_QUEUE);
2776       evac((StgClosure **)&run_queue_hd);
2777       evac((StgClosure **)&run_queue_tl);
2778   }
2779   
2780   if (blocked_queue_hd != END_TSO_QUEUE) {
2781       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2782       evac((StgClosure **)&blocked_queue_hd);
2783       evac((StgClosure **)&blocked_queue_tl);
2784   }
2785   
2786   if (sleeping_queue != END_TSO_QUEUE) {
2787       evac((StgClosure **)&sleeping_queue);
2788   }
2789 #endif 
2790
2791   if (blackhole_queue != END_TSO_QUEUE) {
2792       evac((StgClosure **)&blackhole_queue);
2793   }
2794
2795   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2796       evac((StgClosure **)&suspended_ccalling_threads);
2797   }
2798
2799 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2800   markSparkQueue(evac);
2801 #endif
2802
2803 #if defined(RTS_USER_SIGNALS)
2804   // mark the signal handlers (signals should be already blocked)
2805   markSignalHandlers(evac);
2806 #endif
2807 }
2808
2809 /* -----------------------------------------------------------------------------
2810    performGC
2811
2812    This is the interface to the garbage collector from Haskell land.
2813    We provide this so that external C code can allocate and garbage
2814    collect when called from Haskell via _ccall_GC.
2815
2816    It might be useful to provide an interface whereby the programmer
2817    can specify more roots (ToDo).
2818    
2819    This needs to be protected by the GC condition variable above.  KH.
2820    -------------------------------------------------------------------------- */
2821
2822 static void (*extra_roots)(evac_fn);
2823
2824 void
2825 performGC(void)
2826 {
2827   /* Obligated to hold this lock upon entry */
2828   ACQUIRE_LOCK(&sched_mutex);
2829   GarbageCollect(GetRoots,rtsFalse);
2830   RELEASE_LOCK(&sched_mutex);
2831 }
2832
2833 void
2834 performMajorGC(void)
2835 {
2836   ACQUIRE_LOCK(&sched_mutex);
2837   GarbageCollect(GetRoots,rtsTrue);
2838   RELEASE_LOCK(&sched_mutex);
2839 }
2840
2841 static void
2842 AllRoots(evac_fn evac)
2843 {
2844     GetRoots(evac);             // the scheduler's roots
2845     extra_roots(evac);          // the user's roots
2846 }
2847
2848 void
2849 performGCWithRoots(void (*get_roots)(evac_fn))
2850 {
2851   ACQUIRE_LOCK(&sched_mutex);
2852   extra_roots = get_roots;
2853   GarbageCollect(AllRoots,rtsFalse);
2854   RELEASE_LOCK(&sched_mutex);
2855 }
2856
2857 /* -----------------------------------------------------------------------------
2858    Stack overflow
2859
2860    If the thread has reached its maximum stack size, then raise the
2861    StackOverflow exception in the offending thread.  Otherwise
2862    relocate the TSO into a larger chunk of memory and adjust its stack
2863    size appropriately.
2864    -------------------------------------------------------------------------- */
2865
2866 static StgTSO *
2867 threadStackOverflow(StgTSO *tso)
2868 {
2869   nat new_stack_size, stack_words;
2870   lnat new_tso_size;
2871   StgPtr new_sp;
2872   StgTSO *dest;
2873
2874   IF_DEBUG(sanity,checkTSO(tso));
2875   if (tso->stack_size >= tso->max_stack_size) {
2876
2877     IF_DEBUG(gc,
2878              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2879                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2880              /* If we're debugging, just print out the top of the stack */
2881              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2882                                               tso->sp+64)));
2883
2884     /* Send this thread the StackOverflow exception */
2885     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2886     return tso;
2887   }
2888
2889   /* Try to double the current stack size.  If that takes us over the
2890    * maximum stack size for this thread, then use the maximum instead.
2891    * Finally round up so the TSO ends up as a whole number of blocks.
2892    */
2893   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2894   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2895                                        TSO_STRUCT_SIZE)/sizeof(W_);
2896   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2897   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2898
2899   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2900
2901   dest = (StgTSO *)allocate(new_tso_size);
2902   TICK_ALLOC_TSO(new_stack_size,0);
2903
2904   /* copy the TSO block and the old stack into the new area */
2905   memcpy(dest,tso,TSO_STRUCT_SIZE);
2906   stack_words = tso->stack + tso->stack_size - tso->sp;
2907   new_sp = (P_)dest + new_tso_size - stack_words;
2908   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2909
2910   /* relocate the stack pointers... */
2911   dest->sp         = new_sp;
2912   dest->stack_size = new_stack_size;
2913         
2914   /* Mark the old TSO as relocated.  We have to check for relocated
2915    * TSOs in the garbage collector and any primops that deal with TSOs.
2916    *
2917    * It's important to set the sp value to just beyond the end
2918    * of the stack, so we don't attempt to scavenge any part of the
2919    * dead TSO's stack.
2920    */
2921   tso->what_next = ThreadRelocated;
2922   tso->link = dest;
2923   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2924   tso->why_blocked = NotBlocked;
2925
2926   IF_PAR_DEBUG(verbose,
2927                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2928                      tso->id, tso, tso->stack_size);
2929                /* If we're debugging, just print out the top of the stack */
2930                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2931                                                 tso->sp+64)));
2932   
2933   IF_DEBUG(sanity,checkTSO(tso));
2934 #if 0
2935   IF_DEBUG(scheduler,printTSO(dest));
2936 #endif
2937
2938   return dest;
2939 }
2940
2941 /* ---------------------------------------------------------------------------
2942    Wake up a queue that was blocked on some resource.
2943    ------------------------------------------------------------------------ */
2944
2945 #if defined(GRAN)
2946 STATIC_INLINE void
2947 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2948 {
2949 }
2950 #elif defined(PARALLEL_HASKELL)
2951 STATIC_INLINE void
2952 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2953 {
2954   /* write RESUME events to log file and
2955      update blocked and fetch time (depending on type of the orig closure) */
2956   if (RtsFlags.ParFlags.ParStats.Full) {
2957     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2958                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2959                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2960     if (EMPTY_RUN_QUEUE())
2961       emitSchedule = rtsTrue;
2962
2963     switch (get_itbl(node)->type) {
2964         case FETCH_ME_BQ:
2965           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2966           break;
2967         case RBH:
2968         case FETCH_ME:
2969         case BLACKHOLE_BQ:
2970           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2971           break;
2972 #ifdef DIST
2973         case MVAR:
2974           break;
2975 #endif    
2976         default:
2977           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2978         }
2979       }
2980 }
2981 #endif
2982
2983 #if defined(GRAN)
2984 StgBlockingQueueElement *
2985 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2986 {
2987     StgTSO *tso;
2988     PEs node_loc, tso_loc;
2989
2990     node_loc = where_is(node); // should be lifted out of loop
2991     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2992     tso_loc = where_is((StgClosure *)tso);
2993     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2994       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2995       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2996       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2997       // insertThread(tso, node_loc);
2998       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2999                 ResumeThread,
3000                 tso, node, (rtsSpark*)NULL);
3001       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3002       // len_local++;
3003       // len++;
3004     } else { // TSO is remote (actually should be FMBQ)
3005       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3006                                   RtsFlags.GranFlags.Costs.gunblocktime +
3007                                   RtsFlags.GranFlags.Costs.latency;
3008       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3009                 UnblockThread,
3010                 tso, node, (rtsSpark*)NULL);
3011       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3012       // len++;
3013     }
3014     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3015     IF_GRAN_DEBUG(bq,
3016                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3017                           (node_loc==tso_loc ? "Local" : "Global"), 
3018                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3019     tso->block_info.closure = NULL;
3020     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3021                              tso->id, tso));
3022 }
3023 #elif defined(PARALLEL_HASKELL)
3024 StgBlockingQueueElement *
3025 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
3026 {
3027     StgBlockingQueueElement *next;
3028
3029     switch (get_itbl(bqe)->type) {
3030     case TSO:
3031       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3032       /* if it's a TSO just push it onto the run_queue */
3033       next = bqe->link;
3034       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3035       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3036       threadRunnable();
3037       unblockCount(bqe, node);
3038       /* reset blocking status after dumping event */
3039       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3040       break;
3041
3042     case BLOCKED_FETCH:
3043       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3044       next = bqe->link;
3045       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3046       PendingFetches = (StgBlockedFetch *)bqe;
3047       break;
3048
3049 # if defined(DEBUG)
3050       /* can ignore this case in a non-debugging setup; 
3051          see comments on RBHSave closures above */
3052     case CONSTR:
3053       /* check that the closure is an RBHSave closure */
3054       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3055              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3056              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3057       break;
3058
3059     default:
3060       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3061            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3062            (StgClosure *)bqe);
3063 # endif
3064     }
3065   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3066   return next;
3067 }
3068
3069 #else /* !GRAN && !PARALLEL_HASKELL */
3070 StgTSO *
3071 unblockOneLocked(StgTSO *tso)
3072 {
3073   StgTSO *next;
3074
3075   ASSERT(get_itbl(tso)->type == TSO);
3076   ASSERT(tso->why_blocked != NotBlocked);
3077   tso->why_blocked = NotBlocked;
3078   next = tso->link;
3079   tso->link = END_TSO_QUEUE;
3080   APPEND_TO_RUN_QUEUE(tso);
3081   threadRunnable();
3082   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3083   return next;
3084 }
3085 #endif
3086
3087 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3088 INLINE_ME StgBlockingQueueElement *
3089 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3090 {
3091   ACQUIRE_LOCK(&sched_mutex);
3092   bqe = unblockOneLocked(bqe, node);
3093   RELEASE_LOCK(&sched_mutex);
3094   return bqe;
3095 }
3096 #else
3097 INLINE_ME StgTSO *
3098 unblockOne(StgTSO *tso)
3099 {
3100   ACQUIRE_LOCK(&sched_mutex);
3101   tso = unblockOneLocked(tso);
3102   RELEASE_LOCK(&sched_mutex);
3103   return tso;
3104 }
3105 #endif
3106
3107 #if defined(GRAN)
3108 void 
3109 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3110 {
3111   StgBlockingQueueElement *bqe;
3112   PEs node_loc;
3113   nat len = 0; 
3114
3115   IF_GRAN_DEBUG(bq, 
3116                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3117                       node, CurrentProc, CurrentTime[CurrentProc], 
3118                       CurrentTSO->id, CurrentTSO));
3119
3120   node_loc = where_is(node);
3121
3122   ASSERT(q == END_BQ_QUEUE ||
3123          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3124          get_itbl(q)->type == CONSTR); // closure (type constructor)
3125   ASSERT(is_unique(node));
3126
3127   /* FAKE FETCH: magically copy the node to the tso's proc;
3128      no Fetch necessary because in reality the node should not have been 
3129      moved to the other PE in the first place
3130   */
3131   if (CurrentProc!=node_loc) {
3132     IF_GRAN_DEBUG(bq, 
3133                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3134                         node, node_loc, CurrentProc, CurrentTSO->id, 
3135                         // CurrentTSO, where_is(CurrentTSO),
3136                         node->header.gran.procs));
3137     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3138     IF_GRAN_DEBUG(bq, 
3139                   debugBelch("## new bitmask of node %p is %#x\n",
3140                         node, node->header.gran.procs));
3141     if (RtsFlags.GranFlags.GranSimStats.Global) {
3142       globalGranStats.tot_fake_fetches++;
3143     }
3144   }
3145
3146   bqe = q;
3147   // ToDo: check: ASSERT(CurrentProc==node_loc);
3148   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3149     //next = bqe->link;
3150     /* 
3151        bqe points to the current element in the queue
3152        next points to the next element in the queue
3153     */
3154     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3155     //tso_loc = where_is(tso);
3156     len++;
3157     bqe = unblockOneLocked(bqe, node);
3158   }
3159
3160   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3161      the closure to make room for the anchor of the BQ */
3162   if (bqe!=END_BQ_QUEUE) {
3163     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3164     /*
3165     ASSERT((info_ptr==&RBH_Save_0_info) ||
3166            (info_ptr==&RBH_Save_1_info) ||
3167            (info_ptr==&RBH_Save_2_info));
3168     */
3169     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3170     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3171     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3172
3173     IF_GRAN_DEBUG(bq,
3174                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3175                         node, info_type(node)));
3176   }
3177
3178   /* statistics gathering */
3179   if (RtsFlags.GranFlags.GranSimStats.Global) {
3180     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3181     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3182     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3183     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3184   }
3185   IF_GRAN_DEBUG(bq,
3186                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3187                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3188 }
3189 #elif defined(PARALLEL_HASKELL)
3190 void 
3191 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3192 {
3193   StgBlockingQueueElement *bqe;
3194
3195   ACQUIRE_LOCK(&sched_mutex);
3196
3197   IF_PAR_DEBUG(verbose, 
3198                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3199                      node, mytid));
3200 #ifdef DIST  
3201   //RFP
3202   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3203     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3204     return;
3205   }
3206 #endif
3207   
3208   ASSERT(q == END_BQ_QUEUE ||
3209          get_itbl(q)->type == TSO ||           
3210          get_itbl(q)->type == BLOCKED_FETCH || 
3211          get_itbl(q)->type == CONSTR); 
3212
3213   bqe = q;
3214   while (get_itbl(bqe)->type==TSO || 
3215          get_itbl(bqe)->type==BLOCKED_FETCH) {
3216     bqe = unblockOneLocked(bqe, node);
3217   }
3218   RELEASE_LOCK(&sched_mutex);
3219 }
3220
3221 #else   /* !GRAN && !PARALLEL_HASKELL */
3222
3223 void
3224 awakenBlockedQueueNoLock(StgTSO *tso)
3225 {
3226   while (tso != END_TSO_QUEUE) {
3227     tso = unblockOneLocked(tso);
3228   }
3229 }
3230
3231 void
3232 awakenBlockedQueue(StgTSO *tso)
3233 {
3234   ACQUIRE_LOCK(&sched_mutex);
3235   while (tso != END_TSO_QUEUE) {
3236     tso = unblockOneLocked(tso);
3237   }
3238   RELEASE_LOCK(&sched_mutex);
3239 }
3240 #endif
3241
3242 /* ---------------------------------------------------------------------------
3243    Interrupt execution
3244    - usually called inside a signal handler so it mustn't do anything fancy.   
3245    ------------------------------------------------------------------------ */
3246
3247 void
3248 interruptStgRts(void)
3249 {
3250     interrupted    = 1;
3251     context_switch = 1;
3252     threadRunnable();
3253     /* ToDo: if invoked from a signal handler, this threadRunnable
3254      * only works if there's another thread (not this one) waiting to
3255      * be woken up.
3256      */
3257 }
3258
3259 /* -----------------------------------------------------------------------------
3260    Unblock a thread
3261
3262    This is for use when we raise an exception in another thread, which
3263    may be blocked.
3264    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3265    -------------------------------------------------------------------------- */
3266
3267 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3268 /*
3269   NB: only the type of the blocking queue is different in GranSim and GUM
3270       the operations on the queue-elements are the same
3271       long live polymorphism!
3272
3273   Locks: sched_mutex is held upon entry and exit.
3274
3275 */
3276 static void
3277 unblockThread(StgTSO *tso)
3278 {
3279   StgBlockingQueueElement *t, **last;
3280
3281   switch (tso->why_blocked) {
3282
3283   case NotBlocked:
3284     return;  /* not blocked */
3285
3286   case BlockedOnSTM:
3287     // Be careful: nothing to do here!  We tell the scheduler that the thread
3288     // is runnable and we leave it to the stack-walking code to abort the 
3289     // transaction while unwinding the stack.  We should perhaps have a debugging
3290     // test to make sure that this really happens and that the 'zombie' transaction
3291     // does not get committed.
3292     goto done;
3293
3294   case BlockedOnMVar:
3295     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3296     {
3297       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3298       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3299
3300       last = (StgBlockingQueueElement **)&mvar->head;
3301       for (t = (StgBlockingQueueElement *)mvar->head; 
3302            t != END_BQ_QUEUE; 
3303            last = &t->link, last_tso = t, t = t->link) {
3304         if (t == (StgBlockingQueueElement *)tso) {
3305           *last = (StgBlockingQueueElement *)tso->link;
3306           if (mvar->tail == tso) {
3307             mvar->tail = (StgTSO *)last_tso;
3308           }
3309           goto done;
3310         }
3311       }
3312       barf("unblockThread (MVAR): TSO not found");
3313     }
3314
3315   case BlockedOnBlackHole:
3316     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3317     {
3318       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3319
3320       last = &bq->blocking_queue;
3321       for (t = bq->blocking_queue; 
3322            t != END_BQ_QUEUE; 
3323            last = &t->link, t = t->link) {
3324         if (t == (StgBlockingQueueElement *)tso) {
3325           *last = (StgBlockingQueueElement *)tso->link;
3326           goto done;
3327         }
3328       }
3329       barf("unblockThread (BLACKHOLE): TSO not found");
3330     }
3331
3332   case BlockedOnException:
3333     {
3334       StgTSO *target  = tso->block_info.tso;
3335
3336       ASSERT(get_itbl(target)->type == TSO);
3337
3338       if (target->what_next == ThreadRelocated) {
3339           target = target->link;
3340           ASSERT(get_itbl(target)->type == TSO);
3341       }
3342
3343       ASSERT(target->blocked_exceptions != NULL);
3344
3345       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3346       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3347            t != END_BQ_QUEUE; 
3348            last = &t->link, t = t->link) {
3349         ASSERT(get_itbl(t)->type == TSO);
3350         if (t == (StgBlockingQueueElement *)tso) {
3351           *last = (StgBlockingQueueElement *)tso->link;
3352           goto done;
3353         }
3354       }
3355       barf("unblockThread (Exception): TSO not found");
3356     }
3357
3358   case BlockedOnRead:
3359   case BlockedOnWrite:
3360 #if defined(mingw32_HOST_OS)
3361   case BlockedOnDoProc:
3362 #endif
3363     {
3364       /* take TSO off blocked_queue */
3365       StgBlockingQueueElement *prev = NULL;
3366       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3367            prev = t, t = t->link) {
3368         if (t == (StgBlockingQueueElement *)tso) {
3369           if (prev == NULL) {
3370             blocked_queue_hd = (StgTSO *)t->link;
3371             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3372               blocked_queue_tl = END_TSO_QUEUE;
3373             }
3374           } else {
3375             prev->link = t->link;
3376             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3377               blocked_queue_tl = (StgTSO *)prev;
3378             }
3379           }
3380 #if defined(mingw32_HOST_OS)
3381           /* (Cooperatively) signal that the worker thread should abort
3382            * the request.
3383            */
3384           abandonWorkRequest(tso->block_info.async_result->reqID);
3385 #endif
3386           goto done;
3387         }
3388       }
3389       barf("unblockThread (I/O): TSO not found");
3390     }
3391
3392   case BlockedOnDelay:
3393     {
3394       /* take TSO off sleeping_queue */
3395       StgBlockingQueueElement *prev = NULL;
3396       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3397            prev = t, t = t->link) {
3398         if (t == (StgBlockingQueueElement *)tso) {
3399           if (prev == NULL) {
3400             sleeping_queue = (StgTSO *)t->link;
3401           } else {
3402             prev->link = t->link;
3403           }
3404           goto done;
3405         }
3406       }
3407       barf("unblockThread (delay): TSO not found");
3408     }
3409
3410   default:
3411     barf("unblockThread");
3412   }
3413
3414  done:
3415   tso->link = END_TSO_QUEUE;
3416   tso->why_blocked = NotBlocked;
3417   tso->block_info.closure = NULL;
3418   PUSH_ON_RUN_QUEUE(tso);
3419 }
3420 #else
3421 static void
3422 unblockThread(StgTSO *tso)
3423 {
3424   StgTSO *t, **last;
3425   
3426   /* To avoid locking unnecessarily. */
3427   if (tso->why_blocked == NotBlocked) {
3428     return;
3429   }
3430
3431   switch (tso->why_blocked) {
3432
3433   case BlockedOnSTM:
3434     // Be careful: nothing to do here!  We tell the scheduler that the thread
3435     // is runnable and we leave it to the stack-walking code to abort the 
3436     // transaction while unwinding the stack.  We should perhaps have a debugging
3437     // test to make sure that this really happens and that the 'zombie' transaction
3438     // does not get committed.
3439     goto done;
3440
3441   case BlockedOnMVar:
3442     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3443     {
3444       StgTSO *last_tso = END_TSO_QUEUE;
3445       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3446
3447       last = &mvar->head;
3448       for (t = mvar->head; t != END_TSO_QUEUE; 
3449            last = &t->link, last_tso = t, t = t->link) {
3450         if (t == tso) {
3451           *last = tso->link;
3452           if (mvar->tail == tso) {
3453             mvar->tail = last_tso;
3454           }
3455           goto done;
3456         }
3457       }
3458       barf("unblockThread (MVAR): TSO not found");
3459     }
3460
3461   case BlockedOnBlackHole:
3462     {
3463       last = &blackhole_queue;
3464       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3465            last = &t->link, t = t->link) {
3466         if (t == tso) {
3467           *last = tso->link;
3468           goto done;
3469         }
3470       }
3471       barf("unblockThread (BLACKHOLE): TSO not found");
3472     }
3473
3474   case BlockedOnException:
3475     {
3476       StgTSO *target  = tso->block_info.tso;
3477
3478       ASSERT(get_itbl(target)->type == TSO);
3479
3480       while (target->what_next == ThreadRelocated) {
3481           target = target->link;
3482           ASSERT(get_itbl(target)->type == TSO);
3483       }
3484       
3485       ASSERT(target->blocked_exceptions != NULL);
3486
3487       last = &target->blocked_exceptions;
3488       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3489            last = &t->link, t = t->link) {
3490         ASSERT(get_itbl(t)->type == TSO);
3491         if (t == tso) {
3492           *last = tso->link;
3493           goto done;
3494         }
3495       }
3496       barf("unblockThread (Exception): TSO not found");
3497     }
3498
3499   case BlockedOnRead:
3500   case BlockedOnWrite:
3501 #if defined(mingw32_HOST_OS)
3502   case BlockedOnDoProc:
3503 #endif
3504     {
3505       StgTSO *prev = NULL;
3506       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3507            prev = t, t = t->link) {
3508         if (t == tso) {
3509           if (prev == NULL) {
3510             blocked_queue_hd = t->link;
3511             if (blocked_queue_tl == t) {
3512               blocked_queue_tl = END_TSO_QUEUE;
3513             }
3514           } else {
3515             prev->link = t->link;
3516             if (blocked_queue_tl == t) {
3517               blocked_queue_tl = prev;
3518             }
3519           }
3520 #if defined(mingw32_HOST_OS)
3521           /* (Cooperatively) signal that the worker thread should abort
3522            * the request.
3523            */
3524           abandonWorkRequest(tso->block_info.async_result->reqID);
3525 #endif
3526           goto done;
3527         }
3528       }
3529       barf("unblockThread (I/O): TSO not found");
3530     }
3531
3532   case BlockedOnDelay:
3533     {
3534       StgTSO *prev = NULL;
3535       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3536            prev = t, t = t->link) {
3537         if (t == tso) {
3538           if (prev == NULL) {
3539             sleeping_queue = t->link;
3540           } else {
3541             prev->link = t->link;
3542           }
3543           goto done;
3544         }
3545       }
3546       barf("unblockThread (delay): TSO not found");
3547     }
3548
3549   default:
3550     barf("unblockThread");
3551   }
3552
3553  done:
3554   tso->link = END_TSO_QUEUE;
3555   tso->why_blocked = NotBlocked;
3556   tso->block_info.closure = NULL;
3557   APPEND_TO_RUN_QUEUE(tso);
3558 }
3559 #endif
3560
3561 /* -----------------------------------------------------------------------------
3562  * checkBlackHoles()
3563  *
3564  * Check the blackhole_queue for threads that can be woken up.  We do
3565  * this periodically: before every GC, and whenever the run queue is
3566  * empty.
3567  *
3568  * An elegant solution might be to just wake up all the blocked
3569  * threads with awakenBlockedQueue occasionally: they'll go back to
3570  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3571  * doesn't give us a way to tell whether we've actually managed to
3572  * wake up any threads, so we would be busy-waiting.
3573  *
3574  * -------------------------------------------------------------------------- */
3575
3576 static rtsBool
3577 checkBlackHoles( void )
3578 {
3579     StgTSO **prev, *t;
3580     rtsBool any_woke_up = rtsFalse;
3581     StgHalfWord type;
3582
3583     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3584
3585     // ASSUMES: sched_mutex
3586     prev = &blackhole_queue;
3587     t = blackhole_queue;
3588     while (t != END_TSO_QUEUE) {
3589         ASSERT(t->why_blocked == BlockedOnBlackHole);
3590         type = get_itbl(t->block_info.closure)->type;
3591         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3592             t = unblockOneLocked(t);
3593             *prev = t;
3594             any_woke_up = rtsTrue;
3595         } else {
3596             prev = &t->link;
3597             t = t->link;
3598         }
3599     }
3600
3601     return any_woke_up;
3602 }
3603
3604 /* -----------------------------------------------------------------------------
3605  * raiseAsync()
3606  *
3607  * The following function implements the magic for raising an
3608  * asynchronous exception in an existing thread.
3609  *
3610  * We first remove the thread from any queue on which it might be
3611  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3612  *
3613  * We strip the stack down to the innermost CATCH_FRAME, building
3614  * thunks in the heap for all the active computations, so they can 
3615  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3616  * an application of the handler to the exception, and push it on
3617  * the top of the stack.
3618  * 
3619  * How exactly do we save all the active computations?  We create an
3620  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3621  * AP_STACKs pushes everything from the corresponding update frame
3622  * upwards onto the stack.  (Actually, it pushes everything up to the
3623  * next update frame plus a pointer to the next AP_STACK object.
3624  * Entering the next AP_STACK object pushes more onto the stack until we
3625  * reach the last AP_STACK object - at which point the stack should look
3626  * exactly as it did when we killed the TSO and we can continue
3627  * execution by entering the closure on top of the stack.
3628  *
3629  * We can also kill a thread entirely - this happens if either (a) the 
3630  * exception passed to raiseAsync is NULL, or (b) there's no
3631  * CATCH_FRAME on the stack.  In either case, we strip the entire
3632  * stack and replace the thread with a zombie.
3633  *
3634  * Locks: sched_mutex held upon entry nor exit.
3635  *
3636  * -------------------------------------------------------------------------- */
3637  
3638 void 
3639 deleteThread(StgTSO *tso)
3640 {
3641   if (tso->why_blocked != BlockedOnCCall &&
3642       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3643       raiseAsync(tso,NULL);
3644   }
3645 }
3646
3647 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3648 static void 
3649 deleteThreadImmediately(StgTSO *tso)
3650 { // for forkProcess only:
3651   // delete thread without giving it a chance to catch the KillThread exception
3652
3653   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3654       return;
3655   }
3656
3657   if (tso->why_blocked != BlockedOnCCall &&
3658       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3659     unblockThread(tso);
3660   }
3661
3662   tso->what_next = ThreadKilled;
3663 }
3664 #endif
3665
3666 void
3667 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3668 {
3669   /* When raising async exs from contexts where sched_mutex isn't held;
3670      use raiseAsyncWithLock(). */
3671   ACQUIRE_LOCK(&sched_mutex);
3672   raiseAsync(tso,exception);
3673   RELEASE_LOCK(&sched_mutex);
3674 }
3675
3676 void
3677 raiseAsync(StgTSO *tso, StgClosure *exception)
3678 {
3679     raiseAsync_(tso, exception, rtsFalse);
3680 }
3681
3682 static void
3683 raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
3684 {
3685     StgRetInfoTable *info;
3686     StgPtr sp;
3687   
3688     // Thread already dead?
3689     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3690         return;
3691     }
3692
3693     IF_DEBUG(scheduler, 
3694              sched_belch("raising exception in thread %ld.", (long)tso->id));
3695     
3696     // Remove it from any blocking queues
3697     unblockThread(tso);
3698
3699     sp = tso->sp;
3700     
3701     // The stack freezing code assumes there's a closure pointer on
3702     // the top of the stack, so we have to arrange that this is the case...
3703     //
3704     if (sp[0] == (W_)&stg_enter_info) {
3705         sp++;
3706     } else {
3707         sp--;
3708         sp[0] = (W_)&stg_dummy_ret_closure;
3709     }
3710
3711     while (1) {
3712         nat i;
3713
3714         // 1. Let the top of the stack be the "current closure"
3715         //
3716         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3717         // CATCH_FRAME.
3718         //
3719         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3720         // current closure applied to the chunk of stack up to (but not
3721         // including) the update frame.  This closure becomes the "current
3722         // closure".  Go back to step 2.
3723         //
3724         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3725         // top of the stack applied to the exception.
3726         // 
3727         // 5. If it's a STOP_FRAME, then kill the thread.
3728         // 
3729         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3730         // transaction
3731        
3732         
3733         StgPtr frame;
3734         
3735         frame = sp + 1;
3736         info = get_ret_itbl((StgClosure *)frame);
3737         
3738         while (info->i.type != UPDATE_FRAME
3739                && (info->i.type != CATCH_FRAME || exception == NULL)
3740                && info->i.type != STOP_FRAME
3741                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3742         {
3743             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3744               // IF we find an ATOMICALLY_FRAME then we abort the
3745               // current transaction and propagate the exception.  In
3746               // this case (unlike ordinary exceptions) we do not care
3747               // whether the transaction is valid or not because its
3748               // possible validity cannot have caused the exception
3749               // and will not be visible after the abort.
3750               IF_DEBUG(stm,
3751                        debugBelch("Found atomically block delivering async exception\n"));
3752               stmAbortTransaction(tso -> trec);
3753               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3754             }
3755             frame += stack_frame_sizeW((StgClosure *)frame);
3756             info = get_ret_itbl((StgClosure *)frame);
3757         }
3758         
3759         switch (info->i.type) {
3760             
3761         case ATOMICALLY_FRAME:
3762             ASSERT(stop_at_atomically);
3763             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3764             stmCondemnTransaction(tso -> trec);
3765 #ifdef REG_R1
3766             tso->sp = frame;
3767 #else
3768             // R1 is not a register: the return convention for IO in
3769             // this case puts the return value on the stack, so we
3770             // need to set up the stack to return to the atomically
3771             // frame properly...
3772             tso->sp = frame - 2;
3773             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3774             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3775 #endif
3776             tso->what_next = ThreadRunGHC;
3777             return;
3778
3779         case CATCH_FRAME:
3780             // If we find a CATCH_FRAME, and we've got an exception to raise,
3781             // then build the THUNK raise(exception), and leave it on
3782             // top of the CATCH_FRAME ready to enter.
3783             //
3784         {
3785 #ifdef PROFILING
3786             StgCatchFrame *cf = (StgCatchFrame *)frame;
3787 #endif
3788             StgThunk *raise;
3789             
3790             // we've got an exception to raise, so let's pass it to the
3791             // handler in this frame.
3792             //
3793             raise = (StgThunk *)allocate(sizeofW(StgThunk)+1);
3794             TICK_ALLOC_SE_THK(1,0);
3795             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3796             raise->payload[0] = exception;
3797             
3798             // throw away the stack from Sp up to the CATCH_FRAME.
3799             //
3800             sp = frame - 1;
3801             
3802             /* Ensure that async excpetions are blocked now, so we don't get
3803              * a surprise exception before we get around to executing the
3804              * handler.
3805              */
3806             if (tso->blocked_exceptions == NULL) {
3807                 tso->blocked_exceptions = END_TSO_QUEUE;
3808             }
3809             
3810             /* Put the newly-built THUNK on top of the stack, ready to execute
3811              * when the thread restarts.
3812              */
3813             sp[0] = (W_)raise;
3814             sp[-1] = (W_)&stg_enter_info;
3815             tso->sp = sp-1;
3816             tso->what_next = ThreadRunGHC;
3817             IF_DEBUG(sanity, checkTSO(tso));
3818             return;
3819         }
3820         
3821         case UPDATE_FRAME:
3822         {
3823             StgAP_STACK * ap;
3824             nat words;
3825             
3826             // First build an AP_STACK consisting of the stack chunk above the
3827             // current update frame, with the top word on the stack as the
3828             // fun field.
3829             //
3830             words = frame - sp - 1;
3831             ap = (StgAP_STACK *)allocate(AP_STACK_sizeW(words));
3832             
3833             ap->size = words;
3834             ap->fun  = (StgClosure *)sp[0];
3835             sp++;
3836             for(i=0; i < (nat)words; ++i) {
3837                 ap->payload[i] = (StgClosure *)*sp++;
3838             }
3839             
3840             SET_HDR(ap,&stg_AP_STACK_info,
3841                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3842             TICK_ALLOC_UP_THK(words+1,0);
3843             
3844             IF_DEBUG(scheduler,
3845                      debugBelch("sched: Updating ");
3846                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3847                      debugBelch(" with ");
3848                      printObj((StgClosure *)ap);
3849                 );
3850
3851             // Replace the updatee with an indirection - happily
3852             // this will also wake up any threads currently
3853             // waiting on the result.
3854             //
3855             // Warning: if we're in a loop, more than one update frame on
3856             // the stack may point to the same object.  Be careful not to
3857             // overwrite an IND_OLDGEN in this case, because we'll screw
3858             // up the mutable lists.  To be on the safe side, don't
3859             // overwrite any kind of indirection at all.  See also
3860             // threadSqueezeStack in GC.c, where we have to make a similar
3861             // check.
3862             //
3863             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3864                 // revert the black hole
3865                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3866                                (StgClosure *)ap);
3867             }
3868             sp += sizeofW(StgUpdateFrame) - 1;
3869             sp[0] = (W_)ap; // push onto stack
3870             break;
3871         }
3872         
3873         case STOP_FRAME:
3874             // We've stripped the entire stack, the thread is now dead.
3875             sp += sizeofW(StgStopFrame);
3876             tso->what_next = ThreadKilled;
3877             tso->sp = sp;
3878             return;
3879             
3880         default:
3881             barf("raiseAsync");
3882         }
3883     }
3884     barf("raiseAsync");
3885 }
3886
3887 /* -----------------------------------------------------------------------------
3888    raiseExceptionHelper
3889    
3890    This function is called by the raise# primitve, just so that we can
3891    move some of the tricky bits of raising an exception from C-- into
3892    C.  Who knows, it might be a useful re-useable thing here too.
3893    -------------------------------------------------------------------------- */
3894
3895 StgWord
3896 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3897 {
3898     StgThunk *raise_closure = NULL;
3899     StgPtr p, next;
3900     StgRetInfoTable *info;
3901     //
3902     // This closure represents the expression 'raise# E' where E
3903     // is the exception raise.  It is used to overwrite all the
3904     // thunks which are currently under evaluataion.
3905     //
3906
3907     //    
3908     // LDV profiling: stg_raise_info has THUNK as its closure
3909     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3910     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3911     // 1 does not cause any problem unless profiling is performed.
3912     // However, when LDV profiling goes on, we need to linearly scan
3913     // small object pool, where raise_closure is stored, so we should
3914     // use MIN_UPD_SIZE.
3915     //
3916     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3917     //                                 sizeofW(StgClosure)+1);
3918     //
3919
3920     //
3921     // Walk up the stack, looking for the catch frame.  On the way,
3922     // we update any closures pointed to from update frames with the
3923     // raise closure that we just built.
3924     //
3925     p = tso->sp;
3926     while(1) {
3927         info = get_ret_itbl((StgClosure *)p);
3928         next = p + stack_frame_sizeW((StgClosure *)p);
3929         switch (info->i.type) {
3930             
3931         case UPDATE_FRAME:
3932             // Only create raise_closure if we need to.
3933             if (raise_closure == NULL) {
3934                 raise_closure = 
3935                     (StgThunk *)allocate(sizeofW(StgThunk)+MIN_UPD_SIZE);
3936                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3937                 raise_closure->payload[0] = exception;
3938             }
3939             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3940             p = next;
3941             continue;
3942
3943         case ATOMICALLY_FRAME:
3944             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3945             tso->sp = p;
3946             return ATOMICALLY_FRAME;
3947             
3948         case CATCH_FRAME:
3949             tso->sp = p;
3950             return CATCH_FRAME;
3951
3952         case CATCH_STM_FRAME:
3953             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3954             tso->sp = p;
3955             return CATCH_STM_FRAME;
3956             
3957         case STOP_FRAME:
3958             tso->sp = p;
3959             return STOP_FRAME;
3960
3961         case CATCH_RETRY_FRAME:
3962         default:
3963             p = next; 
3964             continue;
3965         }
3966     }
3967 }
3968
3969
3970 /* -----------------------------------------------------------------------------
3971    findRetryFrameHelper
3972
3973    This function is called by the retry# primitive.  It traverses the stack
3974    leaving tso->sp referring to the frame which should handle the retry.  
3975
3976    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3977    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3978
3979    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3980    despite the similar implementation.
3981
3982    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3983    not be created within memory transactions.
3984    -------------------------------------------------------------------------- */
3985
3986 StgWord
3987 findRetryFrameHelper (StgTSO *tso)
3988 {
3989   StgPtr           p, next;
3990   StgRetInfoTable *info;
3991
3992   p = tso -> sp;
3993   while (1) {
3994     info = get_ret_itbl((StgClosure *)p);
3995     next = p + stack_frame_sizeW((StgClosure *)p);
3996     switch (info->i.type) {
3997       
3998     case ATOMICALLY_FRAME:
3999       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
4000       tso->sp = p;
4001       return ATOMICALLY_FRAME;
4002       
4003     case CATCH_RETRY_FRAME:
4004       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4005       tso->sp = p;
4006       return CATCH_RETRY_FRAME;
4007       
4008     case CATCH_STM_FRAME:
4009     default:
4010       ASSERT(info->i.type != CATCH_FRAME);
4011       ASSERT(info->i.type != STOP_FRAME);
4012       p = next; 
4013       continue;
4014     }
4015   }
4016 }
4017
4018 /* -----------------------------------------------------------------------------
4019    resurrectThreads is called after garbage collection on the list of
4020    threads found to be garbage.  Each of these threads will be woken
4021    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4022    on an MVar, or NonTermination if the thread was blocked on a Black
4023    Hole.
4024
4025    Locks: sched_mutex isn't held upon entry nor exit.
4026    -------------------------------------------------------------------------- */
4027
4028 void
4029 resurrectThreads( StgTSO *threads )
4030 {
4031   StgTSO *tso, *next;
4032
4033   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4034     next = tso->global_link;
4035     tso->global_link = all_threads;
4036     all_threads = tso;
4037     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4038
4039     switch (tso->why_blocked) {
4040     case BlockedOnMVar:
4041     case BlockedOnException:
4042       /* Called by GC - sched_mutex lock is currently held. */
4043       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
4044       break;
4045     case BlockedOnBlackHole:
4046       raiseAsync(tso,(StgClosure *)NonTermination_closure);
4047       break;
4048     case BlockedOnSTM:
4049       raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
4050       break;
4051     case NotBlocked:
4052       /* This might happen if the thread was blocked on a black hole
4053        * belonging to a thread that we've just woken up (raiseAsync
4054        * can wake up threads, remember...).
4055        */
4056       continue;
4057     default:
4058       barf("resurrectThreads: thread blocked in a strange way");
4059     }
4060   }
4061 }
4062
4063 /* ----------------------------------------------------------------------------
4064  * Debugging: why is a thread blocked
4065  * [Also provides useful information when debugging threaded programs
4066  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4067    ------------------------------------------------------------------------- */
4068
4069 static void
4070 printThreadBlockage(StgTSO *tso)
4071 {
4072   switch (tso->why_blocked) {
4073   case BlockedOnRead:
4074     debugBelch("is blocked on read from fd %ld", tso->block_info.fd);
4075     break;
4076   case BlockedOnWrite:
4077     debugBelch("is blocked on write to fd %ld", tso->block_info.fd);
4078     break;
4079 #if defined(mingw32_HOST_OS)
4080     case BlockedOnDoProc:
4081     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4082     break;
4083 #endif
4084   case BlockedOnDelay:
4085     debugBelch("is blocked until %ld", tso->block_info.target);
4086     break;
4087   case BlockedOnMVar:
4088     debugBelch("is blocked on an MVar");
4089     break;
4090   case BlockedOnException:
4091     debugBelch("is blocked on delivering an exception to thread %d",
4092             tso->block_info.tso->id);
4093     break;
4094   case BlockedOnBlackHole:
4095     debugBelch("is blocked on a black hole");
4096     break;
4097   case NotBlocked:
4098     debugBelch("is not blocked");
4099     break;
4100 #if defined(PARALLEL_HASKELL)
4101   case BlockedOnGA:
4102     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4103             tso->block_info.closure, info_type(tso->block_info.closure));
4104     break;
4105   case BlockedOnGA_NoSend:
4106     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4107             tso->block_info.closure, info_type(tso->block_info.closure));
4108     break;
4109 #endif
4110   case BlockedOnCCall:
4111     debugBelch("is blocked on an external call");
4112     break;
4113   case BlockedOnCCall_NoUnblockExc:
4114     debugBelch("is blocked on an external call (exceptions were already blocked)");
4115     break;
4116   case BlockedOnSTM:
4117     debugBelch("is blocked on an STM operation");
4118     break;
4119   default:
4120     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4121          tso->why_blocked, tso->id, tso);
4122   }
4123 }
4124
4125 static void
4126 printThreadStatus(StgTSO *tso)
4127 {
4128   switch (tso->what_next) {
4129   case ThreadKilled:
4130     debugBelch("has been killed");
4131     break;
4132   case ThreadComplete:
4133     debugBelch("has completed");
4134     break;
4135   default:
4136     printThreadBlockage(tso);
4137   }
4138 }
4139
4140 void
4141 printAllThreads(void)
4142 {
4143   StgTSO *t;
4144
4145 # if defined(GRAN)
4146   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4147   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4148                        time_string, rtsFalse/*no commas!*/);
4149
4150   debugBelch("all threads at [%s]:\n", time_string);
4151 # elif defined(PARALLEL_HASKELL)
4152   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4153   ullong_format_string(CURRENT_TIME,
4154                        time_string, rtsFalse/*no commas!*/);
4155
4156   debugBelch("all threads at [%s]:\n", time_string);
4157 # else
4158   debugBelch("all threads:\n");
4159 # endif
4160
4161   for (t = all_threads; t != END_TSO_QUEUE; ) {
4162     debugBelch("\tthread %d @ %p ", t->id, (void *)t);
4163 #if defined(DEBUG)
4164     {
4165       void *label = lookupThreadLabel(t->id);
4166       if (label) debugBelch("[\"%s\"] ",(char *)label);
4167     }
4168 #endif
4169     if (t->what_next == ThreadRelocated) {
4170         debugBelch("has been relocated...\n");
4171         t = t->link;
4172     } else {
4173         printThreadStatus(t);
4174         debugBelch("\n");
4175         t = t->global_link;
4176     }
4177   }
4178 }
4179     
4180 #ifdef DEBUG
4181
4182 /* 
4183    Print a whole blocking queue attached to node (debugging only).
4184 */
4185 # if defined(PARALLEL_HASKELL)
4186 void 
4187 print_bq (StgClosure *node)
4188 {
4189   StgBlockingQueueElement *bqe;
4190   StgTSO *tso;
4191   rtsBool end;
4192
4193   debugBelch("## BQ of closure %p (%s): ",
4194           node, info_type(node));
4195
4196   /* should cover all closures that may have a blocking queue */
4197   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4198          get_itbl(node)->type == FETCH_ME_BQ ||
4199          get_itbl(node)->type == RBH ||
4200          get_itbl(node)->type == MVAR);
4201     
4202   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4203
4204   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4205 }
4206
4207 /* 
4208    Print a whole blocking queue starting with the element bqe.
4209 */
4210 void 
4211 print_bqe (StgBlockingQueueElement *bqe)
4212 {
4213   rtsBool end;
4214
4215   /* 
4216      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4217   */
4218   for (end = (bqe==END_BQ_QUEUE);
4219        !end; // iterate until bqe points to a CONSTR
4220        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4221        bqe = end ? END_BQ_QUEUE : bqe->link) {
4222     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4223     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4224     /* types of closures that may appear in a blocking queue */
4225     ASSERT(get_itbl(bqe)->type == TSO ||           
4226            get_itbl(bqe)->type == BLOCKED_FETCH || 
4227            get_itbl(bqe)->type == CONSTR); 
4228     /* only BQs of an RBH end with an RBH_Save closure */
4229     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4230
4231     switch (get_itbl(bqe)->type) {
4232     case TSO:
4233       debugBelch(" TSO %u (%x),",
4234               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4235       break;
4236     case BLOCKED_FETCH:
4237       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4238               ((StgBlockedFetch *)bqe)->node, 
4239               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4240               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4241               ((StgBlockedFetch *)bqe)->ga.weight);
4242       break;
4243     case CONSTR:
4244       debugBelch(" %s (IP %p),",
4245               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4246                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4247                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4248                "RBH_Save_?"), get_itbl(bqe));
4249       break;
4250     default:
4251       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4252            info_type((StgClosure *)bqe)); // , node, info_type(node));
4253       break;
4254     }
4255   } /* for */
4256   debugBelch("\n");
4257 }
4258 # elif defined(GRAN)
4259 void 
4260 print_bq (StgClosure *node)
4261 {
4262   StgBlockingQueueElement *bqe;
4263   PEs node_loc, tso_loc;
4264   rtsBool end;
4265
4266   /* should cover all closures that may have a blocking queue */
4267   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4268          get_itbl(node)->type == FETCH_ME_BQ ||
4269          get_itbl(node)->type == RBH);
4270     
4271   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4272   node_loc = where_is(node);
4273
4274   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4275           node, info_type(node), node_loc);
4276
4277   /* 
4278      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4279   */
4280   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4281        !end; // iterate until bqe points to a CONSTR
4282        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4283     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4284     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4285     /* types of closures that may appear in a blocking queue */
4286     ASSERT(get_itbl(bqe)->type == TSO ||           
4287            get_itbl(bqe)->type == CONSTR); 
4288     /* only BQs of an RBH end with an RBH_Save closure */
4289     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4290
4291     tso_loc = where_is((StgClosure *)bqe);
4292     switch (get_itbl(bqe)->type) {
4293     case TSO:
4294       debugBelch(" TSO %d (%p) on [PE %d],",
4295               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4296       break;
4297     case CONSTR:
4298       debugBelch(" %s (IP %p),",
4299               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4300                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4301                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4302                "RBH_Save_?"), get_itbl(bqe));
4303       break;
4304     default:
4305       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4306            info_type((StgClosure *)bqe), node, info_type(node));
4307       break;
4308     }
4309   } /* for */
4310   debugBelch("\n");
4311 }
4312 # endif
4313
4314 #if defined(PARALLEL_HASKELL)
4315 static nat
4316 run_queue_len(void)
4317 {
4318   nat i;
4319   StgTSO *tso;
4320
4321   for (i=0, tso=run_queue_hd; 
4322        tso != END_TSO_QUEUE;
4323        i++, tso=tso->link)
4324     /* nothing */
4325
4326   return i;
4327 }
4328 #endif
4329
4330 void
4331 sched_belch(char *s, ...)
4332 {
4333   va_list ap;
4334   va_start(ap,s);
4335 #ifdef RTS_SUPPORTS_THREADS
4336   debugBelch("sched (task %p): ", osThreadId());
4337 #elif defined(PARALLEL_HASKELL)
4338   debugBelch("== ");
4339 #else
4340   debugBelch("sched: ");
4341 #endif
4342   vdebugBelch(s, ap);
4343   debugBelch("\n");
4344   va_end(ap);
4345 }
4346
4347 #endif /* DEBUG */