[project @ 2005-05-11 12:52:05 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PARALLEL_HASKELL          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "OSThreads.h"
46 #include "Storage.h"
47 #include "StgRun.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "STM.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #include "LdvProfile.h"
63 #include "Updates.h"
64 #ifdef PROFILING
65 #include "Proftimer.h"
66 #include "ProfHeap.h"
67 #endif
68 #if defined(GRAN) || defined(PARALLEL_HASKELL)
69 # include "GranSimRts.h"
70 # include "GranSim.h"
71 # include "ParallelRts.h"
72 # include "Parallel.h"
73 # include "ParallelDebug.h"
74 # include "FetchMe.h"
75 # include "HLC.h"
76 #endif
77 #include "Sparks.h"
78 #include "Capability.h"
79 #include  "Task.h"
80
81 #ifdef HAVE_SYS_TYPES_H
82 #include <sys/types.h>
83 #endif
84 #ifdef HAVE_UNISTD_H
85 #include <unistd.h>
86 #endif
87
88 #include <string.h>
89 #include <stdlib.h>
90 #include <stdarg.h>
91
92 #ifdef HAVE_ERRNO_H
93 #include <errno.h>
94 #endif
95
96 // Turn off inlining when debugging - it obfuscates things
97 #ifdef DEBUG
98 # undef  STATIC_INLINE
99 # define STATIC_INLINE static
100 #endif
101
102 #ifdef THREADED_RTS
103 #define USED_IN_THREADED_RTS
104 #else
105 #define USED_IN_THREADED_RTS STG_UNUSED
106 #endif
107
108 #ifdef RTS_SUPPORTS_THREADS
109 #define USED_WHEN_RTS_SUPPORTS_THREADS
110 #else
111 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
112 #endif
113
114 /* Main thread queue.
115  * Locks required: sched_mutex.
116  */
117 StgMainThread *main_threads = NULL;
118
119 #if defined(GRAN)
120
121 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
122 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
123
124 /* 
125    In GranSim we have a runnable and a blocked queue for each processor.
126    In order to minimise code changes new arrays run_queue_hds/tls
127    are created. run_queue_hd is then a short cut (macro) for
128    run_queue_hds[CurrentProc] (see GranSim.h).
129    -- HWL
130 */
131 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
132 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
133 StgTSO *ccalling_threadss[MAX_PROC];
134 /* We use the same global list of threads (all_threads) in GranSim as in
135    the std RTS (i.e. we are cheating). However, we don't use this list in
136    the GranSim specific code at the moment (so we are only potentially
137    cheating).  */
138
139 #else /* !GRAN */
140
141 /* Thread queues.
142  * Locks required: sched_mutex.
143  */
144 StgTSO *run_queue_hd = NULL;
145 StgTSO *run_queue_tl = NULL;
146 StgTSO *blocked_queue_hd = NULL;
147 StgTSO *blocked_queue_tl = NULL;
148 StgTSO *blackhole_queue = NULL;
149 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
150
151 #endif
152
153 /* The blackhole_queue should be checked for threads to wake up.  See
154  * Schedule.h for more thorough comment.
155  */
156 rtsBool blackholes_need_checking = rtsFalse;
157
158 /* Linked list of all threads.
159  * Used for detecting garbage collected threads.
160  */
161 StgTSO *all_threads = NULL;
162
163 /* When a thread performs a safe C call (_ccall_GC, using old
164  * terminology), it gets put on the suspended_ccalling_threads
165  * list. Used by the garbage collector.
166  */
167 static StgTSO *suspended_ccalling_threads;
168
169 /* KH: The following two flags are shared memory locations.  There is no need
170        to lock them, since they are only unset at the end of a scheduler
171        operation.
172 */
173
174 /* flag set by signal handler to precipitate a context switch */
175 int context_switch = 0;
176
177 /* flag that tracks whether we have done any execution in this time slice. */
178 nat recent_activity = ACTIVITY_YES;
179
180 /* if this flag is set as well, give up execution */
181 rtsBool interrupted = rtsFalse;
182
183 /* Next thread ID to allocate.
184  * Locks required: thread_id_mutex
185  */
186 static StgThreadID next_thread_id = 1;
187
188 /*
189  * Pointers to the state of the current thread.
190  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
191  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
192  */
193  
194 /* The smallest stack size that makes any sense is:
195  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
196  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
197  *  + 1                       (the closure to enter)
198  *  + 1                       (stg_ap_v_ret)
199  *  + 1                       (spare slot req'd by stg_ap_v_ret)
200  *
201  * A thread with this stack will bomb immediately with a stack
202  * overflow, which will increase its stack size.  
203  */
204
205 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
206
207
208 #if defined(GRAN)
209 StgTSO *CurrentTSO;
210 #endif
211
212 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
213  *  exists - earlier gccs apparently didn't.
214  *  -= chak
215  */
216 StgTSO dummy_tso;
217
218 /*
219  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
220  * in an MT setting, needed to signal that a worker thread shouldn't hang around
221  * in the scheduler when it is out of work.
222  */
223 static rtsBool shutting_down_scheduler = rtsFalse;
224
225 #if defined(RTS_SUPPORTS_THREADS)
226 /* ToDo: carefully document the invariants that go together
227  *       with these synchronisation objects.
228  */
229 Mutex     sched_mutex       = INIT_MUTEX_VAR;
230 Mutex     term_mutex        = INIT_MUTEX_VAR;
231
232 #endif /* RTS_SUPPORTS_THREADS */
233
234 #if defined(PARALLEL_HASKELL)
235 StgTSO *LastTSO;
236 rtsTime TimeOfLastYield;
237 rtsBool emitSchedule = rtsTrue;
238 #endif
239
240 #if DEBUG
241 static char *whatNext_strs[] = {
242   "(unknown)",
243   "ThreadRunGHC",
244   "ThreadInterpret",
245   "ThreadKilled",
246   "ThreadRelocated",
247   "ThreadComplete"
248 };
249 #endif
250
251 /* -----------------------------------------------------------------------------
252  * static function prototypes
253  * -------------------------------------------------------------------------- */
254
255 #if defined(RTS_SUPPORTS_THREADS)
256 static void taskStart(void);
257 #endif
258
259 static void schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
260                       Capability *initialCapability );
261
262 //
263 // These function all encapsulate parts of the scheduler loop, and are
264 // abstracted only to make the structure and control flow of the
265 // scheduler clearer.
266 //
267 static void schedulePreLoop(void);
268 static void scheduleStartSignalHandlers(void);
269 static void scheduleCheckBlockedThreads(void);
270 static void scheduleCheckBlackHoles(void);
271 static void scheduleDetectDeadlock(void);
272 #if defined(GRAN)
273 static StgTSO *scheduleProcessEvent(rtsEvent *event);
274 #endif
275 #if defined(PARALLEL_HASKELL)
276 static StgTSO *scheduleSendPendingMessages(void);
277 static void scheduleActivateSpark(void);
278 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
279 #endif
280 #if defined(PAR) || defined(GRAN)
281 static void scheduleGranParReport(void);
282 #endif
283 static void schedulePostRunThread(void);
284 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
285 static void scheduleHandleStackOverflow( StgTSO *t);
286 static rtsBool scheduleHandleYield( StgTSO *t, nat prev_what_next );
287 static void scheduleHandleThreadBlocked( StgTSO *t );
288 static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread, 
289                                              Capability *cap, StgTSO *t );
290 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
291 static void scheduleDoGC(Capability *cap);
292
293 static void unblockThread(StgTSO *tso);
294 static rtsBool checkBlackHoles(void);
295 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
296                                    Capability *initialCapability
297                                    );
298 static void scheduleThread_ (StgTSO* tso);
299 static void AllRoots(evac_fn evac);
300
301 static StgTSO *threadStackOverflow(StgTSO *tso);
302
303 static void raiseAsync_(StgTSO *tso, StgClosure *exception, 
304                         rtsBool stop_at_atomically);
305
306 static void printThreadBlockage(StgTSO *tso);
307 static void printThreadStatus(StgTSO *tso);
308
309 #if defined(PARALLEL_HASKELL)
310 StgTSO * createSparkThread(rtsSpark spark);
311 StgTSO * activateSpark (rtsSpark spark);  
312 #endif
313
314 /* ----------------------------------------------------------------------------
315  * Starting Tasks
316  * ------------------------------------------------------------------------- */
317
318 #if defined(RTS_SUPPORTS_THREADS)
319 static nat startingWorkerThread = 0;
320
321 static void
322 taskStart(void)
323 {
324   ACQUIRE_LOCK(&sched_mutex);
325   startingWorkerThread--;
326   schedule(NULL,NULL);
327   taskStop();
328   RELEASE_LOCK(&sched_mutex);
329 }
330
331 void
332 startSchedulerTaskIfNecessary(void)
333 {
334     if ( !EMPTY_RUN_QUEUE()
335          && !shutting_down_scheduler // not if we're shutting down
336          && startingWorkerThread==0)
337     {
338         // we don't want to start another worker thread
339         // just because the last one hasn't yet reached the
340         // "waiting for capability" state
341         startingWorkerThread++;
342         if (!maybeStartNewWorker(taskStart)) {
343             startingWorkerThread--;
344         }
345     }
346 }
347 #endif
348
349 /* -----------------------------------------------------------------------------
350  * Putting a thread on the run queue: different scheduling policies
351  * -------------------------------------------------------------------------- */
352
353 STATIC_INLINE void
354 addToRunQueue( StgTSO *t )
355 {
356 #if defined(PARALLEL_HASKELL)
357     if (RtsFlags.ParFlags.doFairScheduling) { 
358         // this does round-robin scheduling; good for concurrency
359         APPEND_TO_RUN_QUEUE(t);
360     } else {
361         // this does unfair scheduling; good for parallelism
362         PUSH_ON_RUN_QUEUE(t);
363     }
364 #else
365     // this does round-robin scheduling; good for concurrency
366     APPEND_TO_RUN_QUEUE(t);
367 #endif
368 }
369     
370 /* ---------------------------------------------------------------------------
371    Main scheduling loop.
372
373    We use round-robin scheduling, each thread returning to the
374    scheduler loop when one of these conditions is detected:
375
376       * out of heap space
377       * timer expires (thread yields)
378       * thread blocks
379       * thread ends
380       * stack overflow
381
382    Locking notes:  we acquire the scheduler lock once at the beginning
383    of the scheduler loop, and release it when
384     
385       * running a thread, or
386       * waiting for work, or
387       * waiting for a GC to complete.
388
389    GRAN version:
390      In a GranSim setup this loop iterates over the global event queue.
391      This revolves around the global event queue, which determines what 
392      to do next. Therefore, it's more complicated than either the 
393      concurrent or the parallel (GUM) setup.
394
395    GUM version:
396      GUM iterates over incoming messages.
397      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
398      and sends out a fish whenever it has nothing to do; in-between
399      doing the actual reductions (shared code below) it processes the
400      incoming messages and deals with delayed operations 
401      (see PendingFetches).
402      This is not the ugliest code you could imagine, but it's bloody close.
403
404    ------------------------------------------------------------------------ */
405
406 static void
407 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
408           Capability *initialCapability )
409 {
410   StgTSO *t;
411   Capability *cap;
412   StgThreadReturnCode ret;
413 #if defined(GRAN)
414   rtsEvent *event;
415 #elif defined(PARALLEL_HASKELL)
416   StgTSO *tso;
417   GlobalTaskId pe;
418   rtsBool receivedFinish = rtsFalse;
419 # if defined(DEBUG)
420   nat tp_size, sp_size; // stats only
421 # endif
422 #endif
423   nat prev_what_next;
424   rtsBool ready_to_gc;
425   
426   // Pre-condition: sched_mutex is held.
427   // We might have a capability, passed in as initialCapability.
428   cap = initialCapability;
429
430 #if !defined(RTS_SUPPORTS_THREADS)
431   // simply initialise it in the non-threaded case
432   grabCapability(&cap);
433 #endif
434
435   IF_DEBUG(scheduler,
436            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
437                        mainThread, initialCapability);
438       );
439
440   schedulePreLoop();
441
442   // -----------------------------------------------------------
443   // Scheduler loop starts here:
444
445 #if defined(PARALLEL_HASKELL)
446 #define TERMINATION_CONDITION        (!receivedFinish)
447 #elif defined(GRAN)
448 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
449 #else
450 #define TERMINATION_CONDITION        rtsTrue
451 #endif
452
453   while (TERMINATION_CONDITION) {
454
455 #if defined(GRAN)
456       /* Choose the processor with the next event */
457       CurrentProc = event->proc;
458       CurrentTSO = event->tso;
459 #endif
460
461       IF_DEBUG(scheduler, printAllThreads());
462
463 #if defined(RTS_SUPPORTS_THREADS)
464       // Yield the capability to higher-priority tasks if necessary.
465       //
466       if (cap != NULL) {
467           yieldCapability(&cap);
468       }
469
470       // If we do not currently hold a capability, we wait for one
471       //
472       if (cap == NULL) {
473           waitForCapability(&sched_mutex, &cap,
474                             mainThread ? &mainThread->bound_thread_cond : NULL);
475       }
476
477       // We now have a capability...
478 #endif
479       
480 #if 0 /* extra sanity checking */
481       { 
482           StgMainThread *m;
483           for (m = main_threads; m != NULL; m = m->link) {
484               ASSERT(get_itbl(m->tso)->type == TSO);
485           }
486       }
487 #endif
488
489     // Check whether we have re-entered the RTS from Haskell without
490     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
491     // call).
492     if (cap->r.rInHaskell) {
493           errorBelch("schedule: re-entered unsafely.\n"
494                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
495           stg_exit(1);
496     }
497
498     //
499     // Test for interruption.  If interrupted==rtsTrue, then either
500     // we received a keyboard interrupt (^C), or the scheduler is
501     // trying to shut down all the tasks (shutting_down_scheduler) in
502     // the threaded RTS.
503     //
504     if (interrupted) {
505         if (shutting_down_scheduler) {
506             IF_DEBUG(scheduler, sched_belch("shutting down"));
507             releaseCapability(cap);
508             if (mainThread) {
509                 mainThread->stat = Interrupted;
510                 mainThread->ret  = NULL;
511             }
512             return;
513         } else {
514             IF_DEBUG(scheduler, sched_belch("interrupted"));
515             deleteAllThreads();
516         }
517     }
518
519 #if defined(not_yet) && defined(SMP)
520     //
521     // Top up the run queue from our spark pool.  We try to make the
522     // number of threads in the run queue equal to the number of
523     // free capabilities.
524     //
525     {
526         StgClosure *spark;
527         if (EMPTY_RUN_QUEUE()) {
528             spark = findSpark(rtsFalse);
529             if (spark == NULL) {
530                 break; /* no more sparks in the pool */
531             } else {
532                 createSparkThread(spark);         
533                 IF_DEBUG(scheduler,
534                          sched_belch("==^^ turning spark of closure %p into a thread",
535                                      (StgClosure *)spark));
536             }
537         }
538     }
539 #endif // SMP
540
541     scheduleStartSignalHandlers();
542
543     // Only check the black holes here if we've nothing else to do.
544     // During normal execution, the black hole list only gets checked
545     // at GC time, to avoid repeatedly traversing this possibly long
546     // list each time around the scheduler.
547     if (EMPTY_RUN_QUEUE()) { scheduleCheckBlackHoles(); }
548
549     scheduleCheckBlockedThreads();
550
551     scheduleDetectDeadlock();
552
553     // Normally, the only way we can get here with no threads to
554     // run is if a keyboard interrupt received during 
555     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
556     // Additionally, it is not fatal for the
557     // threaded RTS to reach here with no threads to run.
558     //
559     // win32: might be here due to awaitEvent() being abandoned
560     // as a result of a console event having been delivered.
561     if ( EMPTY_RUN_QUEUE() ) {
562 #if !defined(RTS_SUPPORTS_THREADS) && !defined(mingw32_HOST_OS)
563         ASSERT(interrupted);
564 #endif
565         continue; // nothing to do
566     }
567
568 #if defined(PARALLEL_HASKELL)
569     scheduleSendPendingMessages();
570     if (EMPTY_RUN_QUEUE() && scheduleActivateSpark()) 
571         continue;
572
573 #if defined(SPARKS)
574     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
575 #endif
576
577     /* If we still have no work we need to send a FISH to get a spark
578        from another PE */
579     if (EMPTY_RUN_QUEUE()) {
580         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
581         ASSERT(rtsFalse); // should not happen at the moment
582     }
583     // from here: non-empty run queue.
584     //  TODO: merge above case with this, only one call processMessages() !
585     if (PacketsWaiting()) {  /* process incoming messages, if
586                                 any pending...  only in else
587                                 because getRemoteWork waits for
588                                 messages as well */
589         receivedFinish = processMessages();
590     }
591 #endif
592
593 #if defined(GRAN)
594     scheduleProcessEvent(event);
595 #endif
596
597     // 
598     // Get a thread to run
599     //
600     ASSERT(run_queue_hd != END_TSO_QUEUE);
601     POP_RUN_QUEUE(t);
602
603 #if defined(GRAN) || defined(PAR)
604     scheduleGranParReport(); // some kind of debuging output
605 #else
606     // Sanity check the thread we're about to run.  This can be
607     // expensive if there is lots of thread switching going on...
608     IF_DEBUG(sanity,checkTSO(t));
609 #endif
610
611 #if defined(RTS_SUPPORTS_THREADS)
612     // Check whether we can run this thread in the current task.
613     // If not, we have to pass our capability to the right task.
614     {
615       StgMainThread *m = t->main;
616       
617       if(m)
618       {
619         if(m == mainThread)
620         {
621           IF_DEBUG(scheduler,
622             sched_belch("### Running thread %d in bound thread", t->id));
623           // yes, the Haskell thread is bound to the current native thread
624         }
625         else
626         {
627           IF_DEBUG(scheduler,
628             sched_belch("### thread %d bound to another OS thread", t->id));
629           // no, bound to a different Haskell thread: pass to that thread
630           PUSH_ON_RUN_QUEUE(t);
631           passCapability(&m->bound_thread_cond);
632           continue;
633         }
634       }
635       else
636       {
637         if(mainThread != NULL)
638         // The thread we want to run is bound.
639         {
640           IF_DEBUG(scheduler,
641             sched_belch("### this OS thread cannot run thread %d", t->id));
642           // no, the current native thread is bound to a different
643           // Haskell thread, so pass it to any worker thread
644           PUSH_ON_RUN_QUEUE(t);
645           passCapabilityToWorker();
646           continue; 
647         }
648       }
649     }
650 #endif
651
652     cap->r.rCurrentTSO = t;
653     
654     /* context switches are now initiated by the timer signal, unless
655      * the user specified "context switch as often as possible", with
656      * +RTS -C0
657      */
658     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
659          && (run_queue_hd != END_TSO_QUEUE
660              || blocked_queue_hd != END_TSO_QUEUE
661              || sleeping_queue != END_TSO_QUEUE)))
662         context_switch = 1;
663
664 run_thread:
665
666     RELEASE_LOCK(&sched_mutex);
667
668     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
669                               (long)t->id, whatNext_strs[t->what_next]));
670
671 #if defined(PROFILING)
672     startHeapProfTimer();
673 #endif
674
675     // ----------------------------------------------------------------------
676     // Run the current thread 
677
678     prev_what_next = t->what_next;
679
680     errno = t->saved_errno;
681     cap->r.rInHaskell = rtsTrue;
682
683     recent_activity = ACTIVITY_YES;
684
685     switch (prev_what_next) {
686
687     case ThreadKilled:
688     case ThreadComplete:
689         /* Thread already finished, return to scheduler. */
690         ret = ThreadFinished;
691         break;
692
693     case ThreadRunGHC:
694         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
695         break;
696
697     case ThreadInterpret:
698         ret = interpretBCO(cap);
699         break;
700
701     default:
702       barf("schedule: invalid what_next field");
703     }
704
705     // We have run some Haskell code: there might be blackhole-blocked
706     // threads to wake up now.
707     if ( blackhole_queue != END_TSO_QUEUE ) {
708         blackholes_need_checking = rtsTrue;
709     }
710
711     cap->r.rInHaskell = rtsFalse;
712
713     // The TSO might have moved, eg. if it re-entered the RTS and a GC
714     // happened.  So find the new location:
715     t = cap->r.rCurrentTSO;
716
717     // And save the current errno in this thread.
718     t->saved_errno = errno;
719
720     // ----------------------------------------------------------------------
721     
722     /* Costs for the scheduler are assigned to CCS_SYSTEM */
723 #if defined(PROFILING)
724     stopHeapProfTimer();
725     CCCS = CCS_SYSTEM;
726 #endif
727     
728     ACQUIRE_LOCK(&sched_mutex);
729     
730 #if defined(RTS_SUPPORTS_THREADS)
731     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
732 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
733     IF_DEBUG(scheduler,debugBelch("sched: "););
734 #endif
735     
736     schedulePostRunThread();
737
738     ready_to_gc = rtsFalse;
739
740     switch (ret) {
741     case HeapOverflow:
742         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
743         break;
744
745     case StackOverflow:
746         scheduleHandleStackOverflow(t);
747         break;
748
749     case ThreadYielding:
750         if (scheduleHandleYield(t, prev_what_next)) {
751             // shortcut for switching between compiler/interpreter:
752             goto run_thread; 
753         }
754         break;
755
756     case ThreadBlocked:
757         scheduleHandleThreadBlocked(t);
758         threadPaused(t);
759         break;
760
761     case ThreadFinished:
762         if (scheduleHandleThreadFinished(mainThread, cap, t)) return;;
763         break;
764
765     default:
766       barf("schedule: invalid thread return code %d", (int)ret);
767     }
768
769     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
770     if (ready_to_gc) { scheduleDoGC(cap); }
771   } /* end of while() */
772
773   IF_PAR_DEBUG(verbose,
774                debugBelch("== Leaving schedule() after having received Finish\n"));
775 }
776
777 /* ----------------------------------------------------------------------------
778  * Setting up the scheduler loop
779  * ASSUMES: sched_mutex
780  * ------------------------------------------------------------------------- */
781
782 static void
783 schedulePreLoop(void)
784 {
785 #if defined(GRAN) 
786     /* set up first event to get things going */
787     /* ToDo: assign costs for system setup and init MainTSO ! */
788     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
789               ContinueThread, 
790               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
791     
792     IF_DEBUG(gran,
793              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
794                         CurrentTSO);
795              G_TSO(CurrentTSO, 5));
796     
797     if (RtsFlags.GranFlags.Light) {
798         /* Save current time; GranSim Light only */
799         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
800     }      
801 #endif
802 }
803
804 /* ----------------------------------------------------------------------------
805  * Start any pending signal handlers
806  * ASSUMES: sched_mutex
807  * ------------------------------------------------------------------------- */
808
809 static void
810 scheduleStartSignalHandlers(void)
811 {
812 #if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
813     if (signals_pending()) {
814       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
815       startSignalHandlers();
816       ACQUIRE_LOCK(&sched_mutex);
817     }
818 #endif
819 }
820
821 /* ----------------------------------------------------------------------------
822  * Check for blocked threads that can be woken up.
823  * ASSUMES: sched_mutex
824  * ------------------------------------------------------------------------- */
825
826 static void
827 scheduleCheckBlockedThreads(void)
828 {
829     //
830     // Check whether any waiting threads need to be woken up.  If the
831     // run queue is empty, and there are no other tasks running, we
832     // can wait indefinitely for something to happen.
833     //
834     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
835     {
836 #if defined(RTS_SUPPORTS_THREADS)
837         // We shouldn't be here...
838         barf("schedule: awaitEvent() in threaded RTS");
839 #endif
840         awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
841     }
842 }
843
844
845 /* ----------------------------------------------------------------------------
846  * Check for threads blocked on BLACKHOLEs that can be woken up
847  * ASSUMES: sched_mutex
848  * ------------------------------------------------------------------------- */
849 static void
850 scheduleCheckBlackHoles( void )
851 {
852     if ( blackholes_need_checking )
853     {
854         checkBlackHoles();
855         blackholes_need_checking = rtsFalse;
856     }
857 }
858
859 /* ----------------------------------------------------------------------------
860  * Detect deadlock conditions and attempt to resolve them.
861  * ASSUMES: sched_mutex
862  * ------------------------------------------------------------------------- */
863
864 static void
865 scheduleDetectDeadlock(void)
866 {
867
868 #if defined(PARALLEL_HASKELL)
869     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
870     return;
871 #endif
872
873     /* 
874      * Detect deadlock: when we have no threads to run, there are no
875      * threads blocked, waiting for I/O, or sleeping, and all the
876      * other tasks are waiting for work, we must have a deadlock of
877      * some description.
878      */
879     if ( EMPTY_THREAD_QUEUES() )
880     {
881 #if defined(RTS_SUPPORTS_THREADS)
882         /* 
883          * In the threaded RTS, we only check for deadlock if there
884          * has been no activity in a complete timeslice.  This means
885          * we won't eagerly start a full GC just because we don't have
886          * any threads to run currently.
887          */
888         if (recent_activity != ACTIVITY_INACTIVE) return;
889 #endif
890
891         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
892
893         // Garbage collection can release some new threads due to
894         // either (a) finalizers or (b) threads resurrected because
895         // they are unreachable and will therefore be sent an
896         // exception.  Any threads thus released will be immediately
897         // runnable.
898         GarbageCollect(GetRoots,rtsTrue);
899         recent_activity = ACTIVITY_DONE_GC;
900         if ( !EMPTY_RUN_QUEUE() ) return;
901
902 #if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
903         /* If we have user-installed signal handlers, then wait
904          * for signals to arrive rather then bombing out with a
905          * deadlock.
906          */
907         if ( anyUserHandlers() ) {
908             IF_DEBUG(scheduler, 
909                      sched_belch("still deadlocked, waiting for signals..."));
910
911             awaitUserSignals();
912
913             if (signals_pending()) {
914                 RELEASE_LOCK(&sched_mutex);
915                 startSignalHandlers();
916                 ACQUIRE_LOCK(&sched_mutex);
917             }
918
919             // either we have threads to run, or we were interrupted:
920             ASSERT(!EMPTY_RUN_QUEUE() || interrupted);
921         }
922 #endif
923
924 #if !defined(RTS_SUPPORTS_THREADS)
925         /* Probably a real deadlock.  Send the current main thread the
926          * Deadlock exception (or in the SMP build, send *all* main
927          * threads the deadlock exception, since none of them can make
928          * progress).
929          */
930         {
931             StgMainThread *m;
932             m = main_threads;
933             switch (m->tso->why_blocked) {
934             case BlockedOnSTM:
935             case BlockedOnBlackHole:
936             case BlockedOnException:
937             case BlockedOnMVar:
938                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
939                 return;
940             default:
941                 barf("deadlock: main thread blocked in a strange way");
942             }
943         }
944 #endif
945     }
946 }
947
948 /* ----------------------------------------------------------------------------
949  * Process an event (GRAN only)
950  * ------------------------------------------------------------------------- */
951
952 #if defined(GRAN)
953 static StgTSO *
954 scheduleProcessEvent(rtsEvent *event)
955 {
956     StgTSO *t;
957
958     if (RtsFlags.GranFlags.Light)
959       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
960
961     /* adjust time based on time-stamp */
962     if (event->time > CurrentTime[CurrentProc] &&
963         event->evttype != ContinueThread)
964       CurrentTime[CurrentProc] = event->time;
965     
966     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
967     if (!RtsFlags.GranFlags.Light)
968       handleIdlePEs();
969
970     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
971
972     /* main event dispatcher in GranSim */
973     switch (event->evttype) {
974       /* Should just be continuing execution */
975     case ContinueThread:
976       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
977       /* ToDo: check assertion
978       ASSERT(run_queue_hd != (StgTSO*)NULL &&
979              run_queue_hd != END_TSO_QUEUE);
980       */
981       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
982       if (!RtsFlags.GranFlags.DoAsyncFetch &&
983           procStatus[CurrentProc]==Fetching) {
984         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
985               CurrentTSO->id, CurrentTSO, CurrentProc);
986         goto next_thread;
987       } 
988       /* Ignore ContinueThreads for completed threads */
989       if (CurrentTSO->what_next == ThreadComplete) {
990         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
991               CurrentTSO->id, CurrentTSO, CurrentProc);
992         goto next_thread;
993       } 
994       /* Ignore ContinueThreads for threads that are being migrated */
995       if (PROCS(CurrentTSO)==Nowhere) { 
996         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
997               CurrentTSO->id, CurrentTSO, CurrentProc);
998         goto next_thread;
999       }
1000       /* The thread should be at the beginning of the run queue */
1001       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1002         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1003               CurrentTSO->id, CurrentTSO, CurrentProc);
1004         break; // run the thread anyway
1005       }
1006       /*
1007       new_event(proc, proc, CurrentTime[proc],
1008                 FindWork,
1009                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1010       goto next_thread; 
1011       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1012       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1013
1014     case FetchNode:
1015       do_the_fetchnode(event);
1016       goto next_thread;             /* handle next event in event queue  */
1017       
1018     case GlobalBlock:
1019       do_the_globalblock(event);
1020       goto next_thread;             /* handle next event in event queue  */
1021       
1022     case FetchReply:
1023       do_the_fetchreply(event);
1024       goto next_thread;             /* handle next event in event queue  */
1025       
1026     case UnblockThread:   /* Move from the blocked queue to the tail of */
1027       do_the_unblock(event);
1028       goto next_thread;             /* handle next event in event queue  */
1029       
1030     case ResumeThread:  /* Move from the blocked queue to the tail of */
1031       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1032       event->tso->gran.blocktime += 
1033         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1034       do_the_startthread(event);
1035       goto next_thread;             /* handle next event in event queue  */
1036       
1037     case StartThread:
1038       do_the_startthread(event);
1039       goto next_thread;             /* handle next event in event queue  */
1040       
1041     case MoveThread:
1042       do_the_movethread(event);
1043       goto next_thread;             /* handle next event in event queue  */
1044       
1045     case MoveSpark:
1046       do_the_movespark(event);
1047       goto next_thread;             /* handle next event in event queue  */
1048       
1049     case FindWork:
1050       do_the_findwork(event);
1051       goto next_thread;             /* handle next event in event queue  */
1052       
1053     default:
1054       barf("Illegal event type %u\n", event->evttype);
1055     }  /* switch */
1056     
1057     /* This point was scheduler_loop in the old RTS */
1058
1059     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1060
1061     TimeOfLastEvent = CurrentTime[CurrentProc];
1062     TimeOfNextEvent = get_time_of_next_event();
1063     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1064     // CurrentTSO = ThreadQueueHd;
1065
1066     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1067                          TimeOfNextEvent));
1068
1069     if (RtsFlags.GranFlags.Light) 
1070       GranSimLight_leave_system(event, &ActiveTSO); 
1071
1072     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1073
1074     IF_DEBUG(gran, 
1075              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1076
1077     /* in a GranSim setup the TSO stays on the run queue */
1078     t = CurrentTSO;
1079     /* Take a thread from the run queue. */
1080     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1081
1082     IF_DEBUG(gran, 
1083              debugBelch("GRAN: About to run current thread, which is\n");
1084              G_TSO(t,5));
1085
1086     context_switch = 0; // turned on via GranYield, checking events and time slice
1087
1088     IF_DEBUG(gran, 
1089              DumpGranEvent(GR_SCHEDULE, t));
1090
1091     procStatus[CurrentProc] = Busy;
1092 }
1093 #endif // GRAN
1094
1095 /* ----------------------------------------------------------------------------
1096  * Send pending messages (PARALLEL_HASKELL only)
1097  * ------------------------------------------------------------------------- */
1098
1099 #if defined(PARALLEL_HASKELL)
1100 static StgTSO *
1101 scheduleSendPendingMessages(void)
1102 {
1103     StgSparkPool *pool;
1104     rtsSpark spark;
1105     StgTSO *t;
1106
1107 # if defined(PAR) // global Mem.Mgmt., omit for now
1108     if (PendingFetches != END_BF_QUEUE) {
1109         processFetches();
1110     }
1111 # endif
1112     
1113     if (RtsFlags.ParFlags.BufferTime) {
1114         // if we use message buffering, we must send away all message
1115         // packets which have become too old...
1116         sendOldBuffers(); 
1117     }
1118 }
1119 #endif
1120
1121 /* ----------------------------------------------------------------------------
1122  * Activate spark threads (PARALLEL_HASKELL only)
1123  * ------------------------------------------------------------------------- */
1124
1125 #if defined(PARALLEL_HASKELL)
1126 static void
1127 scheduleActivateSpark(void)
1128 {
1129 #if defined(SPARKS)
1130   ASSERT(EMPTY_RUN_QUEUE());
1131 /* We get here if the run queue is empty and want some work.
1132    We try to turn a spark into a thread, and add it to the run queue,
1133    from where it will be picked up in the next iteration of the scheduler
1134    loop.
1135 */
1136
1137       /* :-[  no local threads => look out for local sparks */
1138       /* the spark pool for the current PE */
1139       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1140       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1141           pool->hd < pool->tl) {
1142         /* 
1143          * ToDo: add GC code check that we really have enough heap afterwards!!
1144          * Old comment:
1145          * If we're here (no runnable threads) and we have pending
1146          * sparks, we must have a space problem.  Get enough space
1147          * to turn one of those pending sparks into a
1148          * thread... 
1149          */
1150
1151         spark = findSpark(rtsFalse);            /* get a spark */
1152         if (spark != (rtsSpark) NULL) {
1153           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1154           IF_PAR_DEBUG(fish, // schedule,
1155                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1156                              tso->id, tso, advisory_thread_count));
1157
1158           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1159             IF_PAR_DEBUG(fish, // schedule,
1160                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1161                             spark));
1162             return rtsFalse; /* failed to generate a thread */
1163           }                  /* otherwise fall through & pick-up new tso */
1164         } else {
1165           IF_PAR_DEBUG(fish, // schedule,
1166                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1167                              spark_queue_len(pool)));
1168           return rtsFalse;  /* failed to generate a thread */
1169         }
1170         return rtsTrue;  /* success in generating a thread */
1171   } else { /* no more threads permitted or pool empty */
1172     return rtsFalse;  /* failed to generateThread */
1173   }
1174 #else
1175   tso = NULL; // avoid compiler warning only
1176   return rtsFalse;  /* dummy in non-PAR setup */
1177 #endif // SPARKS
1178 }
1179 #endif // PARALLEL_HASKELL
1180
1181 /* ----------------------------------------------------------------------------
1182  * Get work from a remote node (PARALLEL_HASKELL only)
1183  * ------------------------------------------------------------------------- */
1184     
1185 #if defined(PARALLEL_HASKELL)
1186 static rtsBool
1187 scheduleGetRemoteWork(rtsBool *receivedFinish)
1188 {
1189   ASSERT(EMPTY_RUN_QUEUE());
1190
1191   if (RtsFlags.ParFlags.BufferTime) {
1192         IF_PAR_DEBUG(verbose, 
1193                 debugBelch("...send all pending data,"));
1194         {
1195           nat i;
1196           for (i=1; i<=nPEs; i++)
1197             sendImmediately(i); // send all messages away immediately
1198         }
1199   }
1200 # ifndef SPARKS
1201         //++EDEN++ idle() , i.e. send all buffers, wait for work
1202         // suppress fishing in EDEN... just look for incoming messages
1203         // (blocking receive)
1204   IF_PAR_DEBUG(verbose, 
1205                debugBelch("...wait for incoming messages...\n"));
1206   *receivedFinish = processMessages(); // blocking receive...
1207
1208         // and reenter scheduling loop after having received something
1209         // (return rtsFalse below)
1210
1211 # else /* activate SPARKS machinery */
1212 /* We get here, if we have no work, tried to activate a local spark, but still
1213    have no work. We try to get a remote spark, by sending a FISH message.
1214    Thread migration should be added here, and triggered when a sequence of 
1215    fishes returns without work. */
1216         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1217
1218       /* =8-[  no local sparks => look for work on other PEs */
1219         /*
1220          * We really have absolutely no work.  Send out a fish
1221          * (there may be some out there already), and wait for
1222          * something to arrive.  We clearly can't run any threads
1223          * until a SCHEDULE or RESUME arrives, and so that's what
1224          * we're hoping to see.  (Of course, we still have to
1225          * respond to other types of messages.)
1226          */
1227         rtsTime now = msTime() /*CURRENT_TIME*/;
1228         IF_PAR_DEBUG(verbose, 
1229                      debugBelch("--  now=%ld\n", now));
1230         IF_PAR_DEBUG(fish, // verbose,
1231              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1232                  (last_fish_arrived_at!=0 &&
1233                   last_fish_arrived_at+delay > now)) {
1234                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1235                      now, last_fish_arrived_at+delay, 
1236                      last_fish_arrived_at,
1237                      delay);
1238              });
1239   
1240         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1241             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1242           if (last_fish_arrived_at==0 ||
1243               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1244             /* outstandingFishes is set in sendFish, processFish;
1245                avoid flooding system with fishes via delay */
1246     next_fish_to_send_at = 0;  
1247   } else {
1248     /* ToDo: this should be done in the main scheduling loop to avoid the
1249              busy wait here; not so bad if fish delay is very small  */
1250     int iq = 0; // DEBUGGING -- HWL
1251     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1252     /* send a fish when ready, but process messages that arrive in the meantime */
1253     do {
1254       if (PacketsWaiting()) {
1255         iq++; // DEBUGGING
1256         *receivedFinish = processMessages();
1257       }
1258       now = msTime();
1259     } while (!*receivedFinish || now<next_fish_to_send_at);
1260     // JB: This means the fish could become obsolete, if we receive
1261     // work. Better check for work again? 
1262     // last line: while (!receivedFinish || !haveWork || now<...)
1263     // next line: if (receivedFinish || haveWork )
1264
1265     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1266       return rtsFalse;  // NB: this will leave scheduler loop
1267                         // immediately after return!
1268                           
1269     IF_PAR_DEBUG(fish, // verbose,
1270                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1271
1272   }
1273
1274     // JB: IMHO, this should all be hidden inside sendFish(...)
1275     /* pe = choosePE(); 
1276        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1277                 NEW_FISH_HUNGER);
1278
1279     // Global statistics: count no. of fishes
1280     if (RtsFlags.ParFlags.ParStats.Global &&
1281          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1282            globalParStats.tot_fish_mess++;
1283            }
1284     */ 
1285
1286   /* delayed fishes must have been sent by now! */
1287   next_fish_to_send_at = 0;  
1288   }
1289       
1290   *receivedFinish = processMessages();
1291 # endif /* SPARKS */
1292
1293  return rtsFalse;
1294  /* NB: this function always returns rtsFalse, meaning the scheduler
1295     loop continues with the next iteration; 
1296     rationale: 
1297       return code means success in finding work; we enter this function
1298       if there is no local work, thus have to send a fish which takes
1299       time until it arrives with work; in the meantime we should process
1300       messages in the main loop;
1301  */
1302 }
1303 #endif // PARALLEL_HASKELL
1304
1305 /* ----------------------------------------------------------------------------
1306  * PAR/GRAN: Report stats & debugging info(?)
1307  * ------------------------------------------------------------------------- */
1308
1309 #if defined(PAR) || defined(GRAN)
1310 static void
1311 scheduleGranParReport(void)
1312 {
1313   ASSERT(run_queue_hd != END_TSO_QUEUE);
1314
1315   /* Take a thread from the run queue, if we have work */
1316   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1317
1318     /* If this TSO has got its outport closed in the meantime, 
1319      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1320      * It has to be marked as TH_DEAD for this purpose.
1321      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1322
1323 JB: TODO: investigate wether state change field could be nuked
1324      entirely and replaced by the normal tso state (whatnext
1325      field). All we want to do is to kill tsos from outside.
1326      */
1327
1328     /* ToDo: write something to the log-file
1329     if (RTSflags.ParFlags.granSimStats && !sameThread)
1330         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1331
1332     CurrentTSO = t;
1333     */
1334     /* the spark pool for the current PE */
1335     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1336
1337     IF_DEBUG(scheduler, 
1338              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1339                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1340
1341     IF_PAR_DEBUG(fish,
1342              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1343                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1344
1345     if (RtsFlags.ParFlags.ParStats.Full && 
1346         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1347         (emitSchedule || // forced emit
1348          (t && LastTSO && t->id != LastTSO->id))) {
1349       /* 
1350          we are running a different TSO, so write a schedule event to log file
1351          NB: If we use fair scheduling we also have to write  a deschedule 
1352              event for LastTSO; with unfair scheduling we know that the
1353              previous tso has blocked whenever we switch to another tso, so
1354              we don't need it in GUM for now
1355       */
1356       IF_PAR_DEBUG(fish, // schedule,
1357                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1358
1359       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1360                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1361       emitSchedule = rtsFalse;
1362     }
1363 }     
1364 #endif
1365
1366 /* ----------------------------------------------------------------------------
1367  * After running a thread...
1368  * ASSUMES: sched_mutex
1369  * ------------------------------------------------------------------------- */
1370
1371 static void
1372 schedulePostRunThread(void)
1373 {
1374 #if defined(PAR)
1375     /* HACK 675: if the last thread didn't yield, make sure to print a 
1376        SCHEDULE event to the log file when StgRunning the next thread, even
1377        if it is the same one as before */
1378     LastTSO = t; 
1379     TimeOfLastYield = CURRENT_TIME;
1380 #endif
1381
1382   /* some statistics gathering in the parallel case */
1383
1384 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1385   switch (ret) {
1386     case HeapOverflow:
1387 # if defined(GRAN)
1388       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1389       globalGranStats.tot_heapover++;
1390 # elif defined(PAR)
1391       globalParStats.tot_heapover++;
1392 # endif
1393       break;
1394
1395      case StackOverflow:
1396 # if defined(GRAN)
1397       IF_DEBUG(gran, 
1398                DumpGranEvent(GR_DESCHEDULE, t));
1399       globalGranStats.tot_stackover++;
1400 # elif defined(PAR)
1401       // IF_DEBUG(par, 
1402       // DumpGranEvent(GR_DESCHEDULE, t);
1403       globalParStats.tot_stackover++;
1404 # endif
1405       break;
1406
1407     case ThreadYielding:
1408 # if defined(GRAN)
1409       IF_DEBUG(gran, 
1410                DumpGranEvent(GR_DESCHEDULE, t));
1411       globalGranStats.tot_yields++;
1412 # elif defined(PAR)
1413       // IF_DEBUG(par, 
1414       // DumpGranEvent(GR_DESCHEDULE, t);
1415       globalParStats.tot_yields++;
1416 # endif
1417       break; 
1418
1419     case ThreadBlocked:
1420 # if defined(GRAN)
1421       IF_DEBUG(scheduler,
1422                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1423                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1424                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1425                if (t->block_info.closure!=(StgClosure*)NULL)
1426                  print_bq(t->block_info.closure);
1427                debugBelch("\n"));
1428
1429       // ??? needed; should emit block before
1430       IF_DEBUG(gran, 
1431                DumpGranEvent(GR_DESCHEDULE, t)); 
1432       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1433       /*
1434         ngoq Dogh!
1435       ASSERT(procStatus[CurrentProc]==Busy || 
1436               ((procStatus[CurrentProc]==Fetching) && 
1437               (t->block_info.closure!=(StgClosure*)NULL)));
1438       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1439           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1440             procStatus[CurrentProc]==Fetching)) 
1441         procStatus[CurrentProc] = Idle;
1442       */
1443 # elif defined(PAR)
1444 //++PAR++  blockThread() writes the event (change?)
1445 # endif
1446     break;
1447
1448   case ThreadFinished:
1449     break;
1450
1451   default:
1452     barf("parGlobalStats: unknown return code");
1453     break;
1454     }
1455 #endif
1456 }
1457
1458 /* -----------------------------------------------------------------------------
1459  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1460  * ASSUMES: sched_mutex
1461  * -------------------------------------------------------------------------- */
1462
1463 static rtsBool
1464 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1465 {
1466     // did the task ask for a large block?
1467     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1468         // if so, get one and push it on the front of the nursery.
1469         bdescr *bd;
1470         lnat blocks;
1471         
1472         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1473         
1474         IF_DEBUG(scheduler,
1475                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1476                             (long)t->id, whatNext_strs[t->what_next], blocks));
1477         
1478         // don't do this if the nursery is (nearly) full, we'll GC first.
1479         if (cap->r.rCurrentNursery->link != NULL ||
1480             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1481                                                // if the nursery has only one block.
1482             
1483             bd = allocGroup( blocks );
1484             cap->r.rNursery->n_blocks += blocks;
1485             
1486             // link the new group into the list
1487             bd->link = cap->r.rCurrentNursery;
1488             bd->u.back = cap->r.rCurrentNursery->u.back;
1489             if (cap->r.rCurrentNursery->u.back != NULL) {
1490                 cap->r.rCurrentNursery->u.back->link = bd;
1491             } else {
1492 #if !defined(SMP)
1493                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1494                        g0s0 == cap->r.rNursery);
1495 #endif
1496                 cap->r.rNursery->blocks = bd;
1497             }             
1498             cap->r.rCurrentNursery->u.back = bd;
1499             
1500             // initialise it as a nursery block.  We initialise the
1501             // step, gen_no, and flags field of *every* sub-block in
1502             // this large block, because this is easier than making
1503             // sure that we always find the block head of a large
1504             // block whenever we call Bdescr() (eg. evacuate() and
1505             // isAlive() in the GC would both have to do this, at
1506             // least).
1507             { 
1508                 bdescr *x;
1509                 for (x = bd; x < bd + blocks; x++) {
1510                     x->step = cap->r.rNursery;
1511                     x->gen_no = 0;
1512                     x->flags = 0;
1513                 }
1514             }
1515             
1516             // This assert can be a killer if the app is doing lots
1517             // of large block allocations.
1518             ASSERT(countBlocks(cap->r.rNursery->blocks) == cap->r.rNursery->n_blocks);
1519             
1520             // now update the nursery to point to the new block
1521             cap->r.rCurrentNursery = bd;
1522             
1523             // we might be unlucky and have another thread get on the
1524             // run queue before us and steal the large block, but in that
1525             // case the thread will just end up requesting another large
1526             // block.
1527             PUSH_ON_RUN_QUEUE(t);
1528             return rtsFalse;  /* not actually GC'ing */
1529         }
1530     }
1531     
1532     /* make all the running tasks block on a condition variable,
1533      * maybe set context_switch and wait till they all pile in,
1534      * then have them wait on a GC condition variable.
1535      */
1536     IF_DEBUG(scheduler,
1537              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1538                         (long)t->id, whatNext_strs[t->what_next]));
1539     threadPaused(t);
1540 #if defined(GRAN)
1541     ASSERT(!is_on_queue(t,CurrentProc));
1542 #elif defined(PARALLEL_HASKELL)
1543     /* Currently we emit a DESCHEDULE event before GC in GUM.
1544        ToDo: either add separate event to distinguish SYSTEM time from rest
1545        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1546     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1547         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1548                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1549         emitSchedule = rtsTrue;
1550     }
1551 #endif
1552       
1553     PUSH_ON_RUN_QUEUE(t);
1554     return rtsTrue;
1555     /* actual GC is done at the end of the while loop in schedule() */
1556 }
1557
1558 /* -----------------------------------------------------------------------------
1559  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1560  * ASSUMES: sched_mutex
1561  * -------------------------------------------------------------------------- */
1562
1563 static void
1564 scheduleHandleStackOverflow( StgTSO *t)
1565 {
1566     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1567                                   (long)t->id, whatNext_strs[t->what_next]));
1568     /* just adjust the stack for this thread, then pop it back
1569      * on the run queue.
1570      */
1571     threadPaused(t);
1572     { 
1573         /* enlarge the stack */
1574         StgTSO *new_t = threadStackOverflow(t);
1575         
1576         /* This TSO has moved, so update any pointers to it from the
1577          * main thread stack.  It better not be on any other queues...
1578          * (it shouldn't be).
1579          */
1580         if (t->main != NULL) {
1581             t->main->tso = new_t;
1582         }
1583         PUSH_ON_RUN_QUEUE(new_t);
1584     }
1585 }
1586
1587 /* -----------------------------------------------------------------------------
1588  * Handle a thread that returned to the scheduler with ThreadYielding
1589  * ASSUMES: sched_mutex
1590  * -------------------------------------------------------------------------- */
1591
1592 static rtsBool
1593 scheduleHandleYield( StgTSO *t, nat prev_what_next )
1594 {
1595     // Reset the context switch flag.  We don't do this just before
1596     // running the thread, because that would mean we would lose ticks
1597     // during GC, which can lead to unfair scheduling (a thread hogs
1598     // the CPU because the tick always arrives during GC).  This way
1599     // penalises threads that do a lot of allocation, but that seems
1600     // better than the alternative.
1601     context_switch = 0;
1602     
1603     /* put the thread back on the run queue.  Then, if we're ready to
1604      * GC, check whether this is the last task to stop.  If so, wake
1605      * up the GC thread.  getThread will block during a GC until the
1606      * GC is finished.
1607      */
1608     IF_DEBUG(scheduler,
1609              if (t->what_next != prev_what_next) {
1610                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1611                             (long)t->id, whatNext_strs[t->what_next]);
1612              } else {
1613                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1614                             (long)t->id, whatNext_strs[t->what_next]);
1615              }
1616         );
1617     
1618     IF_DEBUG(sanity,
1619              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1620              checkTSO(t));
1621     ASSERT(t->link == END_TSO_QUEUE);
1622     
1623     // Shortcut if we're just switching evaluators: don't bother
1624     // doing stack squeezing (which can be expensive), just run the
1625     // thread.
1626     if (t->what_next != prev_what_next) {
1627         return rtsTrue;
1628     }
1629     
1630     threadPaused(t);
1631     
1632 #if defined(GRAN)
1633     ASSERT(!is_on_queue(t,CurrentProc));
1634       
1635     IF_DEBUG(sanity,
1636              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1637              checkThreadQsSanity(rtsTrue));
1638
1639 #endif
1640
1641     addToRunQueue(t);
1642
1643 #if defined(GRAN)
1644     /* add a ContinueThread event to actually process the thread */
1645     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1646               ContinueThread,
1647               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1648     IF_GRAN_DEBUG(bq, 
1649                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1650                   G_EVENTQ(0);
1651                   G_CURR_THREADQ(0));
1652 #endif
1653     return rtsFalse;
1654 }
1655
1656 /* -----------------------------------------------------------------------------
1657  * Handle a thread that returned to the scheduler with ThreadBlocked
1658  * ASSUMES: sched_mutex
1659  * -------------------------------------------------------------------------- */
1660
1661 static void
1662 scheduleHandleThreadBlocked( StgTSO *t
1663 #if !defined(GRAN) && !defined(DEBUG)
1664     STG_UNUSED
1665 #endif
1666     )
1667 {
1668 #if defined(GRAN)
1669     IF_DEBUG(scheduler,
1670              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1671                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1672              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1673     
1674     // ??? needed; should emit block before
1675     IF_DEBUG(gran, 
1676              DumpGranEvent(GR_DESCHEDULE, t)); 
1677     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1678     /*
1679       ngoq Dogh!
1680       ASSERT(procStatus[CurrentProc]==Busy || 
1681       ((procStatus[CurrentProc]==Fetching) && 
1682       (t->block_info.closure!=(StgClosure*)NULL)));
1683       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1684       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1685       procStatus[CurrentProc]==Fetching)) 
1686       procStatus[CurrentProc] = Idle;
1687     */
1688 #elif defined(PAR)
1689     IF_DEBUG(scheduler,
1690              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1691                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1692     IF_PAR_DEBUG(bq,
1693                  
1694                  if (t->block_info.closure!=(StgClosure*)NULL) 
1695                  print_bq(t->block_info.closure));
1696     
1697     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1698     blockThread(t);
1699     
1700     /* whatever we schedule next, we must log that schedule */
1701     emitSchedule = rtsTrue;
1702     
1703 #else /* !GRAN */
1704       /* don't need to do anything.  Either the thread is blocked on
1705        * I/O, in which case we'll have called addToBlockedQueue
1706        * previously, or it's blocked on an MVar or Blackhole, in which
1707        * case it'll be on the relevant queue already.
1708        */
1709     ASSERT(t->why_blocked != NotBlocked);
1710     IF_DEBUG(scheduler,
1711              debugBelch("--<< thread %d (%s) stopped: ", 
1712                         t->id, whatNext_strs[t->what_next]);
1713              printThreadBlockage(t);
1714              debugBelch("\n"));
1715     
1716     /* Only for dumping event to log file 
1717        ToDo: do I need this in GranSim, too?
1718        blockThread(t);
1719     */
1720 #endif
1721 }
1722
1723 /* -----------------------------------------------------------------------------
1724  * Handle a thread that returned to the scheduler with ThreadFinished
1725  * ASSUMES: sched_mutex
1726  * -------------------------------------------------------------------------- */
1727
1728 static rtsBool
1729 scheduleHandleThreadFinished( StgMainThread *mainThread
1730                               USED_WHEN_RTS_SUPPORTS_THREADS,
1731                               Capability *cap,
1732                               StgTSO *t )
1733 {
1734     /* Need to check whether this was a main thread, and if so,
1735      * return with the return value.
1736      *
1737      * We also end up here if the thread kills itself with an
1738      * uncaught exception, see Exception.cmm.
1739      */
1740     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1741                                   t->id, whatNext_strs[t->what_next]));
1742
1743 #if defined(GRAN)
1744       endThread(t, CurrentProc); // clean-up the thread
1745 #elif defined(PARALLEL_HASKELL)
1746       /* For now all are advisory -- HWL */
1747       //if(t->priority==AdvisoryPriority) ??
1748       advisory_thread_count--; // JB: Caution with this counter, buggy!
1749       
1750 # if defined(DIST)
1751       if(t->dist.priority==RevalPriority)
1752         FinishReval(t);
1753 # endif
1754     
1755 # if defined(EDENOLD)
1756       // the thread could still have an outport... (BUG)
1757       if (t->eden.outport != -1) {
1758       // delete the outport for the tso which has finished...
1759         IF_PAR_DEBUG(eden_ports,
1760                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1761                               t->eden.outport, t->id));
1762         deleteOPT(t);
1763       }
1764       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1765       if (t->eden.epid != -1) {
1766         IF_PAR_DEBUG(eden_ports,
1767                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1768                            t->id, t->eden.epid));
1769         removeTSOfromProcess(t);
1770       }
1771 # endif 
1772
1773 # if defined(PAR)
1774       if (RtsFlags.ParFlags.ParStats.Full &&
1775           !RtsFlags.ParFlags.ParStats.Suppressed) 
1776         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1777
1778       //  t->par only contains statistics: left out for now...
1779       IF_PAR_DEBUG(fish,
1780                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1781                               t->id,t,t->par.sparkname));
1782 # endif
1783 #endif // PARALLEL_HASKELL
1784
1785       //
1786       // Check whether the thread that just completed was a main
1787       // thread, and if so return with the result.  
1788       //
1789       // There is an assumption here that all thread completion goes
1790       // through this point; we need to make sure that if a thread
1791       // ends up in the ThreadKilled state, that it stays on the run
1792       // queue so it can be dealt with here.
1793       //
1794       if (
1795 #if defined(RTS_SUPPORTS_THREADS)
1796           mainThread != NULL
1797 #else
1798           mainThread->tso == t
1799 #endif
1800           )
1801       {
1802           // We are a bound thread: this must be our thread that just
1803           // completed.
1804           ASSERT(mainThread->tso == t);
1805
1806           if (t->what_next == ThreadComplete) {
1807               if (mainThread->ret) {
1808                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1809                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1810               }
1811               mainThread->stat = Success;
1812           } else {
1813               if (mainThread->ret) {
1814                   *(mainThread->ret) = NULL;
1815               }
1816               if (interrupted) {
1817                   mainThread->stat = Interrupted;
1818               } else {
1819                   mainThread->stat = Killed;
1820               }
1821           }
1822 #ifdef DEBUG
1823           removeThreadLabel((StgWord)mainThread->tso->id);
1824 #endif
1825           if (mainThread->prev == NULL) {
1826               ASSERT(mainThread == main_threads);
1827               main_threads = mainThread->link;
1828           } else {
1829               mainThread->prev->link = mainThread->link;
1830           }
1831           if (mainThread->link != NULL) {
1832               mainThread->link->prev = mainThread->prev;
1833           }
1834           releaseCapability(cap);
1835           return rtsTrue; // tells schedule() to return
1836       }
1837
1838 #ifdef RTS_SUPPORTS_THREADS
1839       ASSERT(t->main == NULL);
1840 #else
1841       if (t->main != NULL) {
1842           // Must be a main thread that is not the topmost one.  Leave
1843           // it on the run queue until the stack has unwound to the
1844           // point where we can deal with this.  Leaving it on the run
1845           // queue also ensures that the garbage collector knows about
1846           // this thread and its return value (it gets dropped from the
1847           // all_threads list so there's no other way to find it).
1848           APPEND_TO_RUN_QUEUE(t);
1849       }
1850 #endif
1851       return rtsFalse;
1852 }
1853
1854 /* -----------------------------------------------------------------------------
1855  * Perform a heap census, if PROFILING
1856  * -------------------------------------------------------------------------- */
1857
1858 static rtsBool
1859 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1860 {
1861 #if defined(PROFILING)
1862     // When we have +RTS -i0 and we're heap profiling, do a census at
1863     // every GC.  This lets us get repeatable runs for debugging.
1864     if (performHeapProfile ||
1865         (RtsFlags.ProfFlags.profileInterval==0 &&
1866          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1867         GarbageCollect(GetRoots, rtsTrue);
1868         heapCensus();
1869         performHeapProfile = rtsFalse;
1870         return rtsTrue;  // true <=> we already GC'd
1871     }
1872 #endif
1873     return rtsFalse;
1874 }
1875
1876 /* -----------------------------------------------------------------------------
1877  * Perform a garbage collection if necessary
1878  * ASSUMES: sched_mutex
1879  * -------------------------------------------------------------------------- */
1880
1881 static void
1882 scheduleDoGC( Capability *cap STG_UNUSED )
1883 {
1884     StgTSO *t;
1885 #ifdef SMP
1886     static rtsBool waiting_for_gc;
1887     int n_capabilities = RtsFlags.ParFlags.nNodes - 1; 
1888            // subtract one because we're already holding one.
1889     Capability *caps[n_capabilities];
1890 #endif
1891
1892 #ifdef SMP
1893     // In order to GC, there must be no threads running Haskell code.
1894     // Therefore, the GC thread needs to hold *all* the capabilities,
1895     // and release them after the GC has completed.  
1896     //
1897     // This seems to be the simplest way: previous attempts involved
1898     // making all the threads with capabilities give up their
1899     // capabilities and sleep except for the *last* one, which
1900     // actually did the GC.  But it's quite hard to arrange for all
1901     // the other tasks to sleep and stay asleep.
1902     //
1903         
1904     // Someone else is already trying to GC
1905     if (waiting_for_gc) return;
1906     waiting_for_gc = rtsTrue;
1907
1908     caps[n_capabilities] = cap;
1909     while (n_capabilities > 0) {
1910         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
1911         waitForReturnCapability(&sched_mutex, &cap);
1912         n_capabilities--;
1913         caps[n_capabilities] = cap;
1914     }
1915
1916     waiting_for_gc = rtsFalse;
1917 #endif
1918
1919     /* Kick any transactions which are invalid back to their
1920      * atomically frames.  When next scheduled they will try to
1921      * commit, this commit will fail and they will retry.
1922      */
1923     for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
1924         if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1925             if (!stmValidateTransaction (t -> trec)) {
1926                 IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1927                 
1928                 // strip the stack back to the ATOMICALLY_FRAME, aborting
1929                 // the (nested) transaction, and saving the stack of any
1930                 // partially-evaluated thunks on the heap.
1931                 raiseAsync_(t, NULL, rtsTrue);
1932                 
1933 #ifdef REG_R1
1934                 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1935 #endif
1936             }
1937         }
1938     }
1939     
1940     // so this happens periodically:
1941     scheduleCheckBlackHoles();
1942     
1943     /* everybody back, start the GC.
1944      * Could do it in this thread, or signal a condition var
1945      * to do it in another thread.  Either way, we need to
1946      * broadcast on gc_pending_cond afterward.
1947      */
1948 #if defined(RTS_SUPPORTS_THREADS)
1949     IF_DEBUG(scheduler,sched_belch("doing GC"));
1950 #endif
1951     GarbageCollect(GetRoots,rtsFalse);
1952     
1953 #if defined(SMP)
1954     {
1955         // release our stash of capabilities.
1956         nat i;
1957         for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) {
1958             releaseCapability(caps[i]);
1959         }
1960     }
1961 #endif
1962
1963 #if defined(GRAN)
1964     /* add a ContinueThread event to continue execution of current thread */
1965     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1966               ContinueThread,
1967               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1968     IF_GRAN_DEBUG(bq, 
1969                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1970                   G_EVENTQ(0);
1971                   G_CURR_THREADQ(0));
1972 #endif /* GRAN */
1973 }
1974
1975 /* ---------------------------------------------------------------------------
1976  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1977  * used by Control.Concurrent for error checking.
1978  * ------------------------------------------------------------------------- */
1979  
1980 StgBool
1981 rtsSupportsBoundThreads(void)
1982 {
1983 #if defined(RTS_SUPPORTS_THREADS)
1984   return rtsTrue;
1985 #else
1986   return rtsFalse;
1987 #endif
1988 }
1989
1990 /* ---------------------------------------------------------------------------
1991  * isThreadBound(tso): check whether tso is bound to an OS thread.
1992  * ------------------------------------------------------------------------- */
1993  
1994 StgBool
1995 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1996 {
1997 #if defined(RTS_SUPPORTS_THREADS)
1998   return (tso->main != NULL);
1999 #endif
2000   return rtsFalse;
2001 }
2002
2003 /* ---------------------------------------------------------------------------
2004  * Singleton fork(). Do not copy any running threads.
2005  * ------------------------------------------------------------------------- */
2006
2007 #ifndef mingw32_HOST_OS
2008 #define FORKPROCESS_PRIMOP_SUPPORTED
2009 #endif
2010
2011 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2012 static void 
2013 deleteThreadImmediately(StgTSO *tso);
2014 #endif
2015 StgInt
2016 forkProcess(HsStablePtr *entry
2017 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2018             STG_UNUSED
2019 #endif
2020            )
2021 {
2022 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2023   pid_t pid;
2024   StgTSO* t,*next;
2025   StgMainThread *m;
2026   SchedulerStatus rc;
2027
2028   IF_DEBUG(scheduler,sched_belch("forking!"));
2029   rts_lock(); // This not only acquires sched_mutex, it also
2030               // makes sure that no other threads are running
2031
2032   pid = fork();
2033
2034   if (pid) { /* parent */
2035
2036   /* just return the pid */
2037     rts_unlock();
2038     return pid;
2039     
2040   } else { /* child */
2041     
2042     
2043       // delete all threads
2044     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
2045     
2046     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2047       next = t->link;
2048
2049         // don't allow threads to catch the ThreadKilled exception
2050       deleteThreadImmediately(t);
2051     }
2052     
2053       // wipe the main thread list
2054     while((m = main_threads) != NULL) {
2055       main_threads = m->link;
2056 # ifdef THREADED_RTS
2057       closeCondition(&m->bound_thread_cond);
2058 # endif
2059       stgFree(m);
2060     }
2061     
2062     rc = rts_evalStableIO(entry, NULL);  // run the action
2063     rts_checkSchedStatus("forkProcess",rc);
2064     
2065     rts_unlock();
2066     
2067     hs_exit();                      // clean up and exit
2068     stg_exit(0);
2069   }
2070 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2071   barf("forkProcess#: primop not supported, sorry!\n");
2072   return -1;
2073 #endif
2074 }
2075
2076 /* ---------------------------------------------------------------------------
2077  * deleteAllThreads():  kill all the live threads.
2078  *
2079  * This is used when we catch a user interrupt (^C), before performing
2080  * any necessary cleanups and running finalizers.
2081  *
2082  * Locks: sched_mutex held.
2083  * ------------------------------------------------------------------------- */
2084    
2085 void
2086 deleteAllThreads ( void )
2087 {
2088   StgTSO* t, *next;
2089   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
2090   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2091       if (t->what_next == ThreadRelocated) {
2092           next = t->link;
2093       } else {
2094           next = t->global_link;
2095           deleteThread(t);
2096       }
2097   }      
2098
2099   // The run queue now contains a bunch of ThreadKilled threads.  We
2100   // must not throw these away: the main thread(s) will be in there
2101   // somewhere, and the main scheduler loop has to deal with it.
2102   // Also, the run queue is the only thing keeping these threads from
2103   // being GC'd, and we don't want the "main thread has been GC'd" panic.
2104
2105   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2106   ASSERT(blackhole_queue == END_TSO_QUEUE);
2107   ASSERT(sleeping_queue == END_TSO_QUEUE);
2108 }
2109
2110 /* startThread and  insertThread are now in GranSim.c -- HWL */
2111
2112
2113 /* ---------------------------------------------------------------------------
2114  * Suspending & resuming Haskell threads.
2115  * 
2116  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2117  * its capability before calling the C function.  This allows another
2118  * task to pick up the capability and carry on running Haskell
2119  * threads.  It also means that if the C call blocks, it won't lock
2120  * the whole system.
2121  *
2122  * The Haskell thread making the C call is put to sleep for the
2123  * duration of the call, on the susepended_ccalling_threads queue.  We
2124  * give out a token to the task, which it can use to resume the thread
2125  * on return from the C function.
2126  * ------------------------------------------------------------------------- */
2127    
2128 StgInt
2129 suspendThread( StgRegTable *reg )
2130 {
2131   nat tok;
2132   Capability *cap;
2133   int saved_errno = errno;
2134
2135   /* assume that *reg is a pointer to the StgRegTable part
2136    * of a Capability.
2137    */
2138   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
2139
2140   ACQUIRE_LOCK(&sched_mutex);
2141
2142   IF_DEBUG(scheduler,
2143            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
2144
2145   // XXX this might not be necessary --SDM
2146   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
2147
2148   threadPaused(cap->r.rCurrentTSO);
2149   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
2150   suspended_ccalling_threads = cap->r.rCurrentTSO;
2151
2152   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
2153       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
2154       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
2155   } else {
2156       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
2157   }
2158
2159   /* Use the thread ID as the token; it should be unique */
2160   tok = cap->r.rCurrentTSO->id;
2161
2162   /* Hand back capability */
2163   cap->r.rInHaskell = rtsFalse;
2164   releaseCapability(cap);
2165   
2166 #if defined(RTS_SUPPORTS_THREADS)
2167   /* Preparing to leave the RTS, so ensure there's a native thread/task
2168      waiting to take over.
2169   */
2170   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
2171 #endif
2172
2173   RELEASE_LOCK(&sched_mutex);
2174   
2175   errno = saved_errno;
2176   return tok; 
2177 }
2178
2179 StgRegTable *
2180 resumeThread( StgInt tok )
2181 {
2182   StgTSO *tso, **prev;
2183   Capability *cap;
2184   int saved_errno = errno;
2185
2186 #if defined(RTS_SUPPORTS_THREADS)
2187   /* Wait for permission to re-enter the RTS with the result. */
2188   ACQUIRE_LOCK(&sched_mutex);
2189   waitForReturnCapability(&sched_mutex, &cap);
2190
2191   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
2192 #else
2193   grabCapability(&cap);
2194 #endif
2195
2196   /* Remove the thread off of the suspended list */
2197   prev = &suspended_ccalling_threads;
2198   for (tso = suspended_ccalling_threads; 
2199        tso != END_TSO_QUEUE; 
2200        prev = &tso->link, tso = tso->link) {
2201     if (tso->id == (StgThreadID)tok) {
2202       *prev = tso->link;
2203       break;
2204     }
2205   }
2206   if (tso == END_TSO_QUEUE) {
2207     barf("resumeThread: thread not found");
2208   }
2209   tso->link = END_TSO_QUEUE;
2210   
2211   if(tso->why_blocked == BlockedOnCCall) {
2212       awakenBlockedQueueNoLock(tso->blocked_exceptions);
2213       tso->blocked_exceptions = NULL;
2214   }
2215   
2216   /* Reset blocking status */
2217   tso->why_blocked  = NotBlocked;
2218
2219   cap->r.rCurrentTSO = tso;
2220   cap->r.rInHaskell = rtsTrue;
2221   RELEASE_LOCK(&sched_mutex);
2222   errno = saved_errno;
2223   return &cap->r;
2224 }
2225
2226 /* ---------------------------------------------------------------------------
2227  * Comparing Thread ids.
2228  *
2229  * This is used from STG land in the implementation of the
2230  * instances of Eq/Ord for ThreadIds.
2231  * ------------------------------------------------------------------------ */
2232
2233 int
2234 cmp_thread(StgPtr tso1, StgPtr tso2) 
2235
2236   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2237   StgThreadID id2 = ((StgTSO *)tso2)->id;
2238  
2239   if (id1 < id2) return (-1);
2240   if (id1 > id2) return 1;
2241   return 0;
2242 }
2243
2244 /* ---------------------------------------------------------------------------
2245  * Fetching the ThreadID from an StgTSO.
2246  *
2247  * This is used in the implementation of Show for ThreadIds.
2248  * ------------------------------------------------------------------------ */
2249 int
2250 rts_getThreadId(StgPtr tso) 
2251 {
2252   return ((StgTSO *)tso)->id;
2253 }
2254
2255 #ifdef DEBUG
2256 void
2257 labelThread(StgPtr tso, char *label)
2258 {
2259   int len;
2260   void *buf;
2261
2262   /* Caveat: Once set, you can only set the thread name to "" */
2263   len = strlen(label)+1;
2264   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2265   strncpy(buf,label,len);
2266   /* Update will free the old memory for us */
2267   updateThreadLabel(((StgTSO *)tso)->id,buf);
2268 }
2269 #endif /* DEBUG */
2270
2271 /* ---------------------------------------------------------------------------
2272    Create a new thread.
2273
2274    The new thread starts with the given stack size.  Before the
2275    scheduler can run, however, this thread needs to have a closure
2276    (and possibly some arguments) pushed on its stack.  See
2277    pushClosure() in Schedule.h.
2278
2279    createGenThread() and createIOThread() (in SchedAPI.h) are
2280    convenient packaged versions of this function.
2281
2282    currently pri (priority) is only used in a GRAN setup -- HWL
2283    ------------------------------------------------------------------------ */
2284 #if defined(GRAN)
2285 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2286 StgTSO *
2287 createThread(nat size, StgInt pri)
2288 #else
2289 StgTSO *
2290 createThread(nat size)
2291 #endif
2292 {
2293
2294     StgTSO *tso;
2295     nat stack_size;
2296
2297     /* First check whether we should create a thread at all */
2298 #if defined(PARALLEL_HASKELL)
2299   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2300   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2301     threadsIgnored++;
2302     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2303           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2304     return END_TSO_QUEUE;
2305   }
2306   threadsCreated++;
2307 #endif
2308
2309 #if defined(GRAN)
2310   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2311 #endif
2312
2313   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2314
2315   /* catch ridiculously small stack sizes */
2316   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2317     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2318   }
2319
2320   stack_size = size - TSO_STRUCT_SIZEW;
2321
2322   tso = (StgTSO *)allocate(size);
2323   TICK_ALLOC_TSO(stack_size, 0);
2324
2325   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2326 #if defined(GRAN)
2327   SET_GRAN_HDR(tso, ThisPE);
2328 #endif
2329
2330   // Always start with the compiled code evaluator
2331   tso->what_next = ThreadRunGHC;
2332
2333   tso->id = next_thread_id++; 
2334   tso->why_blocked  = NotBlocked;
2335   tso->blocked_exceptions = NULL;
2336
2337   tso->saved_errno = 0;
2338   tso->main = NULL;
2339   
2340   tso->stack_size   = stack_size;
2341   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2342                               - TSO_STRUCT_SIZEW;
2343   tso->sp           = (P_)&(tso->stack) + stack_size;
2344
2345   tso->trec = NO_TREC;
2346
2347 #ifdef PROFILING
2348   tso->prof.CCCS = CCS_MAIN;
2349 #endif
2350
2351   /* put a stop frame on the stack */
2352   tso->sp -= sizeofW(StgStopFrame);
2353   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2354   tso->link = END_TSO_QUEUE;
2355
2356   // ToDo: check this
2357 #if defined(GRAN)
2358   /* uses more flexible routine in GranSim */
2359   insertThread(tso, CurrentProc);
2360 #else
2361   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2362    * from its creation
2363    */
2364 #endif
2365
2366 #if defined(GRAN) 
2367   if (RtsFlags.GranFlags.GranSimStats.Full) 
2368     DumpGranEvent(GR_START,tso);
2369 #elif defined(PARALLEL_HASKELL)
2370   if (RtsFlags.ParFlags.ParStats.Full) 
2371     DumpGranEvent(GR_STARTQ,tso);
2372   /* HACk to avoid SCHEDULE 
2373      LastTSO = tso; */
2374 #endif
2375
2376   /* Link the new thread on the global thread list.
2377    */
2378   tso->global_link = all_threads;
2379   all_threads = tso;
2380
2381 #if defined(DIST)
2382   tso->dist.priority = MandatoryPriority; //by default that is...
2383 #endif
2384
2385 #if defined(GRAN)
2386   tso->gran.pri = pri;
2387 # if defined(DEBUG)
2388   tso->gran.magic = TSO_MAGIC; // debugging only
2389 # endif
2390   tso->gran.sparkname   = 0;
2391   tso->gran.startedat   = CURRENT_TIME; 
2392   tso->gran.exported    = 0;
2393   tso->gran.basicblocks = 0;
2394   tso->gran.allocs      = 0;
2395   tso->gran.exectime    = 0;
2396   tso->gran.fetchtime   = 0;
2397   tso->gran.fetchcount  = 0;
2398   tso->gran.blocktime   = 0;
2399   tso->gran.blockcount  = 0;
2400   tso->gran.blockedat   = 0;
2401   tso->gran.globalsparks = 0;
2402   tso->gran.localsparks  = 0;
2403   if (RtsFlags.GranFlags.Light)
2404     tso->gran.clock  = Now; /* local clock */
2405   else
2406     tso->gran.clock  = 0;
2407
2408   IF_DEBUG(gran,printTSO(tso));
2409 #elif defined(PARALLEL_HASKELL)
2410 # if defined(DEBUG)
2411   tso->par.magic = TSO_MAGIC; // debugging only
2412 # endif
2413   tso->par.sparkname   = 0;
2414   tso->par.startedat   = CURRENT_TIME; 
2415   tso->par.exported    = 0;
2416   tso->par.basicblocks = 0;
2417   tso->par.allocs      = 0;
2418   tso->par.exectime    = 0;
2419   tso->par.fetchtime   = 0;
2420   tso->par.fetchcount  = 0;
2421   tso->par.blocktime   = 0;
2422   tso->par.blockcount  = 0;
2423   tso->par.blockedat   = 0;
2424   tso->par.globalsparks = 0;
2425   tso->par.localsparks  = 0;
2426 #endif
2427
2428 #if defined(GRAN)
2429   globalGranStats.tot_threads_created++;
2430   globalGranStats.threads_created_on_PE[CurrentProc]++;
2431   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2432   globalGranStats.tot_sq_probes++;
2433 #elif defined(PARALLEL_HASKELL)
2434   // collect parallel global statistics (currently done together with GC stats)
2435   if (RtsFlags.ParFlags.ParStats.Global &&
2436       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2437     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2438     globalParStats.tot_threads_created++;
2439   }
2440 #endif 
2441
2442 #if defined(GRAN)
2443   IF_GRAN_DEBUG(pri,
2444                 sched_belch("==__ schedule: Created TSO %d (%p);",
2445                       CurrentProc, tso, tso->id));
2446 #elif defined(PARALLEL_HASKELL)
2447   IF_PAR_DEBUG(verbose,
2448                sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2449                            (long)tso->id, tso, advisory_thread_count));
2450 #else
2451   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2452                                  (long)tso->id, (long)tso->stack_size));
2453 #endif    
2454   return tso;
2455 }
2456
2457 #if defined(PAR)
2458 /* RFP:
2459    all parallel thread creation calls should fall through the following routine.
2460 */
2461 StgTSO *
2462 createThreadFromSpark(rtsSpark spark) 
2463 { StgTSO *tso;
2464   ASSERT(spark != (rtsSpark)NULL);
2465 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2466   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2467   { threadsIgnored++;
2468     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2469           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2470     return END_TSO_QUEUE;
2471   }
2472   else
2473   { threadsCreated++;
2474     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2475     if (tso==END_TSO_QUEUE)     
2476       barf("createSparkThread: Cannot create TSO");
2477 #if defined(DIST)
2478     tso->priority = AdvisoryPriority;
2479 #endif
2480     pushClosure(tso,spark);
2481     addToRunQueue(tso);
2482     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2483   }
2484   return tso;
2485 }
2486 #endif
2487
2488 /*
2489   Turn a spark into a thread.
2490   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2491 */
2492 #if 0
2493 StgTSO *
2494 activateSpark (rtsSpark spark) 
2495 {
2496   StgTSO *tso;
2497
2498   tso = createSparkThread(spark);
2499   if (RtsFlags.ParFlags.ParStats.Full) {   
2500     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2501       IF_PAR_DEBUG(verbose,
2502                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2503                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2504   }
2505   // ToDo: fwd info on local/global spark to thread -- HWL
2506   // tso->gran.exported =  spark->exported;
2507   // tso->gran.locked =   !spark->global;
2508   // tso->gran.sparkname = spark->name;
2509
2510   return tso;
2511 }
2512 #endif
2513
2514 /* ---------------------------------------------------------------------------
2515  * scheduleThread()
2516  *
2517  * scheduleThread puts a thread on the head of the runnable queue.
2518  * This will usually be done immediately after a thread is created.
2519  * The caller of scheduleThread must create the thread using e.g.
2520  * createThread and push an appropriate closure
2521  * on this thread's stack before the scheduler is invoked.
2522  * ------------------------------------------------------------------------ */
2523
2524 static void
2525 scheduleThread_(StgTSO *tso)
2526 {
2527   // The thread goes at the *end* of the run-queue, to avoid possible
2528   // starvation of any threads already on the queue.
2529   APPEND_TO_RUN_QUEUE(tso);
2530   threadRunnable();
2531 }
2532
2533 void
2534 scheduleThread(StgTSO* tso)
2535 {
2536   ACQUIRE_LOCK(&sched_mutex);
2537   scheduleThread_(tso);
2538   RELEASE_LOCK(&sched_mutex);
2539 }
2540
2541 #if defined(RTS_SUPPORTS_THREADS)
2542 static Condition bound_cond_cache;
2543 static int bound_cond_cache_full = 0;
2544 #endif
2545
2546
2547 SchedulerStatus
2548 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
2549                    Capability *initialCapability)
2550 {
2551     // Precondition: sched_mutex must be held
2552     StgMainThread *m;
2553
2554     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2555     m->tso = tso;
2556     tso->main = m;
2557     m->ret = ret;
2558     m->stat = NoStatus;
2559     m->link = main_threads;
2560     m->prev = NULL;
2561     if (main_threads != NULL) {
2562         main_threads->prev = m;
2563     }
2564     main_threads = m;
2565
2566 #if defined(RTS_SUPPORTS_THREADS)
2567     // Allocating a new condition for each thread is expensive, so we
2568     // cache one.  This is a pretty feeble hack, but it helps speed up
2569     // consecutive call-ins quite a bit.
2570     if (bound_cond_cache_full) {
2571         m->bound_thread_cond = bound_cond_cache;
2572         bound_cond_cache_full = 0;
2573     } else {
2574         initCondition(&m->bound_thread_cond);
2575     }
2576 #endif
2577
2578     /* Put the thread on the main-threads list prior to scheduling the TSO.
2579        Failure to do so introduces a race condition in the MT case (as
2580        identified by Wolfgang Thaller), whereby the new task/OS thread 
2581        created by scheduleThread_() would complete prior to the thread
2582        that spawned it managed to put 'itself' on the main-threads list.
2583        The upshot of it all being that the worker thread wouldn't get to
2584        signal the completion of the its work item for the main thread to
2585        see (==> it got stuck waiting.)    -- sof 6/02.
2586     */
2587     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2588     
2589     APPEND_TO_RUN_QUEUE(tso);
2590     // NB. Don't call threadRunnable() here, because the thread is
2591     // bound and only runnable by *this* OS thread, so waking up other
2592     // workers will just slow things down.
2593
2594     return waitThread_(m, initialCapability);
2595 }
2596
2597 /* ---------------------------------------------------------------------------
2598  * initScheduler()
2599  *
2600  * Initialise the scheduler.  This resets all the queues - if the
2601  * queues contained any threads, they'll be garbage collected at the
2602  * next pass.
2603  *
2604  * ------------------------------------------------------------------------ */
2605
2606 void 
2607 initScheduler(void)
2608 {
2609 #if defined(GRAN)
2610   nat i;
2611
2612   for (i=0; i<=MAX_PROC; i++) {
2613     run_queue_hds[i]      = END_TSO_QUEUE;
2614     run_queue_tls[i]      = END_TSO_QUEUE;
2615     blocked_queue_hds[i]  = END_TSO_QUEUE;
2616     blocked_queue_tls[i]  = END_TSO_QUEUE;
2617     ccalling_threadss[i]  = END_TSO_QUEUE;
2618     blackhole_queue[i]    = END_TSO_QUEUE;
2619     sleeping_queue        = END_TSO_QUEUE;
2620   }
2621 #else
2622   run_queue_hd      = END_TSO_QUEUE;
2623   run_queue_tl      = END_TSO_QUEUE;
2624   blocked_queue_hd  = END_TSO_QUEUE;
2625   blocked_queue_tl  = END_TSO_QUEUE;
2626   blackhole_queue   = END_TSO_QUEUE;
2627   sleeping_queue    = END_TSO_QUEUE;
2628 #endif 
2629
2630   suspended_ccalling_threads  = END_TSO_QUEUE;
2631
2632   main_threads = NULL;
2633   all_threads  = END_TSO_QUEUE;
2634
2635   context_switch = 0;
2636   interrupted    = 0;
2637
2638   RtsFlags.ConcFlags.ctxtSwitchTicks =
2639       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2640       
2641 #if defined(RTS_SUPPORTS_THREADS)
2642   /* Initialise the mutex and condition variables used by
2643    * the scheduler. */
2644   initMutex(&sched_mutex);
2645   initMutex(&term_mutex);
2646 #endif
2647   
2648   ACQUIRE_LOCK(&sched_mutex);
2649
2650   /* A capability holds the state a native thread needs in
2651    * order to execute STG code. At least one capability is
2652    * floating around (only SMP builds have more than one).
2653    */
2654   initCapabilities();
2655   
2656 #if defined(RTS_SUPPORTS_THREADS)
2657   initTaskManager();
2658 #endif
2659
2660 #if defined(SMP)
2661   /* eagerly start some extra workers */
2662   startingWorkerThread = RtsFlags.ParFlags.nNodes;
2663   startTasks(RtsFlags.ParFlags.nNodes, taskStart);
2664 #endif
2665
2666 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2667   initSparkPools();
2668 #endif
2669
2670   RELEASE_LOCK(&sched_mutex);
2671 }
2672
2673 void
2674 exitScheduler( void )
2675 {
2676     interrupted = rtsTrue;
2677     shutting_down_scheduler = rtsTrue;
2678 #if defined(RTS_SUPPORTS_THREADS)
2679     if (threadIsTask(osThreadId())) { taskStop(); }
2680     stopTaskManager();
2681 #endif
2682 }
2683
2684 /* ----------------------------------------------------------------------------
2685    Managing the per-task allocation areas.
2686    
2687    Each capability comes with an allocation area.  These are
2688    fixed-length block lists into which allocation can be done.
2689
2690    ToDo: no support for two-space collection at the moment???
2691    ------------------------------------------------------------------------- */
2692
2693 static SchedulerStatus
2694 waitThread_(StgMainThread* m, Capability *initialCapability)
2695 {
2696   SchedulerStatus stat;
2697
2698   // Precondition: sched_mutex must be held.
2699   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2700
2701 #if defined(GRAN)
2702   /* GranSim specific init */
2703   CurrentTSO = m->tso;                // the TSO to run
2704   procStatus[MainProc] = Busy;        // status of main PE
2705   CurrentProc = MainProc;             // PE to run it on
2706   schedule(m,initialCapability);
2707 #else
2708   schedule(m,initialCapability);
2709   ASSERT(m->stat != NoStatus);
2710 #endif
2711
2712   stat = m->stat;
2713
2714 #if defined(RTS_SUPPORTS_THREADS)
2715   // Free the condition variable, returning it to the cache if possible.
2716   if (!bound_cond_cache_full) {
2717       bound_cond_cache = m->bound_thread_cond;
2718       bound_cond_cache_full = 1;
2719   } else {
2720       closeCondition(&m->bound_thread_cond);
2721   }
2722 #endif
2723
2724   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2725   stgFree(m);
2726
2727   // Postcondition: sched_mutex still held
2728   return stat;
2729 }
2730
2731 /* ---------------------------------------------------------------------------
2732    Where are the roots that we know about?
2733
2734         - all the threads on the runnable queue
2735         - all the threads on the blocked queue
2736         - all the threads on the sleeping queue
2737         - all the thread currently executing a _ccall_GC
2738         - all the "main threads"
2739      
2740    ------------------------------------------------------------------------ */
2741
2742 /* This has to be protected either by the scheduler monitor, or by the
2743         garbage collection monitor (probably the latter).
2744         KH @ 25/10/99
2745 */
2746
2747 void
2748 GetRoots( evac_fn evac )
2749 {
2750 #if defined(GRAN)
2751   {
2752     nat i;
2753     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2754       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2755           evac((StgClosure **)&run_queue_hds[i]);
2756       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2757           evac((StgClosure **)&run_queue_tls[i]);
2758       
2759       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2760           evac((StgClosure **)&blocked_queue_hds[i]);
2761       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2762           evac((StgClosure **)&blocked_queue_tls[i]);
2763       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2764           evac((StgClosure **)&ccalling_threads[i]);
2765     }
2766   }
2767
2768   markEventQueue();
2769
2770 #else /* !GRAN */
2771   if (run_queue_hd != END_TSO_QUEUE) {
2772       ASSERT(run_queue_tl != END_TSO_QUEUE);
2773       evac((StgClosure **)&run_queue_hd);
2774       evac((StgClosure **)&run_queue_tl);
2775   }
2776   
2777   if (blocked_queue_hd != END_TSO_QUEUE) {
2778       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2779       evac((StgClosure **)&blocked_queue_hd);
2780       evac((StgClosure **)&blocked_queue_tl);
2781   }
2782   
2783   if (sleeping_queue != END_TSO_QUEUE) {
2784       evac((StgClosure **)&sleeping_queue);
2785   }
2786 #endif 
2787
2788   if (blackhole_queue != END_TSO_QUEUE) {
2789       evac((StgClosure **)&blackhole_queue);
2790   }
2791
2792   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2793       evac((StgClosure **)&suspended_ccalling_threads);
2794   }
2795
2796 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2797   markSparkQueue(evac);
2798 #endif
2799
2800 #if defined(RTS_USER_SIGNALS)
2801   // mark the signal handlers (signals should be already blocked)
2802   markSignalHandlers(evac);
2803 #endif
2804 }
2805
2806 /* -----------------------------------------------------------------------------
2807    performGC
2808
2809    This is the interface to the garbage collector from Haskell land.
2810    We provide this so that external C code can allocate and garbage
2811    collect when called from Haskell via _ccall_GC.
2812
2813    It might be useful to provide an interface whereby the programmer
2814    can specify more roots (ToDo).
2815    
2816    This needs to be protected by the GC condition variable above.  KH.
2817    -------------------------------------------------------------------------- */
2818
2819 static void (*extra_roots)(evac_fn);
2820
2821 void
2822 performGC(void)
2823 {
2824   /* Obligated to hold this lock upon entry */
2825   ACQUIRE_LOCK(&sched_mutex);
2826   GarbageCollect(GetRoots,rtsFalse);
2827   RELEASE_LOCK(&sched_mutex);
2828 }
2829
2830 void
2831 performMajorGC(void)
2832 {
2833   ACQUIRE_LOCK(&sched_mutex);
2834   GarbageCollect(GetRoots,rtsTrue);
2835   RELEASE_LOCK(&sched_mutex);
2836 }
2837
2838 static void
2839 AllRoots(evac_fn evac)
2840 {
2841     GetRoots(evac);             // the scheduler's roots
2842     extra_roots(evac);          // the user's roots
2843 }
2844
2845 void
2846 performGCWithRoots(void (*get_roots)(evac_fn))
2847 {
2848   ACQUIRE_LOCK(&sched_mutex);
2849   extra_roots = get_roots;
2850   GarbageCollect(AllRoots,rtsFalse);
2851   RELEASE_LOCK(&sched_mutex);
2852 }
2853
2854 /* -----------------------------------------------------------------------------
2855    Stack overflow
2856
2857    If the thread has reached its maximum stack size, then raise the
2858    StackOverflow exception in the offending thread.  Otherwise
2859    relocate the TSO into a larger chunk of memory and adjust its stack
2860    size appropriately.
2861    -------------------------------------------------------------------------- */
2862
2863 static StgTSO *
2864 threadStackOverflow(StgTSO *tso)
2865 {
2866   nat new_stack_size, stack_words;
2867   lnat new_tso_size;
2868   StgPtr new_sp;
2869   StgTSO *dest;
2870
2871   IF_DEBUG(sanity,checkTSO(tso));
2872   if (tso->stack_size >= tso->max_stack_size) {
2873
2874     IF_DEBUG(gc,
2875              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2876                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2877              /* If we're debugging, just print out the top of the stack */
2878              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2879                                               tso->sp+64)));
2880
2881     /* Send this thread the StackOverflow exception */
2882     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2883     return tso;
2884   }
2885
2886   /* Try to double the current stack size.  If that takes us over the
2887    * maximum stack size for this thread, then use the maximum instead.
2888    * Finally round up so the TSO ends up as a whole number of blocks.
2889    */
2890   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2891   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2892                                        TSO_STRUCT_SIZE)/sizeof(W_);
2893   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2894   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2895
2896   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2897
2898   dest = (StgTSO *)allocate(new_tso_size);
2899   TICK_ALLOC_TSO(new_stack_size,0);
2900
2901   /* copy the TSO block and the old stack into the new area */
2902   memcpy(dest,tso,TSO_STRUCT_SIZE);
2903   stack_words = tso->stack + tso->stack_size - tso->sp;
2904   new_sp = (P_)dest + new_tso_size - stack_words;
2905   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2906
2907   /* relocate the stack pointers... */
2908   dest->sp         = new_sp;
2909   dest->stack_size = new_stack_size;
2910         
2911   /* Mark the old TSO as relocated.  We have to check for relocated
2912    * TSOs in the garbage collector and any primops that deal with TSOs.
2913    *
2914    * It's important to set the sp value to just beyond the end
2915    * of the stack, so we don't attempt to scavenge any part of the
2916    * dead TSO's stack.
2917    */
2918   tso->what_next = ThreadRelocated;
2919   tso->link = dest;
2920   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2921   tso->why_blocked = NotBlocked;
2922
2923   IF_PAR_DEBUG(verbose,
2924                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2925                      tso->id, tso, tso->stack_size);
2926                /* If we're debugging, just print out the top of the stack */
2927                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2928                                                 tso->sp+64)));
2929   
2930   IF_DEBUG(sanity,checkTSO(tso));
2931 #if 0
2932   IF_DEBUG(scheduler,printTSO(dest));
2933 #endif
2934
2935   return dest;
2936 }
2937
2938 /* ---------------------------------------------------------------------------
2939    Wake up a queue that was blocked on some resource.
2940    ------------------------------------------------------------------------ */
2941
2942 #if defined(GRAN)
2943 STATIC_INLINE void
2944 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2945 {
2946 }
2947 #elif defined(PARALLEL_HASKELL)
2948 STATIC_INLINE void
2949 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2950 {
2951   /* write RESUME events to log file and
2952      update blocked and fetch time (depending on type of the orig closure) */
2953   if (RtsFlags.ParFlags.ParStats.Full) {
2954     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2955                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2956                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2957     if (EMPTY_RUN_QUEUE())
2958       emitSchedule = rtsTrue;
2959
2960     switch (get_itbl(node)->type) {
2961         case FETCH_ME_BQ:
2962           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2963           break;
2964         case RBH:
2965         case FETCH_ME:
2966         case BLACKHOLE_BQ:
2967           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2968           break;
2969 #ifdef DIST
2970         case MVAR:
2971           break;
2972 #endif    
2973         default:
2974           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2975         }
2976       }
2977 }
2978 #endif
2979
2980 #if defined(GRAN)
2981 static StgBlockingQueueElement *
2982 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2983 {
2984     StgTSO *tso;
2985     PEs node_loc, tso_loc;
2986
2987     node_loc = where_is(node); // should be lifted out of loop
2988     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2989     tso_loc = where_is((StgClosure *)tso);
2990     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2991       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2992       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2993       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2994       // insertThread(tso, node_loc);
2995       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2996                 ResumeThread,
2997                 tso, node, (rtsSpark*)NULL);
2998       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2999       // len_local++;
3000       // len++;
3001     } else { // TSO is remote (actually should be FMBQ)
3002       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3003                                   RtsFlags.GranFlags.Costs.gunblocktime +
3004                                   RtsFlags.GranFlags.Costs.latency;
3005       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3006                 UnblockThread,
3007                 tso, node, (rtsSpark*)NULL);
3008       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3009       // len++;
3010     }
3011     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3012     IF_GRAN_DEBUG(bq,
3013                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3014                           (node_loc==tso_loc ? "Local" : "Global"), 
3015                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3016     tso->block_info.closure = NULL;
3017     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3018                              tso->id, tso));
3019 }
3020 #elif defined(PARALLEL_HASKELL)
3021 static StgBlockingQueueElement *
3022 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
3023 {
3024     StgBlockingQueueElement *next;
3025
3026     switch (get_itbl(bqe)->type) {
3027     case TSO:
3028       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3029       /* if it's a TSO just push it onto the run_queue */
3030       next = bqe->link;
3031       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3032       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3033       threadRunnable();
3034       unblockCount(bqe, node);
3035       /* reset blocking status after dumping event */
3036       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3037       break;
3038
3039     case BLOCKED_FETCH:
3040       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3041       next = bqe->link;
3042       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3043       PendingFetches = (StgBlockedFetch *)bqe;
3044       break;
3045
3046 # if defined(DEBUG)
3047       /* can ignore this case in a non-debugging setup; 
3048          see comments on RBHSave closures above */
3049     case CONSTR:
3050       /* check that the closure is an RBHSave closure */
3051       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3052              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3053              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3054       break;
3055
3056     default:
3057       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3058            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3059            (StgClosure *)bqe);
3060 # endif
3061     }
3062   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3063   return next;
3064 }
3065
3066 #else /* !GRAN && !PARALLEL_HASKELL */
3067 static StgTSO *
3068 unblockOneLocked(StgTSO *tso)
3069 {
3070   StgTSO *next;
3071
3072   ASSERT(get_itbl(tso)->type == TSO);
3073   ASSERT(tso->why_blocked != NotBlocked);
3074   tso->why_blocked = NotBlocked;
3075   next = tso->link;
3076   tso->link = END_TSO_QUEUE;
3077   APPEND_TO_RUN_QUEUE(tso);
3078   threadRunnable();
3079   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3080   return next;
3081 }
3082 #endif
3083
3084 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3085 INLINE_ME StgBlockingQueueElement *
3086 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3087 {
3088   ACQUIRE_LOCK(&sched_mutex);
3089   bqe = unblockOneLocked(bqe, node);
3090   RELEASE_LOCK(&sched_mutex);
3091   return bqe;
3092 }
3093 #else
3094 INLINE_ME StgTSO *
3095 unblockOne(StgTSO *tso)
3096 {
3097   ACQUIRE_LOCK(&sched_mutex);
3098   tso = unblockOneLocked(tso);
3099   RELEASE_LOCK(&sched_mutex);
3100   return tso;
3101 }
3102 #endif
3103
3104 #if defined(GRAN)
3105 void 
3106 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3107 {
3108   StgBlockingQueueElement *bqe;
3109   PEs node_loc;
3110   nat len = 0; 
3111
3112   IF_GRAN_DEBUG(bq, 
3113                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3114                       node, CurrentProc, CurrentTime[CurrentProc], 
3115                       CurrentTSO->id, CurrentTSO));
3116
3117   node_loc = where_is(node);
3118
3119   ASSERT(q == END_BQ_QUEUE ||
3120          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3121          get_itbl(q)->type == CONSTR); // closure (type constructor)
3122   ASSERT(is_unique(node));
3123
3124   /* FAKE FETCH: magically copy the node to the tso's proc;
3125      no Fetch necessary because in reality the node should not have been 
3126      moved to the other PE in the first place
3127   */
3128   if (CurrentProc!=node_loc) {
3129     IF_GRAN_DEBUG(bq, 
3130                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3131                         node, node_loc, CurrentProc, CurrentTSO->id, 
3132                         // CurrentTSO, where_is(CurrentTSO),
3133                         node->header.gran.procs));
3134     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3135     IF_GRAN_DEBUG(bq, 
3136                   debugBelch("## new bitmask of node %p is %#x\n",
3137                         node, node->header.gran.procs));
3138     if (RtsFlags.GranFlags.GranSimStats.Global) {
3139       globalGranStats.tot_fake_fetches++;
3140     }
3141   }
3142
3143   bqe = q;
3144   // ToDo: check: ASSERT(CurrentProc==node_loc);
3145   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3146     //next = bqe->link;
3147     /* 
3148        bqe points to the current element in the queue
3149        next points to the next element in the queue
3150     */
3151     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3152     //tso_loc = where_is(tso);
3153     len++;
3154     bqe = unblockOneLocked(bqe, node);
3155   }
3156
3157   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3158      the closure to make room for the anchor of the BQ */
3159   if (bqe!=END_BQ_QUEUE) {
3160     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3161     /*
3162     ASSERT((info_ptr==&RBH_Save_0_info) ||
3163            (info_ptr==&RBH_Save_1_info) ||
3164            (info_ptr==&RBH_Save_2_info));
3165     */
3166     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3167     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3168     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3169
3170     IF_GRAN_DEBUG(bq,
3171                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3172                         node, info_type(node)));
3173   }
3174
3175   /* statistics gathering */
3176   if (RtsFlags.GranFlags.GranSimStats.Global) {
3177     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3178     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3179     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3180     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3181   }
3182   IF_GRAN_DEBUG(bq,
3183                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3184                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3185 }
3186 #elif defined(PARALLEL_HASKELL)
3187 void 
3188 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3189 {
3190   StgBlockingQueueElement *bqe;
3191
3192   ACQUIRE_LOCK(&sched_mutex);
3193
3194   IF_PAR_DEBUG(verbose, 
3195                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3196                      node, mytid));
3197 #ifdef DIST  
3198   //RFP
3199   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3200     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3201     return;
3202   }
3203 #endif
3204   
3205   ASSERT(q == END_BQ_QUEUE ||
3206          get_itbl(q)->type == TSO ||           
3207          get_itbl(q)->type == BLOCKED_FETCH || 
3208          get_itbl(q)->type == CONSTR); 
3209
3210   bqe = q;
3211   while (get_itbl(bqe)->type==TSO || 
3212          get_itbl(bqe)->type==BLOCKED_FETCH) {
3213     bqe = unblockOneLocked(bqe, node);
3214   }
3215   RELEASE_LOCK(&sched_mutex);
3216 }
3217
3218 #else   /* !GRAN && !PARALLEL_HASKELL */
3219
3220 void
3221 awakenBlockedQueueNoLock(StgTSO *tso)
3222 {
3223   while (tso != END_TSO_QUEUE) {
3224     tso = unblockOneLocked(tso);
3225   }
3226 }
3227
3228 void
3229 awakenBlockedQueue(StgTSO *tso)
3230 {
3231   ACQUIRE_LOCK(&sched_mutex);
3232   while (tso != END_TSO_QUEUE) {
3233     tso = unblockOneLocked(tso);
3234   }
3235   RELEASE_LOCK(&sched_mutex);
3236 }
3237 #endif
3238
3239 /* ---------------------------------------------------------------------------
3240    Interrupt execution
3241    - usually called inside a signal handler so it mustn't do anything fancy.   
3242    ------------------------------------------------------------------------ */
3243
3244 void
3245 interruptStgRts(void)
3246 {
3247     interrupted    = 1;
3248     context_switch = 1;
3249     threadRunnable();
3250     /* ToDo: if invoked from a signal handler, this threadRunnable
3251      * only works if there's another thread (not this one) waiting to
3252      * be woken up.
3253      */
3254 }
3255
3256 /* -----------------------------------------------------------------------------
3257    Unblock a thread
3258
3259    This is for use when we raise an exception in another thread, which
3260    may be blocked.
3261    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3262    -------------------------------------------------------------------------- */
3263
3264 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3265 /*
3266   NB: only the type of the blocking queue is different in GranSim and GUM
3267       the operations on the queue-elements are the same
3268       long live polymorphism!
3269
3270   Locks: sched_mutex is held upon entry and exit.
3271
3272 */
3273 static void
3274 unblockThread(StgTSO *tso)
3275 {
3276   StgBlockingQueueElement *t, **last;
3277
3278   switch (tso->why_blocked) {
3279
3280   case NotBlocked:
3281     return;  /* not blocked */
3282
3283   case BlockedOnSTM:
3284     // Be careful: nothing to do here!  We tell the scheduler that the thread
3285     // is runnable and we leave it to the stack-walking code to abort the 
3286     // transaction while unwinding the stack.  We should perhaps have a debugging
3287     // test to make sure that this really happens and that the 'zombie' transaction
3288     // does not get committed.
3289     goto done;
3290
3291   case BlockedOnMVar:
3292     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3293     {
3294       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3295       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3296
3297       last = (StgBlockingQueueElement **)&mvar->head;
3298       for (t = (StgBlockingQueueElement *)mvar->head; 
3299            t != END_BQ_QUEUE; 
3300            last = &t->link, last_tso = t, t = t->link) {
3301         if (t == (StgBlockingQueueElement *)tso) {
3302           *last = (StgBlockingQueueElement *)tso->link;
3303           if (mvar->tail == tso) {
3304             mvar->tail = (StgTSO *)last_tso;
3305           }
3306           goto done;
3307         }
3308       }
3309       barf("unblockThread (MVAR): TSO not found");
3310     }
3311
3312   case BlockedOnBlackHole:
3313     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3314     {
3315       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3316
3317       last = &bq->blocking_queue;
3318       for (t = bq->blocking_queue; 
3319            t != END_BQ_QUEUE; 
3320            last = &t->link, t = t->link) {
3321         if (t == (StgBlockingQueueElement *)tso) {
3322           *last = (StgBlockingQueueElement *)tso->link;
3323           goto done;
3324         }
3325       }
3326       barf("unblockThread (BLACKHOLE): TSO not found");
3327     }
3328
3329   case BlockedOnException:
3330     {
3331       StgTSO *target  = tso->block_info.tso;
3332
3333       ASSERT(get_itbl(target)->type == TSO);
3334
3335       if (target->what_next == ThreadRelocated) {
3336           target = target->link;
3337           ASSERT(get_itbl(target)->type == TSO);
3338       }
3339
3340       ASSERT(target->blocked_exceptions != NULL);
3341
3342       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3343       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3344            t != END_BQ_QUEUE; 
3345            last = &t->link, t = t->link) {
3346         ASSERT(get_itbl(t)->type == TSO);
3347         if (t == (StgBlockingQueueElement *)tso) {
3348           *last = (StgBlockingQueueElement *)tso->link;
3349           goto done;
3350         }
3351       }
3352       barf("unblockThread (Exception): TSO not found");
3353     }
3354
3355   case BlockedOnRead:
3356   case BlockedOnWrite:
3357 #if defined(mingw32_HOST_OS)
3358   case BlockedOnDoProc:
3359 #endif
3360     {
3361       /* take TSO off blocked_queue */
3362       StgBlockingQueueElement *prev = NULL;
3363       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3364            prev = t, t = t->link) {
3365         if (t == (StgBlockingQueueElement *)tso) {
3366           if (prev == NULL) {
3367             blocked_queue_hd = (StgTSO *)t->link;
3368             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3369               blocked_queue_tl = END_TSO_QUEUE;
3370             }
3371           } else {
3372             prev->link = t->link;
3373             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3374               blocked_queue_tl = (StgTSO *)prev;
3375             }
3376           }
3377 #if defined(mingw32_HOST_OS)
3378           /* (Cooperatively) signal that the worker thread should abort
3379            * the request.
3380            */
3381           abandonWorkRequest(tso->block_info.async_result->reqID);
3382 #endif
3383           goto done;
3384         }
3385       }
3386       barf("unblockThread (I/O): TSO not found");
3387     }
3388
3389   case BlockedOnDelay:
3390     {
3391       /* take TSO off sleeping_queue */
3392       StgBlockingQueueElement *prev = NULL;
3393       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3394            prev = t, t = t->link) {
3395         if (t == (StgBlockingQueueElement *)tso) {
3396           if (prev == NULL) {
3397             sleeping_queue = (StgTSO *)t->link;
3398           } else {
3399             prev->link = t->link;
3400           }
3401           goto done;
3402         }
3403       }
3404       barf("unblockThread (delay): TSO not found");
3405     }
3406
3407   default:
3408     barf("unblockThread");
3409   }
3410
3411  done:
3412   tso->link = END_TSO_QUEUE;
3413   tso->why_blocked = NotBlocked;
3414   tso->block_info.closure = NULL;
3415   PUSH_ON_RUN_QUEUE(tso);
3416 }
3417 #else
3418 static void
3419 unblockThread(StgTSO *tso)
3420 {
3421   StgTSO *t, **last;
3422   
3423   /* To avoid locking unnecessarily. */
3424   if (tso->why_blocked == NotBlocked) {
3425     return;
3426   }
3427
3428   switch (tso->why_blocked) {
3429
3430   case BlockedOnSTM:
3431     // Be careful: nothing to do here!  We tell the scheduler that the thread
3432     // is runnable and we leave it to the stack-walking code to abort the 
3433     // transaction while unwinding the stack.  We should perhaps have a debugging
3434     // test to make sure that this really happens and that the 'zombie' transaction
3435     // does not get committed.
3436     goto done;
3437
3438   case BlockedOnMVar:
3439     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3440     {
3441       StgTSO *last_tso = END_TSO_QUEUE;
3442       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3443
3444       last = &mvar->head;
3445       for (t = mvar->head; t != END_TSO_QUEUE; 
3446            last = &t->link, last_tso = t, t = t->link) {
3447         if (t == tso) {
3448           *last = tso->link;
3449           if (mvar->tail == tso) {
3450             mvar->tail = last_tso;
3451           }
3452           goto done;
3453         }
3454       }
3455       barf("unblockThread (MVAR): TSO not found");
3456     }
3457
3458   case BlockedOnBlackHole:
3459     {
3460       last = &blackhole_queue;
3461       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3462            last = &t->link, t = t->link) {
3463         if (t == tso) {
3464           *last = tso->link;
3465           goto done;
3466         }
3467       }
3468       barf("unblockThread (BLACKHOLE): TSO not found");
3469     }
3470
3471   case BlockedOnException:
3472     {
3473       StgTSO *target  = tso->block_info.tso;
3474
3475       ASSERT(get_itbl(target)->type == TSO);
3476
3477       while (target->what_next == ThreadRelocated) {
3478           target = target->link;
3479           ASSERT(get_itbl(target)->type == TSO);
3480       }
3481       
3482       ASSERT(target->blocked_exceptions != NULL);
3483
3484       last = &target->blocked_exceptions;
3485       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3486            last = &t->link, t = t->link) {
3487         ASSERT(get_itbl(t)->type == TSO);
3488         if (t == tso) {
3489           *last = tso->link;
3490           goto done;
3491         }
3492       }
3493       barf("unblockThread (Exception): TSO not found");
3494     }
3495
3496   case BlockedOnRead:
3497   case BlockedOnWrite:
3498 #if defined(mingw32_HOST_OS)
3499   case BlockedOnDoProc:
3500 #endif
3501     {
3502       StgTSO *prev = NULL;
3503       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3504            prev = t, t = t->link) {
3505         if (t == tso) {
3506           if (prev == NULL) {
3507             blocked_queue_hd = t->link;
3508             if (blocked_queue_tl == t) {
3509               blocked_queue_tl = END_TSO_QUEUE;
3510             }
3511           } else {
3512             prev->link = t->link;
3513             if (blocked_queue_tl == t) {
3514               blocked_queue_tl = prev;
3515             }
3516           }
3517 #if defined(mingw32_HOST_OS)
3518           /* (Cooperatively) signal that the worker thread should abort
3519            * the request.
3520            */
3521           abandonWorkRequest(tso->block_info.async_result->reqID);
3522 #endif
3523           goto done;
3524         }
3525       }
3526       barf("unblockThread (I/O): TSO not found");
3527     }
3528
3529   case BlockedOnDelay:
3530     {
3531       StgTSO *prev = NULL;
3532       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3533            prev = t, t = t->link) {
3534         if (t == tso) {
3535           if (prev == NULL) {
3536             sleeping_queue = t->link;
3537           } else {
3538             prev->link = t->link;
3539           }
3540           goto done;
3541         }
3542       }
3543       barf("unblockThread (delay): TSO not found");
3544     }
3545
3546   default:
3547     barf("unblockThread");
3548   }
3549
3550  done:
3551   tso->link = END_TSO_QUEUE;
3552   tso->why_blocked = NotBlocked;
3553   tso->block_info.closure = NULL;
3554   APPEND_TO_RUN_QUEUE(tso);
3555 }
3556 #endif
3557
3558 /* -----------------------------------------------------------------------------
3559  * checkBlackHoles()
3560  *
3561  * Check the blackhole_queue for threads that can be woken up.  We do
3562  * this periodically: before every GC, and whenever the run queue is
3563  * empty.
3564  *
3565  * An elegant solution might be to just wake up all the blocked
3566  * threads with awakenBlockedQueue occasionally: they'll go back to
3567  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3568  * doesn't give us a way to tell whether we've actually managed to
3569  * wake up any threads, so we would be busy-waiting.
3570  *
3571  * -------------------------------------------------------------------------- */
3572
3573 static rtsBool
3574 checkBlackHoles( void )
3575 {
3576     StgTSO **prev, *t;
3577     rtsBool any_woke_up = rtsFalse;
3578     StgHalfWord type;
3579
3580     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3581
3582     // ASSUMES: sched_mutex
3583     prev = &blackhole_queue;
3584     t = blackhole_queue;
3585     while (t != END_TSO_QUEUE) {
3586         ASSERT(t->why_blocked == BlockedOnBlackHole);
3587         type = get_itbl(t->block_info.closure)->type;
3588         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3589             t = unblockOneLocked(t);
3590             *prev = t;
3591             any_woke_up = rtsTrue;
3592         } else {
3593             prev = &t->link;
3594             t = t->link;
3595         }
3596     }
3597
3598     return any_woke_up;
3599 }
3600
3601 /* -----------------------------------------------------------------------------
3602  * raiseAsync()
3603  *
3604  * The following function implements the magic for raising an
3605  * asynchronous exception in an existing thread.
3606  *
3607  * We first remove the thread from any queue on which it might be
3608  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3609  *
3610  * We strip the stack down to the innermost CATCH_FRAME, building
3611  * thunks in the heap for all the active computations, so they can 
3612  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3613  * an application of the handler to the exception, and push it on
3614  * the top of the stack.
3615  * 
3616  * How exactly do we save all the active computations?  We create an
3617  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3618  * AP_STACKs pushes everything from the corresponding update frame
3619  * upwards onto the stack.  (Actually, it pushes everything up to the
3620  * next update frame plus a pointer to the next AP_STACK object.
3621  * Entering the next AP_STACK object pushes more onto the stack until we
3622  * reach the last AP_STACK object - at which point the stack should look
3623  * exactly as it did when we killed the TSO and we can continue
3624  * execution by entering the closure on top of the stack.
3625  *
3626  * We can also kill a thread entirely - this happens if either (a) the 
3627  * exception passed to raiseAsync is NULL, or (b) there's no
3628  * CATCH_FRAME on the stack.  In either case, we strip the entire
3629  * stack and replace the thread with a zombie.
3630  *
3631  * Locks: sched_mutex held upon entry nor exit.
3632  *
3633  * -------------------------------------------------------------------------- */
3634  
3635 void 
3636 deleteThread(StgTSO *tso)
3637 {
3638   if (tso->why_blocked != BlockedOnCCall &&
3639       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3640       raiseAsync(tso,NULL);
3641   }
3642 }
3643
3644 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3645 static void 
3646 deleteThreadImmediately(StgTSO *tso)
3647 { // for forkProcess only:
3648   // delete thread without giving it a chance to catch the KillThread exception
3649
3650   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3651       return;
3652   }
3653
3654   if (tso->why_blocked != BlockedOnCCall &&
3655       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3656     unblockThread(tso);
3657   }
3658
3659   tso->what_next = ThreadKilled;
3660 }
3661 #endif
3662
3663 void
3664 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3665 {
3666   /* When raising async exs from contexts where sched_mutex isn't held;
3667      use raiseAsyncWithLock(). */
3668   ACQUIRE_LOCK(&sched_mutex);
3669   raiseAsync(tso,exception);
3670   RELEASE_LOCK(&sched_mutex);
3671 }
3672
3673 void
3674 raiseAsync(StgTSO *tso, StgClosure *exception)
3675 {
3676     raiseAsync_(tso, exception, rtsFalse);
3677 }
3678
3679 static void
3680 raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
3681 {
3682     StgRetInfoTable *info;
3683     StgPtr sp;
3684   
3685     // Thread already dead?
3686     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3687         return;
3688     }
3689
3690     IF_DEBUG(scheduler, 
3691              sched_belch("raising exception in thread %ld.", (long)tso->id));
3692     
3693     // Remove it from any blocking queues
3694     unblockThread(tso);
3695
3696     sp = tso->sp;
3697     
3698     // The stack freezing code assumes there's a closure pointer on
3699     // the top of the stack, so we have to arrange that this is the case...
3700     //
3701     if (sp[0] == (W_)&stg_enter_info) {
3702         sp++;
3703     } else {
3704         sp--;
3705         sp[0] = (W_)&stg_dummy_ret_closure;
3706     }
3707
3708     while (1) {
3709         nat i;
3710
3711         // 1. Let the top of the stack be the "current closure"
3712         //
3713         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3714         // CATCH_FRAME.
3715         //
3716         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3717         // current closure applied to the chunk of stack up to (but not
3718         // including) the update frame.  This closure becomes the "current
3719         // closure".  Go back to step 2.
3720         //
3721         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3722         // top of the stack applied to the exception.
3723         // 
3724         // 5. If it's a STOP_FRAME, then kill the thread.
3725         // 
3726         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3727         // transaction
3728        
3729         
3730         StgPtr frame;
3731         
3732         frame = sp + 1;
3733         info = get_ret_itbl((StgClosure *)frame);
3734         
3735         while (info->i.type != UPDATE_FRAME
3736                && (info->i.type != CATCH_FRAME || exception == NULL)
3737                && info->i.type != STOP_FRAME
3738                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3739         {
3740             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3741               // IF we find an ATOMICALLY_FRAME then we abort the
3742               // current transaction and propagate the exception.  In
3743               // this case (unlike ordinary exceptions) we do not care
3744               // whether the transaction is valid or not because its
3745               // possible validity cannot have caused the exception
3746               // and will not be visible after the abort.
3747               IF_DEBUG(stm,
3748                        debugBelch("Found atomically block delivering async exception\n"));
3749               stmAbortTransaction(tso -> trec);
3750               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3751             }
3752             frame += stack_frame_sizeW((StgClosure *)frame);
3753             info = get_ret_itbl((StgClosure *)frame);
3754         }
3755         
3756         switch (info->i.type) {
3757             
3758         case ATOMICALLY_FRAME:
3759             ASSERT(stop_at_atomically);
3760             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3761             stmCondemnTransaction(tso -> trec);
3762 #ifdef REG_R1
3763             tso->sp = frame;
3764 #else
3765             // R1 is not a register: the return convention for IO in
3766             // this case puts the return value on the stack, so we
3767             // need to set up the stack to return to the atomically
3768             // frame properly...
3769             tso->sp = frame - 2;
3770             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3771             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3772 #endif
3773             tso->what_next = ThreadRunGHC;
3774             return;
3775
3776         case CATCH_FRAME:
3777             // If we find a CATCH_FRAME, and we've got an exception to raise,
3778             // then build the THUNK raise(exception), and leave it on
3779             // top of the CATCH_FRAME ready to enter.
3780             //
3781         {
3782 #ifdef PROFILING
3783             StgCatchFrame *cf = (StgCatchFrame *)frame;
3784 #endif
3785             StgThunk *raise;
3786             
3787             // we've got an exception to raise, so let's pass it to the
3788             // handler in this frame.
3789             //
3790             raise = (StgThunk *)allocate(sizeofW(StgThunk)+1);
3791             TICK_ALLOC_SE_THK(1,0);
3792             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3793             raise->payload[0] = exception;
3794             
3795             // throw away the stack from Sp up to the CATCH_FRAME.
3796             //
3797             sp = frame - 1;
3798             
3799             /* Ensure that async excpetions are blocked now, so we don't get
3800              * a surprise exception before we get around to executing the
3801              * handler.
3802              */
3803             if (tso->blocked_exceptions == NULL) {
3804                 tso->blocked_exceptions = END_TSO_QUEUE;
3805             }
3806             
3807             /* Put the newly-built THUNK on top of the stack, ready to execute
3808              * when the thread restarts.
3809              */
3810             sp[0] = (W_)raise;
3811             sp[-1] = (W_)&stg_enter_info;
3812             tso->sp = sp-1;
3813             tso->what_next = ThreadRunGHC;
3814             IF_DEBUG(sanity, checkTSO(tso));
3815             return;
3816         }
3817         
3818         case UPDATE_FRAME:
3819         {
3820             StgAP_STACK * ap;
3821             nat words;
3822             
3823             // First build an AP_STACK consisting of the stack chunk above the
3824             // current update frame, with the top word on the stack as the
3825             // fun field.
3826             //
3827             words = frame - sp - 1;
3828             ap = (StgAP_STACK *)allocate(AP_STACK_sizeW(words));
3829             
3830             ap->size = words;
3831             ap->fun  = (StgClosure *)sp[0];
3832             sp++;
3833             for(i=0; i < (nat)words; ++i) {
3834                 ap->payload[i] = (StgClosure *)*sp++;
3835             }
3836             
3837             SET_HDR(ap,&stg_AP_STACK_info,
3838                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3839             TICK_ALLOC_UP_THK(words+1,0);
3840             
3841             IF_DEBUG(scheduler,
3842                      debugBelch("sched: Updating ");
3843                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3844                      debugBelch(" with ");
3845                      printObj((StgClosure *)ap);
3846                 );
3847
3848             // Replace the updatee with an indirection - happily
3849             // this will also wake up any threads currently
3850             // waiting on the result.
3851             //
3852             // Warning: if we're in a loop, more than one update frame on
3853             // the stack may point to the same object.  Be careful not to
3854             // overwrite an IND_OLDGEN in this case, because we'll screw
3855             // up the mutable lists.  To be on the safe side, don't
3856             // overwrite any kind of indirection at all.  See also
3857             // threadSqueezeStack in GC.c, where we have to make a similar
3858             // check.
3859             //
3860             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3861                 // revert the black hole
3862                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3863                                (StgClosure *)ap);
3864             }
3865             sp += sizeofW(StgUpdateFrame) - 1;
3866             sp[0] = (W_)ap; // push onto stack
3867             break;
3868         }
3869         
3870         case STOP_FRAME:
3871             // We've stripped the entire stack, the thread is now dead.
3872             sp += sizeofW(StgStopFrame);
3873             tso->what_next = ThreadKilled;
3874             tso->sp = sp;
3875             return;
3876             
3877         default:
3878             barf("raiseAsync");
3879         }
3880     }
3881     barf("raiseAsync");
3882 }
3883
3884 /* -----------------------------------------------------------------------------
3885    raiseExceptionHelper
3886    
3887    This function is called by the raise# primitve, just so that we can
3888    move some of the tricky bits of raising an exception from C-- into
3889    C.  Who knows, it might be a useful re-useable thing here too.
3890    -------------------------------------------------------------------------- */
3891
3892 StgWord
3893 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3894 {
3895     StgThunk *raise_closure = NULL;
3896     StgPtr p, next;
3897     StgRetInfoTable *info;
3898     //
3899     // This closure represents the expression 'raise# E' where E
3900     // is the exception raise.  It is used to overwrite all the
3901     // thunks which are currently under evaluataion.
3902     //
3903
3904     //    
3905     // LDV profiling: stg_raise_info has THUNK as its closure
3906     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3907     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3908     // 1 does not cause any problem unless profiling is performed.
3909     // However, when LDV profiling goes on, we need to linearly scan
3910     // small object pool, where raise_closure is stored, so we should
3911     // use MIN_UPD_SIZE.
3912     //
3913     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3914     //                                 sizeofW(StgClosure)+1);
3915     //
3916
3917     //
3918     // Walk up the stack, looking for the catch frame.  On the way,
3919     // we update any closures pointed to from update frames with the
3920     // raise closure that we just built.
3921     //
3922     p = tso->sp;
3923     while(1) {
3924         info = get_ret_itbl((StgClosure *)p);
3925         next = p + stack_frame_sizeW((StgClosure *)p);
3926         switch (info->i.type) {
3927             
3928         case UPDATE_FRAME:
3929             // Only create raise_closure if we need to.
3930             if (raise_closure == NULL) {
3931                 raise_closure = 
3932                     (StgThunk *)allocate(sizeofW(StgThunk)+MIN_UPD_SIZE);
3933                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3934                 raise_closure->payload[0] = exception;
3935             }
3936             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3937             p = next;
3938             continue;
3939
3940         case ATOMICALLY_FRAME:
3941             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3942             tso->sp = p;
3943             return ATOMICALLY_FRAME;
3944             
3945         case CATCH_FRAME:
3946             tso->sp = p;
3947             return CATCH_FRAME;
3948
3949         case CATCH_STM_FRAME:
3950             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3951             tso->sp = p;
3952             return CATCH_STM_FRAME;
3953             
3954         case STOP_FRAME:
3955             tso->sp = p;
3956             return STOP_FRAME;
3957
3958         case CATCH_RETRY_FRAME:
3959         default:
3960             p = next; 
3961             continue;
3962         }
3963     }
3964 }
3965
3966
3967 /* -----------------------------------------------------------------------------
3968    findRetryFrameHelper
3969
3970    This function is called by the retry# primitive.  It traverses the stack
3971    leaving tso->sp referring to the frame which should handle the retry.  
3972
3973    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3974    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3975
3976    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3977    despite the similar implementation.
3978
3979    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3980    not be created within memory transactions.
3981    -------------------------------------------------------------------------- */
3982
3983 StgWord
3984 findRetryFrameHelper (StgTSO *tso)
3985 {
3986   StgPtr           p, next;
3987   StgRetInfoTable *info;
3988
3989   p = tso -> sp;
3990   while (1) {
3991     info = get_ret_itbl((StgClosure *)p);
3992     next = p + stack_frame_sizeW((StgClosure *)p);
3993     switch (info->i.type) {
3994       
3995     case ATOMICALLY_FRAME:
3996       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3997       tso->sp = p;
3998       return ATOMICALLY_FRAME;
3999       
4000     case CATCH_RETRY_FRAME:
4001       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4002       tso->sp = p;
4003       return CATCH_RETRY_FRAME;
4004       
4005     case CATCH_STM_FRAME:
4006     default:
4007       ASSERT(info->i.type != CATCH_FRAME);
4008       ASSERT(info->i.type != STOP_FRAME);
4009       p = next; 
4010       continue;
4011     }
4012   }
4013 }
4014
4015 /* -----------------------------------------------------------------------------
4016    resurrectThreads is called after garbage collection on the list of
4017    threads found to be garbage.  Each of these threads will be woken
4018    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4019    on an MVar, or NonTermination if the thread was blocked on a Black
4020    Hole.
4021
4022    Locks: sched_mutex isn't held upon entry nor exit.
4023    -------------------------------------------------------------------------- */
4024
4025 void
4026 resurrectThreads( StgTSO *threads )
4027 {
4028   StgTSO *tso, *next;
4029
4030   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4031     next = tso->global_link;
4032     tso->global_link = all_threads;
4033     all_threads = tso;
4034     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4035
4036     switch (tso->why_blocked) {
4037     case BlockedOnMVar:
4038     case BlockedOnException:
4039       /* Called by GC - sched_mutex lock is currently held. */
4040       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
4041       break;
4042     case BlockedOnBlackHole:
4043       raiseAsync(tso,(StgClosure *)NonTermination_closure);
4044       break;
4045     case BlockedOnSTM:
4046       raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
4047       break;
4048     case NotBlocked:
4049       /* This might happen if the thread was blocked on a black hole
4050        * belonging to a thread that we've just woken up (raiseAsync
4051        * can wake up threads, remember...).
4052        */
4053       continue;
4054     default:
4055       barf("resurrectThreads: thread blocked in a strange way");
4056     }
4057   }
4058 }
4059
4060 /* ----------------------------------------------------------------------------
4061  * Debugging: why is a thread blocked
4062  * [Also provides useful information when debugging threaded programs
4063  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4064    ------------------------------------------------------------------------- */
4065
4066 static void
4067 printThreadBlockage(StgTSO *tso)
4068 {
4069   switch (tso->why_blocked) {
4070   case BlockedOnRead:
4071     debugBelch("is blocked on read from fd %ld", tso->block_info.fd);
4072     break;
4073   case BlockedOnWrite:
4074     debugBelch("is blocked on write to fd %ld", tso->block_info.fd);
4075     break;
4076 #if defined(mingw32_HOST_OS)
4077     case BlockedOnDoProc:
4078     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4079     break;
4080 #endif
4081   case BlockedOnDelay:
4082     debugBelch("is blocked until %ld", tso->block_info.target);
4083     break;
4084   case BlockedOnMVar:
4085     debugBelch("is blocked on an MVar");
4086     break;
4087   case BlockedOnException:
4088     debugBelch("is blocked on delivering an exception to thread %d",
4089             tso->block_info.tso->id);
4090     break;
4091   case BlockedOnBlackHole:
4092     debugBelch("is blocked on a black hole");
4093     break;
4094   case NotBlocked:
4095     debugBelch("is not blocked");
4096     break;
4097 #if defined(PARALLEL_HASKELL)
4098   case BlockedOnGA:
4099     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4100             tso->block_info.closure, info_type(tso->block_info.closure));
4101     break;
4102   case BlockedOnGA_NoSend:
4103     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4104             tso->block_info.closure, info_type(tso->block_info.closure));
4105     break;
4106 #endif
4107   case BlockedOnCCall:
4108     debugBelch("is blocked on an external call");
4109     break;
4110   case BlockedOnCCall_NoUnblockExc:
4111     debugBelch("is blocked on an external call (exceptions were already blocked)");
4112     break;
4113   case BlockedOnSTM:
4114     debugBelch("is blocked on an STM operation");
4115     break;
4116   default:
4117     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4118          tso->why_blocked, tso->id, tso);
4119   }
4120 }
4121
4122 static void
4123 printThreadStatus(StgTSO *tso)
4124 {
4125   switch (tso->what_next) {
4126   case ThreadKilled:
4127     debugBelch("has been killed");
4128     break;
4129   case ThreadComplete:
4130     debugBelch("has completed");
4131     break;
4132   default:
4133     printThreadBlockage(tso);
4134   }
4135 }
4136
4137 void
4138 printAllThreads(void)
4139 {
4140   StgTSO *t;
4141
4142 # if defined(GRAN)
4143   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4144   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4145                        time_string, rtsFalse/*no commas!*/);
4146
4147   debugBelch("all threads at [%s]:\n", time_string);
4148 # elif defined(PARALLEL_HASKELL)
4149   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4150   ullong_format_string(CURRENT_TIME,
4151                        time_string, rtsFalse/*no commas!*/);
4152
4153   debugBelch("all threads at [%s]:\n", time_string);
4154 # else
4155   debugBelch("all threads:\n");
4156 # endif
4157
4158   for (t = all_threads; t != END_TSO_QUEUE; ) {
4159     debugBelch("\tthread %d @ %p ", t->id, (void *)t);
4160 #if defined(DEBUG)
4161     {
4162       void *label = lookupThreadLabel(t->id);
4163       if (label) debugBelch("[\"%s\"] ",(char *)label);
4164     }
4165 #endif
4166     if (t->what_next == ThreadRelocated) {
4167         debugBelch("has been relocated...\n");
4168         t = t->link;
4169     } else {
4170         printThreadStatus(t);
4171         debugBelch("\n");
4172         t = t->global_link;
4173     }
4174   }
4175 }
4176     
4177 #ifdef DEBUG
4178
4179 /* 
4180    Print a whole blocking queue attached to node (debugging only).
4181 */
4182 # if defined(PARALLEL_HASKELL)
4183 void 
4184 print_bq (StgClosure *node)
4185 {
4186   StgBlockingQueueElement *bqe;
4187   StgTSO *tso;
4188   rtsBool end;
4189
4190   debugBelch("## BQ of closure %p (%s): ",
4191           node, info_type(node));
4192
4193   /* should cover all closures that may have a blocking queue */
4194   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4195          get_itbl(node)->type == FETCH_ME_BQ ||
4196          get_itbl(node)->type == RBH ||
4197          get_itbl(node)->type == MVAR);
4198     
4199   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4200
4201   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4202 }
4203
4204 /* 
4205    Print a whole blocking queue starting with the element bqe.
4206 */
4207 void 
4208 print_bqe (StgBlockingQueueElement *bqe)
4209 {
4210   rtsBool end;
4211
4212   /* 
4213      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4214   */
4215   for (end = (bqe==END_BQ_QUEUE);
4216        !end; // iterate until bqe points to a CONSTR
4217        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4218        bqe = end ? END_BQ_QUEUE : bqe->link) {
4219     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4220     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4221     /* types of closures that may appear in a blocking queue */
4222     ASSERT(get_itbl(bqe)->type == TSO ||           
4223            get_itbl(bqe)->type == BLOCKED_FETCH || 
4224            get_itbl(bqe)->type == CONSTR); 
4225     /* only BQs of an RBH end with an RBH_Save closure */
4226     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4227
4228     switch (get_itbl(bqe)->type) {
4229     case TSO:
4230       debugBelch(" TSO %u (%x),",
4231               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4232       break;
4233     case BLOCKED_FETCH:
4234       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4235               ((StgBlockedFetch *)bqe)->node, 
4236               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4237               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4238               ((StgBlockedFetch *)bqe)->ga.weight);
4239       break;
4240     case CONSTR:
4241       debugBelch(" %s (IP %p),",
4242               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4243                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4244                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4245                "RBH_Save_?"), get_itbl(bqe));
4246       break;
4247     default:
4248       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4249            info_type((StgClosure *)bqe)); // , node, info_type(node));
4250       break;
4251     }
4252   } /* for */
4253   debugBelch("\n");
4254 }
4255 # elif defined(GRAN)
4256 void 
4257 print_bq (StgClosure *node)
4258 {
4259   StgBlockingQueueElement *bqe;
4260   PEs node_loc, tso_loc;
4261   rtsBool end;
4262
4263   /* should cover all closures that may have a blocking queue */
4264   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4265          get_itbl(node)->type == FETCH_ME_BQ ||
4266          get_itbl(node)->type == RBH);
4267     
4268   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4269   node_loc = where_is(node);
4270
4271   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4272           node, info_type(node), node_loc);
4273
4274   /* 
4275      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4276   */
4277   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4278        !end; // iterate until bqe points to a CONSTR
4279        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4280     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4281     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4282     /* types of closures that may appear in a blocking queue */
4283     ASSERT(get_itbl(bqe)->type == TSO ||           
4284            get_itbl(bqe)->type == CONSTR); 
4285     /* only BQs of an RBH end with an RBH_Save closure */
4286     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4287
4288     tso_loc = where_is((StgClosure *)bqe);
4289     switch (get_itbl(bqe)->type) {
4290     case TSO:
4291       debugBelch(" TSO %d (%p) on [PE %d],",
4292               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4293       break;
4294     case CONSTR:
4295       debugBelch(" %s (IP %p),",
4296               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4297                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4298                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4299                "RBH_Save_?"), get_itbl(bqe));
4300       break;
4301     default:
4302       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4303            info_type((StgClosure *)bqe), node, info_type(node));
4304       break;
4305     }
4306   } /* for */
4307   debugBelch("\n");
4308 }
4309 # endif
4310
4311 #if defined(PARALLEL_HASKELL)
4312 static nat
4313 run_queue_len(void)
4314 {
4315   nat i;
4316   StgTSO *tso;
4317
4318   for (i=0, tso=run_queue_hd; 
4319        tso != END_TSO_QUEUE;
4320        i++, tso=tso->link)
4321     /* nothing */
4322
4323   return i;
4324 }
4325 #endif
4326
4327 void
4328 sched_belch(char *s, ...)
4329 {
4330   va_list ap;
4331   va_start(ap,s);
4332 #ifdef RTS_SUPPORTS_THREADS
4333   debugBelch("sched (task %p): ", osThreadId());
4334 #elif defined(PARALLEL_HASKELL)
4335   debugBelch("== ");
4336 #else
4337   debugBelch("sched: ");
4338 #endif
4339   vdebugBelch(s, ap);
4340   debugBelch("\n");
4341   va_end(ap);
4342 }
4343
4344 #endif /* DEBUG */