[project @ 2005-11-08 15:07:08 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 #ifdef THREADED_RTS
76 #define USED_WHEN_THREADED_RTS
77 #define USED_WHEN_NON_THREADED_RTS STG_UNUSED
78 #else
79 #define USED_WHEN_THREADED_RTS     STG_UNUSED
80 #define USED_WHEN_NON_THREADED_RTS
81 #endif
82
83 #ifdef SMP
84 #define USED_WHEN_SMP
85 #else
86 #define USED_WHEN_SMP STG_UNUSED
87 #endif
88
89 /* -----------------------------------------------------------------------------
90  * Global variables
91  * -------------------------------------------------------------------------- */
92
93 #if defined(GRAN)
94
95 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
96 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
97
98 /* 
99    In GranSim we have a runnable and a blocked queue for each processor.
100    In order to minimise code changes new arrays run_queue_hds/tls
101    are created. run_queue_hd is then a short cut (macro) for
102    run_queue_hds[CurrentProc] (see GranSim.h).
103    -- HWL
104 */
105 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
106 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
107 StgTSO *ccalling_threadss[MAX_PROC];
108 /* We use the same global list of threads (all_threads) in GranSim as in
109    the std RTS (i.e. we are cheating). However, we don't use this list in
110    the GranSim specific code at the moment (so we are only potentially
111    cheating).  */
112
113 #else /* !GRAN */
114
115 #if !defined(THREADED_RTS)
116 // Blocked/sleeping thrads
117 StgTSO *blocked_queue_hd = NULL;
118 StgTSO *blocked_queue_tl = NULL;
119 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
120 #endif
121
122 /* Threads blocked on blackholes.
123  * LOCK: sched_mutex+capability, or all capabilities
124  */
125 StgTSO *blackhole_queue = NULL;
126 #endif
127
128 /* The blackhole_queue should be checked for threads to wake up.  See
129  * Schedule.h for more thorough comment.
130  * LOCK: none (doesn't matter if we miss an update)
131  */
132 rtsBool blackholes_need_checking = rtsFalse;
133
134 /* Linked list of all threads.
135  * Used for detecting garbage collected threads.
136  * LOCK: sched_mutex+capability, or all capabilities
137  */
138 StgTSO *all_threads = NULL;
139
140 /* flag set by signal handler to precipitate a context switch
141  * LOCK: none (just an advisory flag)
142  */
143 int context_switch = 0;
144
145 /* flag that tracks whether we have done any execution in this time slice.
146  * LOCK: currently none, perhaps we should lock (but needs to be
147  * updated in the fast path of the scheduler).
148  */
149 nat recent_activity = ACTIVITY_YES;
150
151 /* if this flag is set as well, give up execution
152  * LOCK: none (changes once, from false->true)
153  */
154 rtsBool interrupted = rtsFalse;
155
156 /* Next thread ID to allocate.
157  * LOCK: sched_mutex
158  */
159 static StgThreadID next_thread_id = 1;
160
161 /* The smallest stack size that makes any sense is:
162  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
163  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
164  *  + 1                       (the closure to enter)
165  *  + 1                       (stg_ap_v_ret)
166  *  + 1                       (spare slot req'd by stg_ap_v_ret)
167  *
168  * A thread with this stack will bomb immediately with a stack
169  * overflow, which will increase its stack size.  
170  */
171 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
172
173 #if defined(GRAN)
174 StgTSO *CurrentTSO;
175 #endif
176
177 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
178  *  exists - earlier gccs apparently didn't.
179  *  -= chak
180  */
181 StgTSO dummy_tso;
182
183 /*
184  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
185  * in an MT setting, needed to signal that a worker thread shouldn't hang around
186  * in the scheduler when it is out of work.
187  */
188 rtsBool shutting_down_scheduler = rtsFalse;
189
190 /*
191  * This mutex protects most of the global scheduler data in
192  * the THREADED_RTS and (inc. SMP) runtime.
193  */
194 #if defined(THREADED_RTS)
195 Mutex sched_mutex;
196 #endif
197
198 #if defined(PARALLEL_HASKELL)
199 StgTSO *LastTSO;
200 rtsTime TimeOfLastYield;
201 rtsBool emitSchedule = rtsTrue;
202 #endif
203
204 /* -----------------------------------------------------------------------------
205  * static function prototypes
206  * -------------------------------------------------------------------------- */
207
208 static Capability *schedule (Capability *initialCapability, Task *task);
209
210 //
211 // These function all encapsulate parts of the scheduler loop, and are
212 // abstracted only to make the structure and control flow of the
213 // scheduler clearer.
214 //
215 static void schedulePreLoop (void);
216 #if defined(SMP)
217 static void schedulePushWork(Capability *cap, Task *task);
218 #endif
219 static void scheduleStartSignalHandlers (Capability *cap);
220 static void scheduleCheckBlockedThreads (Capability *cap);
221 static void scheduleCheckBlackHoles (Capability *cap);
222 static void scheduleDetectDeadlock (Capability *cap, Task *task);
223 #if defined(GRAN)
224 static StgTSO *scheduleProcessEvent(rtsEvent *event);
225 #endif
226 #if defined(PARALLEL_HASKELL)
227 static StgTSO *scheduleSendPendingMessages(void);
228 static void scheduleActivateSpark(void);
229 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
230 #endif
231 #if defined(PAR) || defined(GRAN)
232 static void scheduleGranParReport(void);
233 #endif
234 static void schedulePostRunThread(void);
235 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
236 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
237                                          StgTSO *t);
238 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
239                                     nat prev_what_next );
240 static void scheduleHandleThreadBlocked( StgTSO *t );
241 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
242                                              StgTSO *t );
243 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
244 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
245
246 static void unblockThread(Capability *cap, StgTSO *tso);
247 static rtsBool checkBlackHoles(Capability *cap);
248 static void AllRoots(evac_fn evac);
249
250 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
251
252 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
253                         rtsBool stop_at_atomically);
254
255 static void deleteThread (Capability *cap, StgTSO *tso);
256 static void deleteRunQueue (Capability *cap);
257
258 #ifdef DEBUG
259 static void printThreadBlockage(StgTSO *tso);
260 static void printThreadStatus(StgTSO *tso);
261 void printThreadQueue(StgTSO *tso);
262 #endif
263
264 #if defined(PARALLEL_HASKELL)
265 StgTSO * createSparkThread(rtsSpark spark);
266 StgTSO * activateSpark (rtsSpark spark);  
267 #endif
268
269 #ifdef DEBUG
270 static char *whatNext_strs[] = {
271   "(unknown)",
272   "ThreadRunGHC",
273   "ThreadInterpret",
274   "ThreadKilled",
275   "ThreadRelocated",
276   "ThreadComplete"
277 };
278 #endif
279
280 /* -----------------------------------------------------------------------------
281  * Putting a thread on the run queue: different scheduling policies
282  * -------------------------------------------------------------------------- */
283
284 STATIC_INLINE void
285 addToRunQueue( Capability *cap, StgTSO *t )
286 {
287 #if defined(PARALLEL_HASKELL)
288     if (RtsFlags.ParFlags.doFairScheduling) { 
289         // this does round-robin scheduling; good for concurrency
290         appendToRunQueue(cap,t);
291     } else {
292         // this does unfair scheduling; good for parallelism
293         pushOnRunQueue(cap,t);
294     }
295 #else
296     // this does round-robin scheduling; good for concurrency
297     appendToRunQueue(cap,t);
298 #endif
299 }
300
301 /* ---------------------------------------------------------------------------
302    Main scheduling loop.
303
304    We use round-robin scheduling, each thread returning to the
305    scheduler loop when one of these conditions is detected:
306
307       * out of heap space
308       * timer expires (thread yields)
309       * thread blocks
310       * thread ends
311       * stack overflow
312
313    GRAN version:
314      In a GranSim setup this loop iterates over the global event queue.
315      This revolves around the global event queue, which determines what 
316      to do next. Therefore, it's more complicated than either the 
317      concurrent or the parallel (GUM) setup.
318
319    GUM version:
320      GUM iterates over incoming messages.
321      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
322      and sends out a fish whenever it has nothing to do; in-between
323      doing the actual reductions (shared code below) it processes the
324      incoming messages and deals with delayed operations 
325      (see PendingFetches).
326      This is not the ugliest code you could imagine, but it's bloody close.
327
328    ------------------------------------------------------------------------ */
329
330 static Capability *
331 schedule (Capability *initialCapability, Task *task)
332 {
333   StgTSO *t;
334   Capability *cap;
335   StgThreadReturnCode ret;
336 #if defined(GRAN)
337   rtsEvent *event;
338 #elif defined(PARALLEL_HASKELL)
339   StgTSO *tso;
340   GlobalTaskId pe;
341   rtsBool receivedFinish = rtsFalse;
342 # if defined(DEBUG)
343   nat tp_size, sp_size; // stats only
344 # endif
345 #endif
346   nat prev_what_next;
347   rtsBool ready_to_gc;
348 #if defined(THREADED_RTS)
349   rtsBool first = rtsTrue;
350 #endif
351   
352   cap = initialCapability;
353
354   // Pre-condition: this task owns initialCapability.
355   // The sched_mutex is *NOT* held
356   // NB. on return, we still hold a capability.
357
358   IF_DEBUG(scheduler,
359            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
360                        task, initialCapability);
361       );
362
363   schedulePreLoop();
364
365   // -----------------------------------------------------------
366   // Scheduler loop starts here:
367
368 #if defined(PARALLEL_HASKELL)
369 #define TERMINATION_CONDITION        (!receivedFinish)
370 #elif defined(GRAN)
371 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
372 #else
373 #define TERMINATION_CONDITION        rtsTrue
374 #endif
375
376   while (TERMINATION_CONDITION) {
377
378 #if defined(GRAN)
379       /* Choose the processor with the next event */
380       CurrentProc = event->proc;
381       CurrentTSO = event->tso;
382 #endif
383
384 #if defined(THREADED_RTS)
385       if (first) {
386           // don't yield the first time, we want a chance to run this
387           // thread for a bit, even if there are others banging at the
388           // door.
389           first = rtsFalse;
390           ASSERT_CAPABILITY_INVARIANTS(cap,task);
391       } else {
392           // Yield the capability to higher-priority tasks if necessary.
393           yieldCapability(&cap, task);
394       }
395 #endif
396       
397 #ifdef SMP
398       schedulePushWork(cap,task);
399 #endif          
400
401     // Check whether we have re-entered the RTS from Haskell without
402     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
403     // call).
404     if (cap->in_haskell) {
405           errorBelch("schedule: re-entered unsafely.\n"
406                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
407           stg_exit(EXIT_FAILURE);
408     }
409
410     //
411     // Test for interruption.  If interrupted==rtsTrue, then either
412     // we received a keyboard interrupt (^C), or the scheduler is
413     // trying to shut down all the tasks (shutting_down_scheduler) in
414     // the threaded RTS.
415     //
416     if (interrupted) {
417         deleteRunQueue(cap);
418         if (shutting_down_scheduler) {
419             IF_DEBUG(scheduler, sched_belch("shutting down"));
420             // If we are a worker, just exit.  If we're a bound thread
421             // then we will exit below when we've removed our TSO from
422             // the run queue.
423             if (task->tso == NULL && emptyRunQueue(cap)) {
424                 return cap;
425             }
426         } else {
427             IF_DEBUG(scheduler, sched_belch("interrupted"));
428         }
429     }
430
431 #if defined(not_yet) && defined(SMP)
432     //
433     // Top up the run queue from our spark pool.  We try to make the
434     // number of threads in the run queue equal to the number of
435     // free capabilities.
436     //
437     {
438         StgClosure *spark;
439         if (emptyRunQueue()) {
440             spark = findSpark(rtsFalse);
441             if (spark == NULL) {
442                 break; /* no more sparks in the pool */
443             } else {
444                 createSparkThread(spark);         
445                 IF_DEBUG(scheduler,
446                          sched_belch("==^^ turning spark of closure %p into a thread",
447                                      (StgClosure *)spark));
448             }
449         }
450     }
451 #endif // SMP
452
453     scheduleStartSignalHandlers(cap);
454
455     // Only check the black holes here if we've nothing else to do.
456     // During normal execution, the black hole list only gets checked
457     // at GC time, to avoid repeatedly traversing this possibly long
458     // list each time around the scheduler.
459     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
460
461     scheduleCheckBlockedThreads(cap);
462
463     scheduleDetectDeadlock(cap,task);
464
465     // Normally, the only way we can get here with no threads to
466     // run is if a keyboard interrupt received during 
467     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
468     // Additionally, it is not fatal for the
469     // threaded RTS to reach here with no threads to run.
470     //
471     // win32: might be here due to awaitEvent() being abandoned
472     // as a result of a console event having been delivered.
473     if ( emptyRunQueue(cap) ) {
474 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
475         ASSERT(interrupted);
476 #endif
477         continue; // nothing to do
478     }
479
480 #if defined(PARALLEL_HASKELL)
481     scheduleSendPendingMessages();
482     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
483         continue;
484
485 #if defined(SPARKS)
486     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
487 #endif
488
489     /* If we still have no work we need to send a FISH to get a spark
490        from another PE */
491     if (emptyRunQueue(cap)) {
492         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
493         ASSERT(rtsFalse); // should not happen at the moment
494     }
495     // from here: non-empty run queue.
496     //  TODO: merge above case with this, only one call processMessages() !
497     if (PacketsWaiting()) {  /* process incoming messages, if
498                                 any pending...  only in else
499                                 because getRemoteWork waits for
500                                 messages as well */
501         receivedFinish = processMessages();
502     }
503 #endif
504
505 #if defined(GRAN)
506     scheduleProcessEvent(event);
507 #endif
508
509     // 
510     // Get a thread to run
511     //
512     t = popRunQueue(cap);
513
514 #if defined(GRAN) || defined(PAR)
515     scheduleGranParReport(); // some kind of debuging output
516 #else
517     // Sanity check the thread we're about to run.  This can be
518     // expensive if there is lots of thread switching going on...
519     IF_DEBUG(sanity,checkTSO(t));
520 #endif
521
522 #if defined(THREADED_RTS)
523     // Check whether we can run this thread in the current task.
524     // If not, we have to pass our capability to the right task.
525     {
526         Task *bound = t->bound;
527       
528         if (bound) {
529             if (bound == task) {
530                 IF_DEBUG(scheduler,
531                          sched_belch("### Running thread %d in bound thread",
532                                      t->id));
533                 // yes, the Haskell thread is bound to the current native thread
534             } else {
535                 IF_DEBUG(scheduler,
536                          sched_belch("### thread %d bound to another OS thread",
537                                      t->id));
538                 // no, bound to a different Haskell thread: pass to that thread
539                 pushOnRunQueue(cap,t);
540                 continue;
541             }
542         } else {
543             // The thread we want to run is unbound.
544             if (task->tso) { 
545                 IF_DEBUG(scheduler,
546                          sched_belch("### this OS thread cannot run thread %d", t->id));
547                 // no, the current native thread is bound to a different
548                 // Haskell thread, so pass it to any worker thread
549                 pushOnRunQueue(cap,t);
550                 continue; 
551             }
552         }
553     }
554 #endif
555
556     cap->r.rCurrentTSO = t;
557     
558     /* context switches are initiated by the timer signal, unless
559      * the user specified "context switch as often as possible", with
560      * +RTS -C0
561      */
562     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
563         && !emptyThreadQueues(cap)) {
564         context_switch = 1;
565     }
566          
567 run_thread:
568
569     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
570                               (long)t->id, whatNext_strs[t->what_next]));
571
572 #if defined(PROFILING)
573     startHeapProfTimer();
574 #endif
575
576     // ----------------------------------------------------------------------
577     // Run the current thread 
578
579     prev_what_next = t->what_next;
580
581     errno = t->saved_errno;
582     cap->in_haskell = rtsTrue;
583
584     recent_activity = ACTIVITY_YES;
585
586     switch (prev_what_next) {
587         
588     case ThreadKilled:
589     case ThreadComplete:
590         /* Thread already finished, return to scheduler. */
591         ret = ThreadFinished;
592         break;
593         
594     case ThreadRunGHC:
595     {
596         StgRegTable *r;
597         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
598         cap = regTableToCapability(r);
599         ret = r->rRet;
600         break;
601     }
602     
603     case ThreadInterpret:
604         cap = interpretBCO(cap);
605         ret = cap->r.rRet;
606         break;
607         
608     default:
609         barf("schedule: invalid what_next field");
610     }
611
612     cap->in_haskell = rtsFalse;
613
614     // The TSO might have moved, eg. if it re-entered the RTS and a GC
615     // happened.  So find the new location:
616     t = cap->r.rCurrentTSO;
617
618     // We have run some Haskell code: there might be blackhole-blocked
619     // threads to wake up now.
620     // Lock-free test here should be ok, we're just setting a flag.
621     if ( blackhole_queue != END_TSO_QUEUE ) {
622         blackholes_need_checking = rtsTrue;
623     }
624     
625     // And save the current errno in this thread.
626     // XXX: possibly bogus for SMP because this thread might already
627     // be running again, see code below.
628     t->saved_errno = errno;
629
630 #ifdef SMP
631     // If ret is ThreadBlocked, and this Task is bound to the TSO that
632     // blocked, we are in limbo - the TSO is now owned by whatever it
633     // is blocked on, and may in fact already have been woken up,
634     // perhaps even on a different Capability.  It may be the case
635     // that task->cap != cap.  We better yield this Capability
636     // immediately and return to normaility.
637     if (ret == ThreadBlocked) {
638         IF_DEBUG(scheduler,
639                  debugBelch("--<< thread %d (%s) stopped: blocked\n",
640                             t->id, whatNext_strs[t->what_next]));
641         continue;
642     }
643 #endif
644
645     ASSERT_CAPABILITY_INVARIANTS(cap,task);
646
647     // ----------------------------------------------------------------------
648     
649     // Costs for the scheduler are assigned to CCS_SYSTEM
650 #if defined(PROFILING)
651     stopHeapProfTimer();
652     CCCS = CCS_SYSTEM;
653 #endif
654     
655 #if defined(THREADED_RTS)
656     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
657 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
658     IF_DEBUG(scheduler,debugBelch("sched: "););
659 #endif
660     
661     schedulePostRunThread();
662
663     ready_to_gc = rtsFalse;
664
665     switch (ret) {
666     case HeapOverflow:
667         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
668         break;
669
670     case StackOverflow:
671         scheduleHandleStackOverflow(cap,task,t);
672         break;
673
674     case ThreadYielding:
675         if (scheduleHandleYield(cap, t, prev_what_next)) {
676             // shortcut for switching between compiler/interpreter:
677             goto run_thread; 
678         }
679         break;
680
681     case ThreadBlocked:
682         scheduleHandleThreadBlocked(t);
683         break;
684
685     case ThreadFinished:
686         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
687         ASSERT_CAPABILITY_INVARIANTS(cap,task);
688         break;
689
690     default:
691       barf("schedule: invalid thread return code %d", (int)ret);
692     }
693
694     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
695     if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
696   } /* end of while() */
697
698   IF_PAR_DEBUG(verbose,
699                debugBelch("== Leaving schedule() after having received Finish\n"));
700 }
701
702 /* ----------------------------------------------------------------------------
703  * Setting up the scheduler loop
704  * ------------------------------------------------------------------------- */
705
706 static void
707 schedulePreLoop(void)
708 {
709 #if defined(GRAN) 
710     /* set up first event to get things going */
711     /* ToDo: assign costs for system setup and init MainTSO ! */
712     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
713               ContinueThread, 
714               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
715     
716     IF_DEBUG(gran,
717              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
718                         CurrentTSO);
719              G_TSO(CurrentTSO, 5));
720     
721     if (RtsFlags.GranFlags.Light) {
722         /* Save current time; GranSim Light only */
723         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
724     }      
725 #endif
726 }
727
728 /* -----------------------------------------------------------------------------
729  * schedulePushWork()
730  *
731  * Push work to other Capabilities if we have some.
732  * -------------------------------------------------------------------------- */
733
734 #ifdef SMP
735 static void
736 schedulePushWork(Capability *cap USED_WHEN_SMP, 
737                  Task *task      USED_WHEN_SMP)
738 {
739     Capability *free_caps[n_capabilities], *cap0;
740     nat i, n_free_caps;
741
742     // Check whether we have more threads on our run queue that we
743     // could hand to another Capability.
744     if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) {
745         return;
746     }
747
748     // First grab as many free Capabilities as we can.
749     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
750         cap0 = &capabilities[i];
751         if (cap != cap0 && tryGrabCapability(cap0,task)) {
752             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
753                 // it already has some work, we just grabbed it at 
754                 // the wrong moment.  Or maybe it's deadlocked!
755                 releaseCapability(cap0);
756             } else {
757                 free_caps[n_free_caps++] = cap0;
758             }
759         }
760     }
761
762     // we now have n_free_caps free capabilities stashed in
763     // free_caps[].  Share our run queue equally with them.  This is
764     // probably the simplest thing we could do; improvements we might
765     // want to do include:
766     //
767     //   - giving high priority to moving relatively new threads, on 
768     //     the gournds that they haven't had time to build up a
769     //     working set in the cache on this CPU/Capability.
770     //
771     //   - giving low priority to moving long-lived threads
772
773     if (n_free_caps > 0) {
774         StgTSO *prev, *t, *next;
775         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
776
777         prev = cap->run_queue_hd;
778         t = prev->link;
779         prev->link = END_TSO_QUEUE;
780         i = 0;
781         for (; t != END_TSO_QUEUE; t = next) {
782             next = t->link;
783             t->link = END_TSO_QUEUE;
784             if (t->what_next == ThreadRelocated
785                 || t->bound == task) { // don't move my bound thread
786                 prev->link = t;
787                 prev = t;
788             } else if (i == n_free_caps) {
789                 i = 0;
790                 // keep one for us
791                 prev->link = t;
792                 prev = t;
793             } else {
794                 appendToRunQueue(free_caps[i],t);
795                 if (t->bound) { t->bound->cap = free_caps[i]; }
796                 i++;
797             }
798         }
799         cap->run_queue_tl = prev;
800
801         // release the capabilities
802         for (i = 0; i < n_free_caps; i++) {
803             task->cap = free_caps[i];
804             releaseCapability(free_caps[i]);
805         }
806     }
807     task->cap = cap; // reset to point to our Capability.
808 }
809 #endif
810
811 /* ----------------------------------------------------------------------------
812  * Start any pending signal handlers
813  * ------------------------------------------------------------------------- */
814
815 static void
816 scheduleStartSignalHandlers(Capability *cap)
817 {
818 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
819     if (signals_pending()) { // safe outside the lock
820         startSignalHandlers(cap);
821     }
822 #endif
823 }
824
825 /* ----------------------------------------------------------------------------
826  * Check for blocked threads that can be woken up.
827  * ------------------------------------------------------------------------- */
828
829 static void
830 scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
831 {
832 #if !defined(THREADED_RTS)
833     //
834     // Check whether any waiting threads need to be woken up.  If the
835     // run queue is empty, and there are no other tasks running, we
836     // can wait indefinitely for something to happen.
837     //
838     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
839     {
840         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
841     }
842 #endif
843 }
844
845
846 /* ----------------------------------------------------------------------------
847  * Check for threads blocked on BLACKHOLEs that can be woken up
848  * ------------------------------------------------------------------------- */
849 static void
850 scheduleCheckBlackHoles (Capability *cap)
851 {
852     if ( blackholes_need_checking ) // check without the lock first
853     {
854         ACQUIRE_LOCK(&sched_mutex);
855         if ( blackholes_need_checking ) {
856             checkBlackHoles(cap);
857             blackholes_need_checking = rtsFalse;
858         }
859         RELEASE_LOCK(&sched_mutex);
860     }
861 }
862
863 /* ----------------------------------------------------------------------------
864  * Detect deadlock conditions and attempt to resolve them.
865  * ------------------------------------------------------------------------- */
866
867 static void
868 scheduleDetectDeadlock (Capability *cap, Task *task)
869 {
870
871 #if defined(PARALLEL_HASKELL)
872     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
873     return;
874 #endif
875
876     /* 
877      * Detect deadlock: when we have no threads to run, there are no
878      * threads blocked, waiting for I/O, or sleeping, and all the
879      * other tasks are waiting for work, we must have a deadlock of
880      * some description.
881      */
882     if ( emptyThreadQueues(cap) )
883     {
884 #if defined(THREADED_RTS)
885         /* 
886          * In the threaded RTS, we only check for deadlock if there
887          * has been no activity in a complete timeslice.  This means
888          * we won't eagerly start a full GC just because we don't have
889          * any threads to run currently.
890          */
891         if (recent_activity != ACTIVITY_INACTIVE) return;
892 #endif
893
894         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
895
896         // Garbage collection can release some new threads due to
897         // either (a) finalizers or (b) threads resurrected because
898         // they are unreachable and will therefore be sent an
899         // exception.  Any threads thus released will be immediately
900         // runnable.
901         scheduleDoGC( cap, task, rtsTrue/*force  major GC*/ );
902         recent_activity = ACTIVITY_DONE_GC;
903         
904         if ( !emptyRunQueue(cap) ) return;
905
906 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
907         /* If we have user-installed signal handlers, then wait
908          * for signals to arrive rather then bombing out with a
909          * deadlock.
910          */
911         if ( anyUserHandlers() ) {
912             IF_DEBUG(scheduler, 
913                      sched_belch("still deadlocked, waiting for signals..."));
914
915             awaitUserSignals();
916
917             if (signals_pending()) {
918                 startSignalHandlers(cap);
919             }
920
921             // either we have threads to run, or we were interrupted:
922             ASSERT(!emptyRunQueue(cap) || interrupted);
923         }
924 #endif
925
926 #if !defined(THREADED_RTS)
927         /* Probably a real deadlock.  Send the current main thread the
928          * Deadlock exception.
929          */
930         if (task->tso) {
931             switch (task->tso->why_blocked) {
932             case BlockedOnSTM:
933             case BlockedOnBlackHole:
934             case BlockedOnException:
935             case BlockedOnMVar:
936                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
937                 return;
938             default:
939                 barf("deadlock: main thread blocked in a strange way");
940             }
941         }
942         return;
943 #endif
944     }
945 }
946
947 /* ----------------------------------------------------------------------------
948  * Process an event (GRAN only)
949  * ------------------------------------------------------------------------- */
950
951 #if defined(GRAN)
952 static StgTSO *
953 scheduleProcessEvent(rtsEvent *event)
954 {
955     StgTSO *t;
956
957     if (RtsFlags.GranFlags.Light)
958       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
959
960     /* adjust time based on time-stamp */
961     if (event->time > CurrentTime[CurrentProc] &&
962         event->evttype != ContinueThread)
963       CurrentTime[CurrentProc] = event->time;
964     
965     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
966     if (!RtsFlags.GranFlags.Light)
967       handleIdlePEs();
968
969     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
970
971     /* main event dispatcher in GranSim */
972     switch (event->evttype) {
973       /* Should just be continuing execution */
974     case ContinueThread:
975       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
976       /* ToDo: check assertion
977       ASSERT(run_queue_hd != (StgTSO*)NULL &&
978              run_queue_hd != END_TSO_QUEUE);
979       */
980       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
981       if (!RtsFlags.GranFlags.DoAsyncFetch &&
982           procStatus[CurrentProc]==Fetching) {
983         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
984               CurrentTSO->id, CurrentTSO, CurrentProc);
985         goto next_thread;
986       } 
987       /* Ignore ContinueThreads for completed threads */
988       if (CurrentTSO->what_next == ThreadComplete) {
989         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
990               CurrentTSO->id, CurrentTSO, CurrentProc);
991         goto next_thread;
992       } 
993       /* Ignore ContinueThreads for threads that are being migrated */
994       if (PROCS(CurrentTSO)==Nowhere) { 
995         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
996               CurrentTSO->id, CurrentTSO, CurrentProc);
997         goto next_thread;
998       }
999       /* The thread should be at the beginning of the run queue */
1000       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1001         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1002               CurrentTSO->id, CurrentTSO, CurrentProc);
1003         break; // run the thread anyway
1004       }
1005       /*
1006       new_event(proc, proc, CurrentTime[proc],
1007                 FindWork,
1008                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1009       goto next_thread; 
1010       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1011       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1012
1013     case FetchNode:
1014       do_the_fetchnode(event);
1015       goto next_thread;             /* handle next event in event queue  */
1016       
1017     case GlobalBlock:
1018       do_the_globalblock(event);
1019       goto next_thread;             /* handle next event in event queue  */
1020       
1021     case FetchReply:
1022       do_the_fetchreply(event);
1023       goto next_thread;             /* handle next event in event queue  */
1024       
1025     case UnblockThread:   /* Move from the blocked queue to the tail of */
1026       do_the_unblock(event);
1027       goto next_thread;             /* handle next event in event queue  */
1028       
1029     case ResumeThread:  /* Move from the blocked queue to the tail of */
1030       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1031       event->tso->gran.blocktime += 
1032         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1033       do_the_startthread(event);
1034       goto next_thread;             /* handle next event in event queue  */
1035       
1036     case StartThread:
1037       do_the_startthread(event);
1038       goto next_thread;             /* handle next event in event queue  */
1039       
1040     case MoveThread:
1041       do_the_movethread(event);
1042       goto next_thread;             /* handle next event in event queue  */
1043       
1044     case MoveSpark:
1045       do_the_movespark(event);
1046       goto next_thread;             /* handle next event in event queue  */
1047       
1048     case FindWork:
1049       do_the_findwork(event);
1050       goto next_thread;             /* handle next event in event queue  */
1051       
1052     default:
1053       barf("Illegal event type %u\n", event->evttype);
1054     }  /* switch */
1055     
1056     /* This point was scheduler_loop in the old RTS */
1057
1058     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1059
1060     TimeOfLastEvent = CurrentTime[CurrentProc];
1061     TimeOfNextEvent = get_time_of_next_event();
1062     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1063     // CurrentTSO = ThreadQueueHd;
1064
1065     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1066                          TimeOfNextEvent));
1067
1068     if (RtsFlags.GranFlags.Light) 
1069       GranSimLight_leave_system(event, &ActiveTSO); 
1070
1071     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1072
1073     IF_DEBUG(gran, 
1074              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1075
1076     /* in a GranSim setup the TSO stays on the run queue */
1077     t = CurrentTSO;
1078     /* Take a thread from the run queue. */
1079     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1080
1081     IF_DEBUG(gran, 
1082              debugBelch("GRAN: About to run current thread, which is\n");
1083              G_TSO(t,5));
1084
1085     context_switch = 0; // turned on via GranYield, checking events and time slice
1086
1087     IF_DEBUG(gran, 
1088              DumpGranEvent(GR_SCHEDULE, t));
1089
1090     procStatus[CurrentProc] = Busy;
1091 }
1092 #endif // GRAN
1093
1094 /* ----------------------------------------------------------------------------
1095  * Send pending messages (PARALLEL_HASKELL only)
1096  * ------------------------------------------------------------------------- */
1097
1098 #if defined(PARALLEL_HASKELL)
1099 static StgTSO *
1100 scheduleSendPendingMessages(void)
1101 {
1102     StgSparkPool *pool;
1103     rtsSpark spark;
1104     StgTSO *t;
1105
1106 # if defined(PAR) // global Mem.Mgmt., omit for now
1107     if (PendingFetches != END_BF_QUEUE) {
1108         processFetches();
1109     }
1110 # endif
1111     
1112     if (RtsFlags.ParFlags.BufferTime) {
1113         // if we use message buffering, we must send away all message
1114         // packets which have become too old...
1115         sendOldBuffers(); 
1116     }
1117 }
1118 #endif
1119
1120 /* ----------------------------------------------------------------------------
1121  * Activate spark threads (PARALLEL_HASKELL only)
1122  * ------------------------------------------------------------------------- */
1123
1124 #if defined(PARALLEL_HASKELL)
1125 static void
1126 scheduleActivateSpark(void)
1127 {
1128 #if defined(SPARKS)
1129   ASSERT(emptyRunQueue());
1130 /* We get here if the run queue is empty and want some work.
1131    We try to turn a spark into a thread, and add it to the run queue,
1132    from where it will be picked up in the next iteration of the scheduler
1133    loop.
1134 */
1135
1136       /* :-[  no local threads => look out for local sparks */
1137       /* the spark pool for the current PE */
1138       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1139       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1140           pool->hd < pool->tl) {
1141         /* 
1142          * ToDo: add GC code check that we really have enough heap afterwards!!
1143          * Old comment:
1144          * If we're here (no runnable threads) and we have pending
1145          * sparks, we must have a space problem.  Get enough space
1146          * to turn one of those pending sparks into a
1147          * thread... 
1148          */
1149
1150         spark = findSpark(rtsFalse);            /* get a spark */
1151         if (spark != (rtsSpark) NULL) {
1152           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1153           IF_PAR_DEBUG(fish, // schedule,
1154                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1155                              tso->id, tso, advisory_thread_count));
1156
1157           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1158             IF_PAR_DEBUG(fish, // schedule,
1159                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1160                             spark));
1161             return rtsFalse; /* failed to generate a thread */
1162           }                  /* otherwise fall through & pick-up new tso */
1163         } else {
1164           IF_PAR_DEBUG(fish, // schedule,
1165                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1166                              spark_queue_len(pool)));
1167           return rtsFalse;  /* failed to generate a thread */
1168         }
1169         return rtsTrue;  /* success in generating a thread */
1170   } else { /* no more threads permitted or pool empty */
1171     return rtsFalse;  /* failed to generateThread */
1172   }
1173 #else
1174   tso = NULL; // avoid compiler warning only
1175   return rtsFalse;  /* dummy in non-PAR setup */
1176 #endif // SPARKS
1177 }
1178 #endif // PARALLEL_HASKELL
1179
1180 /* ----------------------------------------------------------------------------
1181  * Get work from a remote node (PARALLEL_HASKELL only)
1182  * ------------------------------------------------------------------------- */
1183     
1184 #if defined(PARALLEL_HASKELL)
1185 static rtsBool
1186 scheduleGetRemoteWork(rtsBool *receivedFinish)
1187 {
1188   ASSERT(emptyRunQueue());
1189
1190   if (RtsFlags.ParFlags.BufferTime) {
1191         IF_PAR_DEBUG(verbose, 
1192                 debugBelch("...send all pending data,"));
1193         {
1194           nat i;
1195           for (i=1; i<=nPEs; i++)
1196             sendImmediately(i); // send all messages away immediately
1197         }
1198   }
1199 # ifndef SPARKS
1200         //++EDEN++ idle() , i.e. send all buffers, wait for work
1201         // suppress fishing in EDEN... just look for incoming messages
1202         // (blocking receive)
1203   IF_PAR_DEBUG(verbose, 
1204                debugBelch("...wait for incoming messages...\n"));
1205   *receivedFinish = processMessages(); // blocking receive...
1206
1207         // and reenter scheduling loop after having received something
1208         // (return rtsFalse below)
1209
1210 # else /* activate SPARKS machinery */
1211 /* We get here, if we have no work, tried to activate a local spark, but still
1212    have no work. We try to get a remote spark, by sending a FISH message.
1213    Thread migration should be added here, and triggered when a sequence of 
1214    fishes returns without work. */
1215         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1216
1217       /* =8-[  no local sparks => look for work on other PEs */
1218         /*
1219          * We really have absolutely no work.  Send out a fish
1220          * (there may be some out there already), and wait for
1221          * something to arrive.  We clearly can't run any threads
1222          * until a SCHEDULE or RESUME arrives, and so that's what
1223          * we're hoping to see.  (Of course, we still have to
1224          * respond to other types of messages.)
1225          */
1226         rtsTime now = msTime() /*CURRENT_TIME*/;
1227         IF_PAR_DEBUG(verbose, 
1228                      debugBelch("--  now=%ld\n", now));
1229         IF_PAR_DEBUG(fish, // verbose,
1230              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1231                  (last_fish_arrived_at!=0 &&
1232                   last_fish_arrived_at+delay > now)) {
1233                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1234                      now, last_fish_arrived_at+delay, 
1235                      last_fish_arrived_at,
1236                      delay);
1237              });
1238   
1239         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1240             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1241           if (last_fish_arrived_at==0 ||
1242               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1243             /* outstandingFishes is set in sendFish, processFish;
1244                avoid flooding system with fishes via delay */
1245     next_fish_to_send_at = 0;  
1246   } else {
1247     /* ToDo: this should be done in the main scheduling loop to avoid the
1248              busy wait here; not so bad if fish delay is very small  */
1249     int iq = 0; // DEBUGGING -- HWL
1250     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1251     /* send a fish when ready, but process messages that arrive in the meantime */
1252     do {
1253       if (PacketsWaiting()) {
1254         iq++; // DEBUGGING
1255         *receivedFinish = processMessages();
1256       }
1257       now = msTime();
1258     } while (!*receivedFinish || now<next_fish_to_send_at);
1259     // JB: This means the fish could become obsolete, if we receive
1260     // work. Better check for work again? 
1261     // last line: while (!receivedFinish || !haveWork || now<...)
1262     // next line: if (receivedFinish || haveWork )
1263
1264     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1265       return rtsFalse;  // NB: this will leave scheduler loop
1266                         // immediately after return!
1267                           
1268     IF_PAR_DEBUG(fish, // verbose,
1269                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1270
1271   }
1272
1273     // JB: IMHO, this should all be hidden inside sendFish(...)
1274     /* pe = choosePE(); 
1275        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1276                 NEW_FISH_HUNGER);
1277
1278     // Global statistics: count no. of fishes
1279     if (RtsFlags.ParFlags.ParStats.Global &&
1280          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1281            globalParStats.tot_fish_mess++;
1282            }
1283     */ 
1284
1285   /* delayed fishes must have been sent by now! */
1286   next_fish_to_send_at = 0;  
1287   }
1288       
1289   *receivedFinish = processMessages();
1290 # endif /* SPARKS */
1291
1292  return rtsFalse;
1293  /* NB: this function always returns rtsFalse, meaning the scheduler
1294     loop continues with the next iteration; 
1295     rationale: 
1296       return code means success in finding work; we enter this function
1297       if there is no local work, thus have to send a fish which takes
1298       time until it arrives with work; in the meantime we should process
1299       messages in the main loop;
1300  */
1301 }
1302 #endif // PARALLEL_HASKELL
1303
1304 /* ----------------------------------------------------------------------------
1305  * PAR/GRAN: Report stats & debugging info(?)
1306  * ------------------------------------------------------------------------- */
1307
1308 #if defined(PAR) || defined(GRAN)
1309 static void
1310 scheduleGranParReport(void)
1311 {
1312   ASSERT(run_queue_hd != END_TSO_QUEUE);
1313
1314   /* Take a thread from the run queue, if we have work */
1315   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1316
1317     /* If this TSO has got its outport closed in the meantime, 
1318      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1319      * It has to be marked as TH_DEAD for this purpose.
1320      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1321
1322 JB: TODO: investigate wether state change field could be nuked
1323      entirely and replaced by the normal tso state (whatnext
1324      field). All we want to do is to kill tsos from outside.
1325      */
1326
1327     /* ToDo: write something to the log-file
1328     if (RTSflags.ParFlags.granSimStats && !sameThread)
1329         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1330
1331     CurrentTSO = t;
1332     */
1333     /* the spark pool for the current PE */
1334     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1335
1336     IF_DEBUG(scheduler, 
1337              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1338                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1339
1340     IF_PAR_DEBUG(fish,
1341              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1342                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1343
1344     if (RtsFlags.ParFlags.ParStats.Full && 
1345         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1346         (emitSchedule || // forced emit
1347          (t && LastTSO && t->id != LastTSO->id))) {
1348       /* 
1349          we are running a different TSO, so write a schedule event to log file
1350          NB: If we use fair scheduling we also have to write  a deschedule 
1351              event for LastTSO; with unfair scheduling we know that the
1352              previous tso has blocked whenever we switch to another tso, so
1353              we don't need it in GUM for now
1354       */
1355       IF_PAR_DEBUG(fish, // schedule,
1356                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1357
1358       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1359                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1360       emitSchedule = rtsFalse;
1361     }
1362 }     
1363 #endif
1364
1365 /* ----------------------------------------------------------------------------
1366  * After running a thread...
1367  * ------------------------------------------------------------------------- */
1368
1369 static void
1370 schedulePostRunThread(void)
1371 {
1372 #if defined(PAR)
1373     /* HACK 675: if the last thread didn't yield, make sure to print a 
1374        SCHEDULE event to the log file when StgRunning the next thread, even
1375        if it is the same one as before */
1376     LastTSO = t; 
1377     TimeOfLastYield = CURRENT_TIME;
1378 #endif
1379
1380   /* some statistics gathering in the parallel case */
1381
1382 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1383   switch (ret) {
1384     case HeapOverflow:
1385 # if defined(GRAN)
1386       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1387       globalGranStats.tot_heapover++;
1388 # elif defined(PAR)
1389       globalParStats.tot_heapover++;
1390 # endif
1391       break;
1392
1393      case StackOverflow:
1394 # if defined(GRAN)
1395       IF_DEBUG(gran, 
1396                DumpGranEvent(GR_DESCHEDULE, t));
1397       globalGranStats.tot_stackover++;
1398 # elif defined(PAR)
1399       // IF_DEBUG(par, 
1400       // DumpGranEvent(GR_DESCHEDULE, t);
1401       globalParStats.tot_stackover++;
1402 # endif
1403       break;
1404
1405     case ThreadYielding:
1406 # if defined(GRAN)
1407       IF_DEBUG(gran, 
1408                DumpGranEvent(GR_DESCHEDULE, t));
1409       globalGranStats.tot_yields++;
1410 # elif defined(PAR)
1411       // IF_DEBUG(par, 
1412       // DumpGranEvent(GR_DESCHEDULE, t);
1413       globalParStats.tot_yields++;
1414 # endif
1415       break; 
1416
1417     case ThreadBlocked:
1418 # if defined(GRAN)
1419       IF_DEBUG(scheduler,
1420                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1421                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1422                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1423                if (t->block_info.closure!=(StgClosure*)NULL)
1424                  print_bq(t->block_info.closure);
1425                debugBelch("\n"));
1426
1427       // ??? needed; should emit block before
1428       IF_DEBUG(gran, 
1429                DumpGranEvent(GR_DESCHEDULE, t)); 
1430       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1431       /*
1432         ngoq Dogh!
1433       ASSERT(procStatus[CurrentProc]==Busy || 
1434               ((procStatus[CurrentProc]==Fetching) && 
1435               (t->block_info.closure!=(StgClosure*)NULL)));
1436       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1437           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1438             procStatus[CurrentProc]==Fetching)) 
1439         procStatus[CurrentProc] = Idle;
1440       */
1441 # elif defined(PAR)
1442 //++PAR++  blockThread() writes the event (change?)
1443 # endif
1444     break;
1445
1446   case ThreadFinished:
1447     break;
1448
1449   default:
1450     barf("parGlobalStats: unknown return code");
1451     break;
1452     }
1453 #endif
1454 }
1455
1456 /* -----------------------------------------------------------------------------
1457  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1458  * -------------------------------------------------------------------------- */
1459
1460 static rtsBool
1461 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1462 {
1463     // did the task ask for a large block?
1464     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1465         // if so, get one and push it on the front of the nursery.
1466         bdescr *bd;
1467         lnat blocks;
1468         
1469         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1470         
1471         IF_DEBUG(scheduler,
1472                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1473                             (long)t->id, whatNext_strs[t->what_next], blocks));
1474         
1475         // don't do this if the nursery is (nearly) full, we'll GC first.
1476         if (cap->r.rCurrentNursery->link != NULL ||
1477             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1478                                                // if the nursery has only one block.
1479             
1480             ACQUIRE_SM_LOCK
1481             bd = allocGroup( blocks );
1482             RELEASE_SM_LOCK
1483             cap->r.rNursery->n_blocks += blocks;
1484             
1485             // link the new group into the list
1486             bd->link = cap->r.rCurrentNursery;
1487             bd->u.back = cap->r.rCurrentNursery->u.back;
1488             if (cap->r.rCurrentNursery->u.back != NULL) {
1489                 cap->r.rCurrentNursery->u.back->link = bd;
1490             } else {
1491 #if !defined(SMP)
1492                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1493                        g0s0 == cap->r.rNursery);
1494 #endif
1495                 cap->r.rNursery->blocks = bd;
1496             }             
1497             cap->r.rCurrentNursery->u.back = bd;
1498             
1499             // initialise it as a nursery block.  We initialise the
1500             // step, gen_no, and flags field of *every* sub-block in
1501             // this large block, because this is easier than making
1502             // sure that we always find the block head of a large
1503             // block whenever we call Bdescr() (eg. evacuate() and
1504             // isAlive() in the GC would both have to do this, at
1505             // least).
1506             { 
1507                 bdescr *x;
1508                 for (x = bd; x < bd + blocks; x++) {
1509                     x->step = cap->r.rNursery;
1510                     x->gen_no = 0;
1511                     x->flags = 0;
1512                 }
1513             }
1514             
1515             // This assert can be a killer if the app is doing lots
1516             // of large block allocations.
1517             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1518             
1519             // now update the nursery to point to the new block
1520             cap->r.rCurrentNursery = bd;
1521             
1522             // we might be unlucky and have another thread get on the
1523             // run queue before us and steal the large block, but in that
1524             // case the thread will just end up requesting another large
1525             // block.
1526             pushOnRunQueue(cap,t);
1527             return rtsFalse;  /* not actually GC'ing */
1528         }
1529     }
1530     
1531     IF_DEBUG(scheduler,
1532              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1533                         (long)t->id, whatNext_strs[t->what_next]));
1534 #if defined(GRAN)
1535     ASSERT(!is_on_queue(t,CurrentProc));
1536 #elif defined(PARALLEL_HASKELL)
1537     /* Currently we emit a DESCHEDULE event before GC in GUM.
1538        ToDo: either add separate event to distinguish SYSTEM time from rest
1539        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1540     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1541         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1542                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1543         emitSchedule = rtsTrue;
1544     }
1545 #endif
1546       
1547     pushOnRunQueue(cap,t);
1548     return rtsTrue;
1549     /* actual GC is done at the end of the while loop in schedule() */
1550 }
1551
1552 /* -----------------------------------------------------------------------------
1553  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1554  * -------------------------------------------------------------------------- */
1555
1556 static void
1557 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1558 {
1559     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1560                                   (long)t->id, whatNext_strs[t->what_next]));
1561     /* just adjust the stack for this thread, then pop it back
1562      * on the run queue.
1563      */
1564     { 
1565         /* enlarge the stack */
1566         StgTSO *new_t = threadStackOverflow(cap, t);
1567         
1568         /* The TSO attached to this Task may have moved, so update the
1569          * pointer to it.
1570          */
1571         if (task->tso == t) {
1572             task->tso = new_t;
1573         }
1574         pushOnRunQueue(cap,new_t);
1575     }
1576 }
1577
1578 /* -----------------------------------------------------------------------------
1579  * Handle a thread that returned to the scheduler with ThreadYielding
1580  * -------------------------------------------------------------------------- */
1581
1582 static rtsBool
1583 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1584 {
1585     // Reset the context switch flag.  We don't do this just before
1586     // running the thread, because that would mean we would lose ticks
1587     // during GC, which can lead to unfair scheduling (a thread hogs
1588     // the CPU because the tick always arrives during GC).  This way
1589     // penalises threads that do a lot of allocation, but that seems
1590     // better than the alternative.
1591     context_switch = 0;
1592     
1593     /* put the thread back on the run queue.  Then, if we're ready to
1594      * GC, check whether this is the last task to stop.  If so, wake
1595      * up the GC thread.  getThread will block during a GC until the
1596      * GC is finished.
1597      */
1598     IF_DEBUG(scheduler,
1599              if (t->what_next != prev_what_next) {
1600                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1601                             (long)t->id, whatNext_strs[t->what_next]);
1602              } else {
1603                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1604                             (long)t->id, whatNext_strs[t->what_next]);
1605              }
1606         );
1607     
1608     IF_DEBUG(sanity,
1609              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1610              checkTSO(t));
1611     ASSERT(t->link == END_TSO_QUEUE);
1612     
1613     // Shortcut if we're just switching evaluators: don't bother
1614     // doing stack squeezing (which can be expensive), just run the
1615     // thread.
1616     if (t->what_next != prev_what_next) {
1617         return rtsTrue;
1618     }
1619     
1620 #if defined(GRAN)
1621     ASSERT(!is_on_queue(t,CurrentProc));
1622       
1623     IF_DEBUG(sanity,
1624              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1625              checkThreadQsSanity(rtsTrue));
1626
1627 #endif
1628
1629     addToRunQueue(cap,t);
1630
1631 #if defined(GRAN)
1632     /* add a ContinueThread event to actually process the thread */
1633     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1634               ContinueThread,
1635               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1636     IF_GRAN_DEBUG(bq, 
1637                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1638                   G_EVENTQ(0);
1639                   G_CURR_THREADQ(0));
1640 #endif
1641     return rtsFalse;
1642 }
1643
1644 /* -----------------------------------------------------------------------------
1645  * Handle a thread that returned to the scheduler with ThreadBlocked
1646  * -------------------------------------------------------------------------- */
1647
1648 static void
1649 scheduleHandleThreadBlocked( StgTSO *t
1650 #if !defined(GRAN) && !defined(DEBUG)
1651     STG_UNUSED
1652 #endif
1653     )
1654 {
1655 #if defined(GRAN)
1656     IF_DEBUG(scheduler,
1657              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1658                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1659              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1660     
1661     // ??? needed; should emit block before
1662     IF_DEBUG(gran, 
1663              DumpGranEvent(GR_DESCHEDULE, t)); 
1664     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1665     /*
1666       ngoq Dogh!
1667       ASSERT(procStatus[CurrentProc]==Busy || 
1668       ((procStatus[CurrentProc]==Fetching) && 
1669       (t->block_info.closure!=(StgClosure*)NULL)));
1670       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1671       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1672       procStatus[CurrentProc]==Fetching)) 
1673       procStatus[CurrentProc] = Idle;
1674     */
1675 #elif defined(PAR)
1676     IF_DEBUG(scheduler,
1677              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1678                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1679     IF_PAR_DEBUG(bq,
1680                  
1681                  if (t->block_info.closure!=(StgClosure*)NULL) 
1682                  print_bq(t->block_info.closure));
1683     
1684     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1685     blockThread(t);
1686     
1687     /* whatever we schedule next, we must log that schedule */
1688     emitSchedule = rtsTrue;
1689     
1690 #else /* !GRAN */
1691
1692       // We don't need to do anything.  The thread is blocked, and it
1693       // has tidied up its stack and placed itself on whatever queue
1694       // it needs to be on.
1695
1696 #if !defined(SMP)
1697     ASSERT(t->why_blocked != NotBlocked);
1698              // This might not be true under SMP: we don't have
1699              // exclusive access to this TSO, so someone might have
1700              // woken it up by now.  This actually happens: try
1701              // conc023 +RTS -N2.
1702 #endif
1703
1704     IF_DEBUG(scheduler,
1705              debugBelch("--<< thread %d (%s) stopped: ", 
1706                         t->id, whatNext_strs[t->what_next]);
1707              printThreadBlockage(t);
1708              debugBelch("\n"));
1709     
1710     /* Only for dumping event to log file 
1711        ToDo: do I need this in GranSim, too?
1712        blockThread(t);
1713     */
1714 #endif
1715 }
1716
1717 /* -----------------------------------------------------------------------------
1718  * Handle a thread that returned to the scheduler with ThreadFinished
1719  * -------------------------------------------------------------------------- */
1720
1721 static rtsBool
1722 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1723 {
1724     /* Need to check whether this was a main thread, and if so,
1725      * return with the return value.
1726      *
1727      * We also end up here if the thread kills itself with an
1728      * uncaught exception, see Exception.cmm.
1729      */
1730     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1731                                   t->id, whatNext_strs[t->what_next]));
1732
1733 #if defined(GRAN)
1734       endThread(t, CurrentProc); // clean-up the thread
1735 #elif defined(PARALLEL_HASKELL)
1736       /* For now all are advisory -- HWL */
1737       //if(t->priority==AdvisoryPriority) ??
1738       advisory_thread_count--; // JB: Caution with this counter, buggy!
1739       
1740 # if defined(DIST)
1741       if(t->dist.priority==RevalPriority)
1742         FinishReval(t);
1743 # endif
1744     
1745 # if defined(EDENOLD)
1746       // the thread could still have an outport... (BUG)
1747       if (t->eden.outport != -1) {
1748       // delete the outport for the tso which has finished...
1749         IF_PAR_DEBUG(eden_ports,
1750                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1751                               t->eden.outport, t->id));
1752         deleteOPT(t);
1753       }
1754       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1755       if (t->eden.epid != -1) {
1756         IF_PAR_DEBUG(eden_ports,
1757                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1758                            t->id, t->eden.epid));
1759         removeTSOfromProcess(t);
1760       }
1761 # endif 
1762
1763 # if defined(PAR)
1764       if (RtsFlags.ParFlags.ParStats.Full &&
1765           !RtsFlags.ParFlags.ParStats.Suppressed) 
1766         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1767
1768       //  t->par only contains statistics: left out for now...
1769       IF_PAR_DEBUG(fish,
1770                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1771                               t->id,t,t->par.sparkname));
1772 # endif
1773 #endif // PARALLEL_HASKELL
1774
1775       //
1776       // Check whether the thread that just completed was a bound
1777       // thread, and if so return with the result.  
1778       //
1779       // There is an assumption here that all thread completion goes
1780       // through this point; we need to make sure that if a thread
1781       // ends up in the ThreadKilled state, that it stays on the run
1782       // queue so it can be dealt with here.
1783       //
1784
1785       if (t->bound) {
1786
1787           if (t->bound != task) {
1788 #if !defined(THREADED_RTS)
1789               // Must be a bound thread that is not the topmost one.  Leave
1790               // it on the run queue until the stack has unwound to the
1791               // point where we can deal with this.  Leaving it on the run
1792               // queue also ensures that the garbage collector knows about
1793               // this thread and its return value (it gets dropped from the
1794               // all_threads list so there's no other way to find it).
1795               appendToRunQueue(cap,t);
1796               return rtsFalse;
1797 #else
1798               // this cannot happen in the threaded RTS, because a
1799               // bound thread can only be run by the appropriate Task.
1800               barf("finished bound thread that isn't mine");
1801 #endif
1802           }
1803
1804           ASSERT(task->tso == t);
1805
1806           if (t->what_next == ThreadComplete) {
1807               if (task->ret) {
1808                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1809                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1810               }
1811               task->stat = Success;
1812           } else {
1813               if (task->ret) {
1814                   *(task->ret) = NULL;
1815               }
1816               if (interrupted) {
1817                   task->stat = Interrupted;
1818               } else {
1819                   task->stat = Killed;
1820               }
1821           }
1822 #ifdef DEBUG
1823           removeThreadLabel((StgWord)task->tso->id);
1824 #endif
1825           return rtsTrue; // tells schedule() to return
1826       }
1827
1828       return rtsFalse;
1829 }
1830
1831 /* -----------------------------------------------------------------------------
1832  * Perform a heap census, if PROFILING
1833  * -------------------------------------------------------------------------- */
1834
1835 static rtsBool
1836 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1837 {
1838 #if defined(PROFILING)
1839     // When we have +RTS -i0 and we're heap profiling, do a census at
1840     // every GC.  This lets us get repeatable runs for debugging.
1841     if (performHeapProfile ||
1842         (RtsFlags.ProfFlags.profileInterval==0 &&
1843          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1844         GarbageCollect(GetRoots, rtsTrue);
1845         heapCensus();
1846         performHeapProfile = rtsFalse;
1847         return rtsTrue;  // true <=> we already GC'd
1848     }
1849 #endif
1850     return rtsFalse;
1851 }
1852
1853 /* -----------------------------------------------------------------------------
1854  * Perform a garbage collection if necessary
1855  * -------------------------------------------------------------------------- */
1856
1857 static void
1858 scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
1859 {
1860     StgTSO *t;
1861 #ifdef SMP
1862     static volatile StgWord waiting_for_gc;
1863     rtsBool was_waiting;
1864     nat i;
1865 #endif
1866
1867 #ifdef SMP
1868     // In order to GC, there must be no threads running Haskell code.
1869     // Therefore, the GC thread needs to hold *all* the capabilities,
1870     // and release them after the GC has completed.  
1871     //
1872     // This seems to be the simplest way: previous attempts involved
1873     // making all the threads with capabilities give up their
1874     // capabilities and sleep except for the *last* one, which
1875     // actually did the GC.  But it's quite hard to arrange for all
1876     // the other tasks to sleep and stay asleep.
1877     //
1878         
1879     was_waiting = cas(&waiting_for_gc, 0, 1);
1880     if (was_waiting) {
1881         do {
1882             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1883             yieldCapability(&cap,task);
1884         } while (waiting_for_gc);
1885         return;
1886     }
1887
1888     for (i=0; i < n_capabilities; i++) {
1889         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1890         if (cap != &capabilities[i]) {
1891             Capability *pcap = &capabilities[i];
1892             // we better hope this task doesn't get migrated to
1893             // another Capability while we're waiting for this one.
1894             // It won't, because load balancing happens while we have
1895             // all the Capabilities, but even so it's a slightly
1896             // unsavoury invariant.
1897             task->cap = pcap;
1898             context_switch = 1;
1899             waitForReturnCapability(&pcap, task);
1900             if (pcap != &capabilities[i]) {
1901                 barf("scheduleDoGC: got the wrong capability");
1902             }
1903         }
1904     }
1905
1906     waiting_for_gc = rtsFalse;
1907 #endif
1908
1909     /* Kick any transactions which are invalid back to their
1910      * atomically frames.  When next scheduled they will try to
1911      * commit, this commit will fail and they will retry.
1912      */
1913     { 
1914         StgTSO *next;
1915
1916         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1917             if (t->what_next == ThreadRelocated) {
1918                 next = t->link;
1919             } else {
1920                 next = t->global_link;
1921                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1922                     if (!stmValidateNestOfTransactions (t -> trec)) {
1923                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1924                         
1925                         // strip the stack back to the
1926                         // ATOMICALLY_FRAME, aborting the (nested)
1927                         // transaction, and saving the stack of any
1928                         // partially-evaluated thunks on the heap.
1929                         raiseAsync_(cap, t, NULL, rtsTrue);
1930                         
1931 #ifdef REG_R1
1932                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1933 #endif
1934                     }
1935                 }
1936             }
1937         }
1938     }
1939     
1940     // so this happens periodically:
1941     scheduleCheckBlackHoles(cap);
1942     
1943     IF_DEBUG(scheduler, printAllThreads());
1944
1945     /* everybody back, start the GC.
1946      * Could do it in this thread, or signal a condition var
1947      * to do it in another thread.  Either way, we need to
1948      * broadcast on gc_pending_cond afterward.
1949      */
1950 #if defined(THREADED_RTS)
1951     IF_DEBUG(scheduler,sched_belch("doing GC"));
1952 #endif
1953     GarbageCollect(GetRoots, force_major);
1954     
1955 #if defined(SMP)
1956     // release our stash of capabilities.
1957     for (i = 0; i < n_capabilities; i++) {
1958         if (cap != &capabilities[i]) {
1959             task->cap = &capabilities[i];
1960             releaseCapability(&capabilities[i]);
1961         }
1962     }
1963     task->cap = cap;
1964 #endif
1965
1966 #if defined(GRAN)
1967     /* add a ContinueThread event to continue execution of current thread */
1968     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1969               ContinueThread,
1970               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1971     IF_GRAN_DEBUG(bq, 
1972                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1973                   G_EVENTQ(0);
1974                   G_CURR_THREADQ(0));
1975 #endif /* GRAN */
1976 }
1977
1978 /* ---------------------------------------------------------------------------
1979  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1980  * used by Control.Concurrent for error checking.
1981  * ------------------------------------------------------------------------- */
1982  
1983 StgBool
1984 rtsSupportsBoundThreads(void)
1985 {
1986 #if defined(THREADED_RTS)
1987   return rtsTrue;
1988 #else
1989   return rtsFalse;
1990 #endif
1991 }
1992
1993 /* ---------------------------------------------------------------------------
1994  * isThreadBound(tso): check whether tso is bound to an OS thread.
1995  * ------------------------------------------------------------------------- */
1996  
1997 StgBool
1998 isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
1999 {
2000 #if defined(THREADED_RTS)
2001   return (tso->bound != NULL);
2002 #endif
2003   return rtsFalse;
2004 }
2005
2006 /* ---------------------------------------------------------------------------
2007  * Singleton fork(). Do not copy any running threads.
2008  * ------------------------------------------------------------------------- */
2009
2010 #if !defined(mingw32_HOST_OS) && !defined(SMP)
2011 #define FORKPROCESS_PRIMOP_SUPPORTED
2012 #endif
2013
2014 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2015 static void 
2016 deleteThreadImmediately(Capability *cap, StgTSO *tso);
2017 #endif
2018 StgInt
2019 forkProcess(HsStablePtr *entry
2020 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2021             STG_UNUSED
2022 #endif
2023            )
2024 {
2025 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2026     Task *task;
2027     pid_t pid;
2028     StgTSO* t,*next;
2029     Capability *cap;
2030     
2031     IF_DEBUG(scheduler,sched_belch("forking!"));
2032     
2033     // ToDo: for SMP, we should probably acquire *all* the capabilities
2034     cap = rts_lock();
2035     
2036     pid = fork();
2037     
2038     if (pid) { // parent
2039         
2040         // just return the pid
2041         rts_unlock(cap);
2042         return pid;
2043         
2044     } else { // child
2045         
2046         // delete all threads
2047         cap->run_queue_hd = END_TSO_QUEUE;
2048         cap->run_queue_tl = END_TSO_QUEUE;
2049         
2050         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2051             next = t->link;
2052             
2053             // don't allow threads to catch the ThreadKilled exception
2054             deleteThreadImmediately(cap,t);
2055         }
2056         
2057         // wipe the task list
2058         ACQUIRE_LOCK(&sched_mutex);
2059         for (task = all_tasks; task != NULL; task=task->all_link) {
2060             if (task != cap->running_task) discardTask(task);
2061         }
2062         RELEASE_LOCK(&sched_mutex);
2063
2064 #if defined(THREADED_RTS)
2065         // wipe our spare workers list.
2066         cap->spare_workers = NULL;
2067 #endif
2068
2069         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2070         rts_checkSchedStatus("forkProcess",cap);
2071         
2072         rts_unlock(cap);
2073         hs_exit();                      // clean up and exit
2074         stg_exit(EXIT_SUCCESS);
2075     }
2076 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2077     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2078     return -1;
2079 #endif
2080 }
2081
2082 /* ---------------------------------------------------------------------------
2083  * Delete the threads on the run queue of the current capability.
2084  * ------------------------------------------------------------------------- */
2085    
2086 static void
2087 deleteRunQueue (Capability *cap)
2088 {
2089     StgTSO *t, *next;
2090     for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2091         ASSERT(t->what_next != ThreadRelocated);
2092         next = t->link;
2093         deleteThread(cap, t);
2094     }
2095 }
2096
2097 /* startThread and  insertThread are now in GranSim.c -- HWL */
2098
2099
2100 /* -----------------------------------------------------------------------------
2101    Managing the suspended_ccalling_tasks list.
2102    Locks required: sched_mutex
2103    -------------------------------------------------------------------------- */
2104
2105 STATIC_INLINE void
2106 suspendTask (Capability *cap, Task *task)
2107 {
2108     ASSERT(task->next == NULL && task->prev == NULL);
2109     task->next = cap->suspended_ccalling_tasks;
2110     task->prev = NULL;
2111     if (cap->suspended_ccalling_tasks) {
2112         cap->suspended_ccalling_tasks->prev = task;
2113     }
2114     cap->suspended_ccalling_tasks = task;
2115 }
2116
2117 STATIC_INLINE void
2118 recoverSuspendedTask (Capability *cap, Task *task)
2119 {
2120     if (task->prev) {
2121         task->prev->next = task->next;
2122     } else {
2123         ASSERT(cap->suspended_ccalling_tasks == task);
2124         cap->suspended_ccalling_tasks = task->next;
2125     }
2126     if (task->next) {
2127         task->next->prev = task->prev;
2128     }
2129     task->next = task->prev = NULL;
2130 }
2131
2132 /* ---------------------------------------------------------------------------
2133  * Suspending & resuming Haskell threads.
2134  * 
2135  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2136  * its capability before calling the C function.  This allows another
2137  * task to pick up the capability and carry on running Haskell
2138  * threads.  It also means that if the C call blocks, it won't lock
2139  * the whole system.
2140  *
2141  * The Haskell thread making the C call is put to sleep for the
2142  * duration of the call, on the susepended_ccalling_threads queue.  We
2143  * give out a token to the task, which it can use to resume the thread
2144  * on return from the C function.
2145  * ------------------------------------------------------------------------- */
2146    
2147 void *
2148 suspendThread (StgRegTable *reg)
2149 {
2150   Capability *cap;
2151   int saved_errno = errno;
2152   StgTSO *tso;
2153   Task *task;
2154
2155   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2156    */
2157   cap = regTableToCapability(reg);
2158
2159   task = cap->running_task;
2160   tso = cap->r.rCurrentTSO;
2161
2162   IF_DEBUG(scheduler,
2163            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2164
2165   // XXX this might not be necessary --SDM
2166   tso->what_next = ThreadRunGHC;
2167
2168   threadPaused(tso);
2169
2170   if(tso->blocked_exceptions == NULL)  {
2171       tso->why_blocked = BlockedOnCCall;
2172       tso->blocked_exceptions = END_TSO_QUEUE;
2173   } else {
2174       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2175   }
2176
2177   // Hand back capability
2178   task->suspended_tso = tso;
2179
2180   ACQUIRE_LOCK(&cap->lock);
2181
2182   suspendTask(cap,task);
2183   cap->in_haskell = rtsFalse;
2184   releaseCapability_(cap);
2185   
2186   RELEASE_LOCK(&cap->lock);
2187
2188 #if defined(THREADED_RTS)
2189   /* Preparing to leave the RTS, so ensure there's a native thread/task
2190      waiting to take over.
2191   */
2192   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2193 #endif
2194
2195   errno = saved_errno;
2196   return task;
2197 }
2198
2199 StgRegTable *
2200 resumeThread (void *task_)
2201 {
2202     StgTSO *tso;
2203     Capability *cap;
2204     int saved_errno = errno;
2205     Task *task = task_;
2206
2207     cap = task->cap;
2208     // Wait for permission to re-enter the RTS with the result.
2209     waitForReturnCapability(&cap,task);
2210     // we might be on a different capability now... but if so, our
2211     // entry on the suspended_ccalling_tasks list will also have been
2212     // migrated.
2213
2214     // Remove the thread from the suspended list
2215     recoverSuspendedTask(cap,task);
2216
2217     tso = task->suspended_tso;
2218     task->suspended_tso = NULL;
2219     tso->link = END_TSO_QUEUE;
2220     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2221     
2222     if (tso->why_blocked == BlockedOnCCall) {
2223         awakenBlockedQueue(cap,tso->blocked_exceptions);
2224         tso->blocked_exceptions = NULL;
2225     }
2226     
2227     /* Reset blocking status */
2228     tso->why_blocked  = NotBlocked;
2229     
2230     cap->r.rCurrentTSO = tso;
2231     cap->in_haskell = rtsTrue;
2232     errno = saved_errno;
2233
2234     return &cap->r;
2235 }
2236
2237 /* ---------------------------------------------------------------------------
2238  * Comparing Thread ids.
2239  *
2240  * This is used from STG land in the implementation of the
2241  * instances of Eq/Ord for ThreadIds.
2242  * ------------------------------------------------------------------------ */
2243
2244 int
2245 cmp_thread(StgPtr tso1, StgPtr tso2) 
2246
2247   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2248   StgThreadID id2 = ((StgTSO *)tso2)->id;
2249  
2250   if (id1 < id2) return (-1);
2251   if (id1 > id2) return 1;
2252   return 0;
2253 }
2254
2255 /* ---------------------------------------------------------------------------
2256  * Fetching the ThreadID from an StgTSO.
2257  *
2258  * This is used in the implementation of Show for ThreadIds.
2259  * ------------------------------------------------------------------------ */
2260 int
2261 rts_getThreadId(StgPtr tso) 
2262 {
2263   return ((StgTSO *)tso)->id;
2264 }
2265
2266 #ifdef DEBUG
2267 void
2268 labelThread(StgPtr tso, char *label)
2269 {
2270   int len;
2271   void *buf;
2272
2273   /* Caveat: Once set, you can only set the thread name to "" */
2274   len = strlen(label)+1;
2275   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2276   strncpy(buf,label,len);
2277   /* Update will free the old memory for us */
2278   updateThreadLabel(((StgTSO *)tso)->id,buf);
2279 }
2280 #endif /* DEBUG */
2281
2282 /* ---------------------------------------------------------------------------
2283    Create a new thread.
2284
2285    The new thread starts with the given stack size.  Before the
2286    scheduler can run, however, this thread needs to have a closure
2287    (and possibly some arguments) pushed on its stack.  See
2288    pushClosure() in Schedule.h.
2289
2290    createGenThread() and createIOThread() (in SchedAPI.h) are
2291    convenient packaged versions of this function.
2292
2293    currently pri (priority) is only used in a GRAN setup -- HWL
2294    ------------------------------------------------------------------------ */
2295 #if defined(GRAN)
2296 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2297 StgTSO *
2298 createThread(nat size, StgInt pri)
2299 #else
2300 StgTSO *
2301 createThread(Capability *cap, nat size)
2302 #endif
2303 {
2304     StgTSO *tso;
2305     nat stack_size;
2306
2307     /* sched_mutex is *not* required */
2308
2309     /* First check whether we should create a thread at all */
2310 #if defined(PARALLEL_HASKELL)
2311     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2312     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2313         threadsIgnored++;
2314         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2315                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2316         return END_TSO_QUEUE;
2317     }
2318     threadsCreated++;
2319 #endif
2320
2321 #if defined(GRAN)
2322     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2323 #endif
2324
2325     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2326
2327     /* catch ridiculously small stack sizes */
2328     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2329         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2330     }
2331
2332     stack_size = size - TSO_STRUCT_SIZEW;
2333     
2334     tso = (StgTSO *)allocateLocal(cap, size);
2335     TICK_ALLOC_TSO(stack_size, 0);
2336
2337     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2338 #if defined(GRAN)
2339     SET_GRAN_HDR(tso, ThisPE);
2340 #endif
2341
2342     // Always start with the compiled code evaluator
2343     tso->what_next = ThreadRunGHC;
2344
2345     tso->why_blocked  = NotBlocked;
2346     tso->blocked_exceptions = NULL;
2347     
2348     tso->saved_errno = 0;
2349     tso->bound = NULL;
2350     
2351     tso->stack_size     = stack_size;
2352     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2353                           - TSO_STRUCT_SIZEW;
2354     tso->sp             = (P_)&(tso->stack) + stack_size;
2355
2356     tso->trec = NO_TREC;
2357     
2358 #ifdef PROFILING
2359     tso->prof.CCCS = CCS_MAIN;
2360 #endif
2361     
2362   /* put a stop frame on the stack */
2363     tso->sp -= sizeofW(StgStopFrame);
2364     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2365     tso->link = END_TSO_QUEUE;
2366     
2367   // ToDo: check this
2368 #if defined(GRAN)
2369     /* uses more flexible routine in GranSim */
2370     insertThread(tso, CurrentProc);
2371 #else
2372     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2373      * from its creation
2374      */
2375 #endif
2376     
2377 #if defined(GRAN) 
2378     if (RtsFlags.GranFlags.GranSimStats.Full) 
2379         DumpGranEvent(GR_START,tso);
2380 #elif defined(PARALLEL_HASKELL)
2381     if (RtsFlags.ParFlags.ParStats.Full) 
2382         DumpGranEvent(GR_STARTQ,tso);
2383     /* HACk to avoid SCHEDULE 
2384        LastTSO = tso; */
2385 #endif
2386     
2387     /* Link the new thread on the global thread list.
2388      */
2389     ACQUIRE_LOCK(&sched_mutex);
2390     tso->id = next_thread_id++;  // while we have the mutex
2391     tso->global_link = all_threads;
2392     all_threads = tso;
2393     RELEASE_LOCK(&sched_mutex);
2394     
2395 #if defined(DIST)
2396     tso->dist.priority = MandatoryPriority; //by default that is...
2397 #endif
2398     
2399 #if defined(GRAN)
2400     tso->gran.pri = pri;
2401 # if defined(DEBUG)
2402     tso->gran.magic = TSO_MAGIC; // debugging only
2403 # endif
2404     tso->gran.sparkname   = 0;
2405     tso->gran.startedat   = CURRENT_TIME; 
2406     tso->gran.exported    = 0;
2407     tso->gran.basicblocks = 0;
2408     tso->gran.allocs      = 0;
2409     tso->gran.exectime    = 0;
2410     tso->gran.fetchtime   = 0;
2411     tso->gran.fetchcount  = 0;
2412     tso->gran.blocktime   = 0;
2413     tso->gran.blockcount  = 0;
2414     tso->gran.blockedat   = 0;
2415     tso->gran.globalsparks = 0;
2416     tso->gran.localsparks  = 0;
2417     if (RtsFlags.GranFlags.Light)
2418         tso->gran.clock  = Now; /* local clock */
2419     else
2420         tso->gran.clock  = 0;
2421     
2422     IF_DEBUG(gran,printTSO(tso));
2423 #elif defined(PARALLEL_HASKELL)
2424 # if defined(DEBUG)
2425     tso->par.magic = TSO_MAGIC; // debugging only
2426 # endif
2427     tso->par.sparkname   = 0;
2428     tso->par.startedat   = CURRENT_TIME; 
2429     tso->par.exported    = 0;
2430     tso->par.basicblocks = 0;
2431     tso->par.allocs      = 0;
2432     tso->par.exectime    = 0;
2433     tso->par.fetchtime   = 0;
2434     tso->par.fetchcount  = 0;
2435     tso->par.blocktime   = 0;
2436     tso->par.blockcount  = 0;
2437     tso->par.blockedat   = 0;
2438     tso->par.globalsparks = 0;
2439     tso->par.localsparks  = 0;
2440 #endif
2441     
2442 #if defined(GRAN)
2443     globalGranStats.tot_threads_created++;
2444     globalGranStats.threads_created_on_PE[CurrentProc]++;
2445     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2446     globalGranStats.tot_sq_probes++;
2447 #elif defined(PARALLEL_HASKELL)
2448     // collect parallel global statistics (currently done together with GC stats)
2449     if (RtsFlags.ParFlags.ParStats.Global &&
2450         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2451         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2452         globalParStats.tot_threads_created++;
2453     }
2454 #endif 
2455     
2456 #if defined(GRAN)
2457     IF_GRAN_DEBUG(pri,
2458                   sched_belch("==__ schedule: Created TSO %d (%p);",
2459                               CurrentProc, tso, tso->id));
2460 #elif defined(PARALLEL_HASKELL)
2461     IF_PAR_DEBUG(verbose,
2462                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2463                              (long)tso->id, tso, advisory_thread_count));
2464 #else
2465     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2466                                    (long)tso->id, (long)tso->stack_size));
2467 #endif    
2468     return tso;
2469 }
2470
2471 #if defined(PAR)
2472 /* RFP:
2473    all parallel thread creation calls should fall through the following routine.
2474 */
2475 StgTSO *
2476 createThreadFromSpark(rtsSpark spark) 
2477 { StgTSO *tso;
2478   ASSERT(spark != (rtsSpark)NULL);
2479 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2480   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2481   { threadsIgnored++;
2482     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2483           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2484     return END_TSO_QUEUE;
2485   }
2486   else
2487   { threadsCreated++;
2488     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2489     if (tso==END_TSO_QUEUE)     
2490       barf("createSparkThread: Cannot create TSO");
2491 #if defined(DIST)
2492     tso->priority = AdvisoryPriority;
2493 #endif
2494     pushClosure(tso,spark);
2495     addToRunQueue(tso);
2496     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2497   }
2498   return tso;
2499 }
2500 #endif
2501
2502 /*
2503   Turn a spark into a thread.
2504   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2505 */
2506 #if 0
2507 StgTSO *
2508 activateSpark (rtsSpark spark) 
2509 {
2510   StgTSO *tso;
2511
2512   tso = createSparkThread(spark);
2513   if (RtsFlags.ParFlags.ParStats.Full) {   
2514     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2515       IF_PAR_DEBUG(verbose,
2516                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2517                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2518   }
2519   // ToDo: fwd info on local/global spark to thread -- HWL
2520   // tso->gran.exported =  spark->exported;
2521   // tso->gran.locked =   !spark->global;
2522   // tso->gran.sparkname = spark->name;
2523
2524   return tso;
2525 }
2526 #endif
2527
2528 /* ---------------------------------------------------------------------------
2529  * scheduleThread()
2530  *
2531  * scheduleThread puts a thread on the end  of the runnable queue.
2532  * This will usually be done immediately after a thread is created.
2533  * The caller of scheduleThread must create the thread using e.g.
2534  * createThread and push an appropriate closure
2535  * on this thread's stack before the scheduler is invoked.
2536  * ------------------------------------------------------------------------ */
2537
2538 void
2539 scheduleThread(Capability *cap, StgTSO *tso)
2540 {
2541     // The thread goes at the *end* of the run-queue, to avoid possible
2542     // starvation of any threads already on the queue.
2543     appendToRunQueue(cap,tso);
2544 }
2545
2546 Capability *
2547 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2548 {
2549     Task *task;
2550
2551     // We already created/initialised the Task
2552     task = cap->running_task;
2553
2554     // This TSO is now a bound thread; make the Task and TSO
2555     // point to each other.
2556     tso->bound = task;
2557
2558     task->tso = tso;
2559     task->ret = ret;
2560     task->stat = NoStatus;
2561
2562     appendToRunQueue(cap,tso);
2563
2564     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2565
2566 #if defined(GRAN)
2567     /* GranSim specific init */
2568     CurrentTSO = m->tso;                // the TSO to run
2569     procStatus[MainProc] = Busy;        // status of main PE
2570     CurrentProc = MainProc;             // PE to run it on
2571 #endif
2572
2573     cap = schedule(cap,task);
2574
2575     ASSERT(task->stat != NoStatus);
2576     ASSERT_CAPABILITY_INVARIANTS(cap,task);
2577
2578     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2579     return cap;
2580 }
2581
2582 /* ----------------------------------------------------------------------------
2583  * Starting Tasks
2584  * ------------------------------------------------------------------------- */
2585
2586 #if defined(THREADED_RTS)
2587 void
2588 workerStart(Task *task)
2589 {
2590     Capability *cap;
2591
2592     // See startWorkerTask().
2593     ACQUIRE_LOCK(&task->lock);
2594     cap = task->cap;
2595     RELEASE_LOCK(&task->lock);
2596
2597     // set the thread-local pointer to the Task:
2598     taskEnter(task);
2599
2600     // schedule() runs without a lock.
2601     cap = schedule(cap,task);
2602
2603     // On exit from schedule(), we have a Capability.
2604     releaseCapability(cap);
2605     taskStop(task);
2606 }
2607 #endif
2608
2609 /* ---------------------------------------------------------------------------
2610  * initScheduler()
2611  *
2612  * Initialise the scheduler.  This resets all the queues - if the
2613  * queues contained any threads, they'll be garbage collected at the
2614  * next pass.
2615  *
2616  * ------------------------------------------------------------------------ */
2617
2618 void 
2619 initScheduler(void)
2620 {
2621 #if defined(GRAN)
2622   nat i;
2623   for (i=0; i<=MAX_PROC; i++) {
2624     run_queue_hds[i]      = END_TSO_QUEUE;
2625     run_queue_tls[i]      = END_TSO_QUEUE;
2626     blocked_queue_hds[i]  = END_TSO_QUEUE;
2627     blocked_queue_tls[i]  = END_TSO_QUEUE;
2628     ccalling_threadss[i]  = END_TSO_QUEUE;
2629     blackhole_queue[i]    = END_TSO_QUEUE;
2630     sleeping_queue        = END_TSO_QUEUE;
2631   }
2632 #elif !defined(THREADED_RTS)
2633   blocked_queue_hd  = END_TSO_QUEUE;
2634   blocked_queue_tl  = END_TSO_QUEUE;
2635   sleeping_queue    = END_TSO_QUEUE;
2636 #endif
2637
2638   blackhole_queue   = END_TSO_QUEUE;
2639   all_threads       = END_TSO_QUEUE;
2640
2641   context_switch = 0;
2642   interrupted    = 0;
2643
2644   RtsFlags.ConcFlags.ctxtSwitchTicks =
2645       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2646       
2647 #if defined(THREADED_RTS)
2648   /* Initialise the mutex and condition variables used by
2649    * the scheduler. */
2650   initMutex(&sched_mutex);
2651 #endif
2652   
2653   ACQUIRE_LOCK(&sched_mutex);
2654
2655   /* A capability holds the state a native thread needs in
2656    * order to execute STG code. At least one capability is
2657    * floating around (only SMP builds have more than one).
2658    */
2659   initCapabilities();
2660
2661   initTaskManager();
2662
2663 #if defined(SMP)
2664   /*
2665    * Eagerly start one worker to run each Capability, except for
2666    * Capability 0.  The idea is that we're probably going to start a
2667    * bound thread on Capability 0 pretty soon, so we don't want a
2668    * worker task hogging it.
2669    */
2670   { 
2671       nat i;
2672       Capability *cap;
2673       for (i = 1; i < n_capabilities; i++) {
2674           cap = &capabilities[i];
2675           ACQUIRE_LOCK(&cap->lock);
2676           startWorkerTask(cap, workerStart);
2677           RELEASE_LOCK(&cap->lock);
2678       }
2679   }
2680 #endif
2681
2682 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2683   initSparkPools();
2684 #endif
2685
2686   RELEASE_LOCK(&sched_mutex);
2687 }
2688
2689 void
2690 exitScheduler( void )
2691 {
2692     interrupted = rtsTrue;
2693     shutting_down_scheduler = rtsTrue;
2694
2695 #if defined(THREADED_RTS)
2696     { 
2697         Task *task;
2698         nat i;
2699         
2700         ACQUIRE_LOCK(&sched_mutex);
2701         task = newBoundTask();
2702         RELEASE_LOCK(&sched_mutex);
2703
2704         for (i = 0; i < n_capabilities; i++) {
2705             shutdownCapability(&capabilities[i], task);
2706         }
2707         boundTaskExiting(task);
2708         stopTaskManager();
2709     }
2710 #endif
2711 }
2712
2713 /* ---------------------------------------------------------------------------
2714    Where are the roots that we know about?
2715
2716         - all the threads on the runnable queue
2717         - all the threads on the blocked queue
2718         - all the threads on the sleeping queue
2719         - all the thread currently executing a _ccall_GC
2720         - all the "main threads"
2721      
2722    ------------------------------------------------------------------------ */
2723
2724 /* This has to be protected either by the scheduler monitor, or by the
2725         garbage collection monitor (probably the latter).
2726         KH @ 25/10/99
2727 */
2728
2729 void
2730 GetRoots( evac_fn evac )
2731 {
2732     nat i;
2733     Capability *cap;
2734     Task *task;
2735
2736 #if defined(GRAN)
2737     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2738         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2739             evac((StgClosure **)&run_queue_hds[i]);
2740         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2741             evac((StgClosure **)&run_queue_tls[i]);
2742         
2743         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2744             evac((StgClosure **)&blocked_queue_hds[i]);
2745         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2746             evac((StgClosure **)&blocked_queue_tls[i]);
2747         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2748             evac((StgClosure **)&ccalling_threads[i]);
2749     }
2750
2751     markEventQueue();
2752
2753 #else /* !GRAN */
2754
2755     for (i = 0; i < n_capabilities; i++) {
2756         cap = &capabilities[i];
2757         evac((StgClosure **)&cap->run_queue_hd);
2758         evac((StgClosure **)&cap->run_queue_tl);
2759         
2760         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2761              task=task->next) {
2762             evac((StgClosure **)&task->suspended_tso);
2763         }
2764     }
2765     
2766 #if !defined(THREADED_RTS)
2767     evac((StgClosure **)&blocked_queue_hd);
2768     evac((StgClosure **)&blocked_queue_tl);
2769     evac((StgClosure **)&sleeping_queue);
2770 #endif 
2771 #endif
2772
2773     evac((StgClosure **)&blackhole_queue);
2774
2775 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2776     markSparkQueue(evac);
2777 #endif
2778     
2779 #if defined(RTS_USER_SIGNALS)
2780     // mark the signal handlers (signals should be already blocked)
2781     markSignalHandlers(evac);
2782 #endif
2783 }
2784
2785 /* -----------------------------------------------------------------------------
2786    performGC
2787
2788    This is the interface to the garbage collector from Haskell land.
2789    We provide this so that external C code can allocate and garbage
2790    collect when called from Haskell via _ccall_GC.
2791
2792    It might be useful to provide an interface whereby the programmer
2793    can specify more roots (ToDo).
2794    
2795    This needs to be protected by the GC condition variable above.  KH.
2796    -------------------------------------------------------------------------- */
2797
2798 static void (*extra_roots)(evac_fn);
2799
2800 void
2801 performGC(void)
2802 {
2803 #ifdef THREADED_RTS
2804     // ToDo: we have to grab all the capabilities here.
2805     errorBelch("performGC not supported in threaded RTS (yet)");
2806     stg_exit(EXIT_FAILURE);
2807 #endif
2808     /* Obligated to hold this lock upon entry */
2809     GarbageCollect(GetRoots,rtsFalse);
2810 }
2811
2812 void
2813 performMajorGC(void)
2814 {
2815 #ifdef THREADED_RTS
2816     errorBelch("performMayjorGC not supported in threaded RTS (yet)");
2817     stg_exit(EXIT_FAILURE);
2818 #endif
2819     GarbageCollect(GetRoots,rtsTrue);
2820 }
2821
2822 static void
2823 AllRoots(evac_fn evac)
2824 {
2825     GetRoots(evac);             // the scheduler's roots
2826     extra_roots(evac);          // the user's roots
2827 }
2828
2829 void
2830 performGCWithRoots(void (*get_roots)(evac_fn))
2831 {
2832 #ifdef THREADED_RTS
2833     errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2834     stg_exit(EXIT_FAILURE);
2835 #endif
2836     extra_roots = get_roots;
2837     GarbageCollect(AllRoots,rtsFalse);
2838 }
2839
2840 /* -----------------------------------------------------------------------------
2841    Stack overflow
2842
2843    If the thread has reached its maximum stack size, then raise the
2844    StackOverflow exception in the offending thread.  Otherwise
2845    relocate the TSO into a larger chunk of memory and adjust its stack
2846    size appropriately.
2847    -------------------------------------------------------------------------- */
2848
2849 static StgTSO *
2850 threadStackOverflow(Capability *cap, StgTSO *tso)
2851 {
2852   nat new_stack_size, stack_words;
2853   lnat new_tso_size;
2854   StgPtr new_sp;
2855   StgTSO *dest;
2856
2857   IF_DEBUG(sanity,checkTSO(tso));
2858   if (tso->stack_size >= tso->max_stack_size) {
2859
2860     IF_DEBUG(gc,
2861              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2862                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2863              /* If we're debugging, just print out the top of the stack */
2864              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2865                                               tso->sp+64)));
2866
2867     /* Send this thread the StackOverflow exception */
2868     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2869     return tso;
2870   }
2871
2872   /* Try to double the current stack size.  If that takes us over the
2873    * maximum stack size for this thread, then use the maximum instead.
2874    * Finally round up so the TSO ends up as a whole number of blocks.
2875    */
2876   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2877   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2878                                        TSO_STRUCT_SIZE)/sizeof(W_);
2879   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2880   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2881
2882   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2883
2884   dest = (StgTSO *)allocate(new_tso_size);
2885   TICK_ALLOC_TSO(new_stack_size,0);
2886
2887   /* copy the TSO block and the old stack into the new area */
2888   memcpy(dest,tso,TSO_STRUCT_SIZE);
2889   stack_words = tso->stack + tso->stack_size - tso->sp;
2890   new_sp = (P_)dest + new_tso_size - stack_words;
2891   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2892
2893   /* relocate the stack pointers... */
2894   dest->sp         = new_sp;
2895   dest->stack_size = new_stack_size;
2896         
2897   /* Mark the old TSO as relocated.  We have to check for relocated
2898    * TSOs in the garbage collector and any primops that deal with TSOs.
2899    *
2900    * It's important to set the sp value to just beyond the end
2901    * of the stack, so we don't attempt to scavenge any part of the
2902    * dead TSO's stack.
2903    */
2904   tso->what_next = ThreadRelocated;
2905   tso->link = dest;
2906   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2907   tso->why_blocked = NotBlocked;
2908
2909   IF_PAR_DEBUG(verbose,
2910                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2911                      tso->id, tso, tso->stack_size);
2912                /* If we're debugging, just print out the top of the stack */
2913                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2914                                                 tso->sp+64)));
2915   
2916   IF_DEBUG(sanity,checkTSO(tso));
2917 #if 0
2918   IF_DEBUG(scheduler,printTSO(dest));
2919 #endif
2920
2921   return dest;
2922 }
2923
2924 /* ---------------------------------------------------------------------------
2925    Wake up a queue that was blocked on some resource.
2926    ------------------------------------------------------------------------ */
2927
2928 #if defined(GRAN)
2929 STATIC_INLINE void
2930 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2931 {
2932 }
2933 #elif defined(PARALLEL_HASKELL)
2934 STATIC_INLINE void
2935 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2936 {
2937   /* write RESUME events to log file and
2938      update blocked and fetch time (depending on type of the orig closure) */
2939   if (RtsFlags.ParFlags.ParStats.Full) {
2940     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2941                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2942                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2943     if (emptyRunQueue())
2944       emitSchedule = rtsTrue;
2945
2946     switch (get_itbl(node)->type) {
2947         case FETCH_ME_BQ:
2948           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2949           break;
2950         case RBH:
2951         case FETCH_ME:
2952         case BLACKHOLE_BQ:
2953           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2954           break;
2955 #ifdef DIST
2956         case MVAR:
2957           break;
2958 #endif    
2959         default:
2960           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2961         }
2962       }
2963 }
2964 #endif
2965
2966 #if defined(GRAN)
2967 StgBlockingQueueElement *
2968 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2969 {
2970     StgTSO *tso;
2971     PEs node_loc, tso_loc;
2972
2973     node_loc = where_is(node); // should be lifted out of loop
2974     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2975     tso_loc = where_is((StgClosure *)tso);
2976     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2977       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2978       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2979       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2980       // insertThread(tso, node_loc);
2981       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2982                 ResumeThread,
2983                 tso, node, (rtsSpark*)NULL);
2984       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2985       // len_local++;
2986       // len++;
2987     } else { // TSO is remote (actually should be FMBQ)
2988       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2989                                   RtsFlags.GranFlags.Costs.gunblocktime +
2990                                   RtsFlags.GranFlags.Costs.latency;
2991       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2992                 UnblockThread,
2993                 tso, node, (rtsSpark*)NULL);
2994       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2995       // len++;
2996     }
2997     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2998     IF_GRAN_DEBUG(bq,
2999                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3000                           (node_loc==tso_loc ? "Local" : "Global"), 
3001                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3002     tso->block_info.closure = NULL;
3003     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3004                              tso->id, tso));
3005 }
3006 #elif defined(PARALLEL_HASKELL)
3007 StgBlockingQueueElement *
3008 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3009 {
3010     StgBlockingQueueElement *next;
3011
3012     switch (get_itbl(bqe)->type) {
3013     case TSO:
3014       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3015       /* if it's a TSO just push it onto the run_queue */
3016       next = bqe->link;
3017       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3018       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3019       threadRunnable();
3020       unblockCount(bqe, node);
3021       /* reset blocking status after dumping event */
3022       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3023       break;
3024
3025     case BLOCKED_FETCH:
3026       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3027       next = bqe->link;
3028       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3029       PendingFetches = (StgBlockedFetch *)bqe;
3030       break;
3031
3032 # if defined(DEBUG)
3033       /* can ignore this case in a non-debugging setup; 
3034          see comments on RBHSave closures above */
3035     case CONSTR:
3036       /* check that the closure is an RBHSave closure */
3037       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3038              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3039              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3040       break;
3041
3042     default:
3043       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3044            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3045            (StgClosure *)bqe);
3046 # endif
3047     }
3048   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3049   return next;
3050 }
3051 #endif
3052
3053 StgTSO *
3054 unblockOne(Capability *cap, StgTSO *tso)
3055 {
3056   StgTSO *next;
3057
3058   ASSERT(get_itbl(tso)->type == TSO);
3059   ASSERT(tso->why_blocked != NotBlocked);
3060   tso->why_blocked = NotBlocked;
3061   next = tso->link;
3062   tso->link = END_TSO_QUEUE;
3063
3064   // We might have just migrated this TSO to our Capability:
3065   if (tso->bound) {
3066       tso->bound->cap = cap;
3067   }
3068
3069   appendToRunQueue(cap,tso);
3070
3071   // we're holding a newly woken thread, make sure we context switch
3072   // quickly so we can migrate it if necessary.
3073   context_switch = 1;
3074   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3075   return next;
3076 }
3077
3078
3079 #if defined(GRAN)
3080 void 
3081 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3082 {
3083   StgBlockingQueueElement *bqe;
3084   PEs node_loc;
3085   nat len = 0; 
3086
3087   IF_GRAN_DEBUG(bq, 
3088                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3089                       node, CurrentProc, CurrentTime[CurrentProc], 
3090                       CurrentTSO->id, CurrentTSO));
3091
3092   node_loc = where_is(node);
3093
3094   ASSERT(q == END_BQ_QUEUE ||
3095          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3096          get_itbl(q)->type == CONSTR); // closure (type constructor)
3097   ASSERT(is_unique(node));
3098
3099   /* FAKE FETCH: magically copy the node to the tso's proc;
3100      no Fetch necessary because in reality the node should not have been 
3101      moved to the other PE in the first place
3102   */
3103   if (CurrentProc!=node_loc) {
3104     IF_GRAN_DEBUG(bq, 
3105                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3106                         node, node_loc, CurrentProc, CurrentTSO->id, 
3107                         // CurrentTSO, where_is(CurrentTSO),
3108                         node->header.gran.procs));
3109     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3110     IF_GRAN_DEBUG(bq, 
3111                   debugBelch("## new bitmask of node %p is %#x\n",
3112                         node, node->header.gran.procs));
3113     if (RtsFlags.GranFlags.GranSimStats.Global) {
3114       globalGranStats.tot_fake_fetches++;
3115     }
3116   }
3117
3118   bqe = q;
3119   // ToDo: check: ASSERT(CurrentProc==node_loc);
3120   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3121     //next = bqe->link;
3122     /* 
3123        bqe points to the current element in the queue
3124        next points to the next element in the queue
3125     */
3126     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3127     //tso_loc = where_is(tso);
3128     len++;
3129     bqe = unblockOne(bqe, node);
3130   }
3131
3132   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3133      the closure to make room for the anchor of the BQ */
3134   if (bqe!=END_BQ_QUEUE) {
3135     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3136     /*
3137     ASSERT((info_ptr==&RBH_Save_0_info) ||
3138            (info_ptr==&RBH_Save_1_info) ||
3139            (info_ptr==&RBH_Save_2_info));
3140     */
3141     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3142     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3143     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3144
3145     IF_GRAN_DEBUG(bq,
3146                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3147                         node, info_type(node)));
3148   }
3149
3150   /* statistics gathering */
3151   if (RtsFlags.GranFlags.GranSimStats.Global) {
3152     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3153     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3154     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3155     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3156   }
3157   IF_GRAN_DEBUG(bq,
3158                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3159                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3160 }
3161 #elif defined(PARALLEL_HASKELL)
3162 void 
3163 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3164 {
3165   StgBlockingQueueElement *bqe;
3166
3167   IF_PAR_DEBUG(verbose, 
3168                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3169                      node, mytid));
3170 #ifdef DIST  
3171   //RFP
3172   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3173     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3174     return;
3175   }
3176 #endif
3177   
3178   ASSERT(q == END_BQ_QUEUE ||
3179          get_itbl(q)->type == TSO ||           
3180          get_itbl(q)->type == BLOCKED_FETCH || 
3181          get_itbl(q)->type == CONSTR); 
3182
3183   bqe = q;
3184   while (get_itbl(bqe)->type==TSO || 
3185          get_itbl(bqe)->type==BLOCKED_FETCH) {
3186     bqe = unblockOne(bqe, node);
3187   }
3188 }
3189
3190 #else   /* !GRAN && !PARALLEL_HASKELL */
3191
3192 void
3193 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3194 {
3195     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3196                              // Exception.cmm
3197     while (tso != END_TSO_QUEUE) {
3198         tso = unblockOne(cap,tso);
3199     }
3200 }
3201 #endif
3202
3203 /* ---------------------------------------------------------------------------
3204    Interrupt execution
3205    - usually called inside a signal handler so it mustn't do anything fancy.   
3206    ------------------------------------------------------------------------ */
3207
3208 void
3209 interruptStgRts(void)
3210 {
3211     interrupted    = 1;
3212     context_switch = 1;
3213 #if defined(THREADED_RTS)
3214     prodAllCapabilities();
3215 #endif
3216 }
3217
3218 /* -----------------------------------------------------------------------------
3219    Unblock a thread
3220
3221    This is for use when we raise an exception in another thread, which
3222    may be blocked.
3223    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3224    -------------------------------------------------------------------------- */
3225
3226 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3227 /*
3228   NB: only the type of the blocking queue is different in GranSim and GUM
3229       the operations on the queue-elements are the same
3230       long live polymorphism!
3231
3232   Locks: sched_mutex is held upon entry and exit.
3233
3234 */
3235 static void
3236 unblockThread(Capability *cap, StgTSO *tso)
3237 {
3238   StgBlockingQueueElement *t, **last;
3239
3240   switch (tso->why_blocked) {
3241
3242   case NotBlocked:
3243     return;  /* not blocked */
3244
3245   case BlockedOnSTM:
3246     // Be careful: nothing to do here!  We tell the scheduler that the thread
3247     // is runnable and we leave it to the stack-walking code to abort the 
3248     // transaction while unwinding the stack.  We should perhaps have a debugging
3249     // test to make sure that this really happens and that the 'zombie' transaction
3250     // does not get committed.
3251     goto done;
3252
3253   case BlockedOnMVar:
3254     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3255     {
3256       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3257       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3258
3259       last = (StgBlockingQueueElement **)&mvar->head;
3260       for (t = (StgBlockingQueueElement *)mvar->head; 
3261            t != END_BQ_QUEUE; 
3262            last = &t->link, last_tso = t, t = t->link) {
3263         if (t == (StgBlockingQueueElement *)tso) {
3264           *last = (StgBlockingQueueElement *)tso->link;
3265           if (mvar->tail == tso) {
3266             mvar->tail = (StgTSO *)last_tso;
3267           }
3268           goto done;
3269         }
3270       }
3271       barf("unblockThread (MVAR): TSO not found");
3272     }
3273
3274   case BlockedOnBlackHole:
3275     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3276     {
3277       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3278
3279       last = &bq->blocking_queue;
3280       for (t = bq->blocking_queue; 
3281            t != END_BQ_QUEUE; 
3282            last = &t->link, t = t->link) {
3283         if (t == (StgBlockingQueueElement *)tso) {
3284           *last = (StgBlockingQueueElement *)tso->link;
3285           goto done;
3286         }
3287       }
3288       barf("unblockThread (BLACKHOLE): TSO not found");
3289     }
3290
3291   case BlockedOnException:
3292     {
3293       StgTSO *target  = tso->block_info.tso;
3294
3295       ASSERT(get_itbl(target)->type == TSO);
3296
3297       if (target->what_next == ThreadRelocated) {
3298           target = target->link;
3299           ASSERT(get_itbl(target)->type == TSO);
3300       }
3301
3302       ASSERT(target->blocked_exceptions != NULL);
3303
3304       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3305       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3306            t != END_BQ_QUEUE; 
3307            last = &t->link, t = t->link) {
3308         ASSERT(get_itbl(t)->type == TSO);
3309         if (t == (StgBlockingQueueElement *)tso) {
3310           *last = (StgBlockingQueueElement *)tso->link;
3311           goto done;
3312         }
3313       }
3314       barf("unblockThread (Exception): TSO not found");
3315     }
3316
3317   case BlockedOnRead:
3318   case BlockedOnWrite:
3319 #if defined(mingw32_HOST_OS)
3320   case BlockedOnDoProc:
3321 #endif
3322     {
3323       /* take TSO off blocked_queue */
3324       StgBlockingQueueElement *prev = NULL;
3325       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3326            prev = t, t = t->link) {
3327         if (t == (StgBlockingQueueElement *)tso) {
3328           if (prev == NULL) {
3329             blocked_queue_hd = (StgTSO *)t->link;
3330             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3331               blocked_queue_tl = END_TSO_QUEUE;
3332             }
3333           } else {
3334             prev->link = t->link;
3335             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3336               blocked_queue_tl = (StgTSO *)prev;
3337             }
3338           }
3339 #if defined(mingw32_HOST_OS)
3340           /* (Cooperatively) signal that the worker thread should abort
3341            * the request.
3342            */
3343           abandonWorkRequest(tso->block_info.async_result->reqID);
3344 #endif
3345           goto done;
3346         }
3347       }
3348       barf("unblockThread (I/O): TSO not found");
3349     }
3350
3351   case BlockedOnDelay:
3352     {
3353       /* take TSO off sleeping_queue */
3354       StgBlockingQueueElement *prev = NULL;
3355       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3356            prev = t, t = t->link) {
3357         if (t == (StgBlockingQueueElement *)tso) {
3358           if (prev == NULL) {
3359             sleeping_queue = (StgTSO *)t->link;
3360           } else {
3361             prev->link = t->link;
3362           }
3363           goto done;
3364         }
3365       }
3366       barf("unblockThread (delay): TSO not found");
3367     }
3368
3369   default:
3370     barf("unblockThread");
3371   }
3372
3373  done:
3374   tso->link = END_TSO_QUEUE;
3375   tso->why_blocked = NotBlocked;
3376   tso->block_info.closure = NULL;
3377   pushOnRunQueue(cap,tso);
3378 }
3379 #else
3380 static void
3381 unblockThread(Capability *cap, StgTSO *tso)
3382 {
3383   StgTSO *t, **last;
3384   
3385   /* To avoid locking unnecessarily. */
3386   if (tso->why_blocked == NotBlocked) {
3387     return;
3388   }
3389
3390   switch (tso->why_blocked) {
3391
3392   case BlockedOnSTM:
3393     // Be careful: nothing to do here!  We tell the scheduler that the thread
3394     // is runnable and we leave it to the stack-walking code to abort the 
3395     // transaction while unwinding the stack.  We should perhaps have a debugging
3396     // test to make sure that this really happens and that the 'zombie' transaction
3397     // does not get committed.
3398     goto done;
3399
3400   case BlockedOnMVar:
3401     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3402     {
3403       StgTSO *last_tso = END_TSO_QUEUE;
3404       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3405
3406       last = &mvar->head;
3407       for (t = mvar->head; t != END_TSO_QUEUE; 
3408            last = &t->link, last_tso = t, t = t->link) {
3409         if (t == tso) {
3410           *last = tso->link;
3411           if (mvar->tail == tso) {
3412             mvar->tail = last_tso;
3413           }
3414           goto done;
3415         }
3416       }
3417       barf("unblockThread (MVAR): TSO not found");
3418     }
3419
3420   case BlockedOnBlackHole:
3421     {
3422       last = &blackhole_queue;
3423       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3424            last = &t->link, t = t->link) {
3425         if (t == tso) {
3426           *last = tso->link;
3427           goto done;
3428         }
3429       }
3430       barf("unblockThread (BLACKHOLE): TSO not found");
3431     }
3432
3433   case BlockedOnException:
3434     {
3435       StgTSO *target  = tso->block_info.tso;
3436
3437       ASSERT(get_itbl(target)->type == TSO);
3438
3439       while (target->what_next == ThreadRelocated) {
3440           target = target->link;
3441           ASSERT(get_itbl(target)->type == TSO);
3442       }
3443       
3444       ASSERT(target->blocked_exceptions != NULL);
3445
3446       last = &target->blocked_exceptions;
3447       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3448            last = &t->link, t = t->link) {
3449         ASSERT(get_itbl(t)->type == TSO);
3450         if (t == tso) {
3451           *last = tso->link;
3452           goto done;
3453         }
3454       }
3455       barf("unblockThread (Exception): TSO not found");
3456     }
3457
3458 #if !defined(THREADED_RTS)
3459   case BlockedOnRead:
3460   case BlockedOnWrite:
3461 #if defined(mingw32_HOST_OS)
3462   case BlockedOnDoProc:
3463 #endif
3464     {
3465       StgTSO *prev = NULL;
3466       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3467            prev = t, t = t->link) {
3468         if (t == tso) {
3469           if (prev == NULL) {
3470             blocked_queue_hd = t->link;
3471             if (blocked_queue_tl == t) {
3472               blocked_queue_tl = END_TSO_QUEUE;
3473             }
3474           } else {
3475             prev->link = t->link;
3476             if (blocked_queue_tl == t) {
3477               blocked_queue_tl = prev;
3478             }
3479           }
3480 #if defined(mingw32_HOST_OS)
3481           /* (Cooperatively) signal that the worker thread should abort
3482            * the request.
3483            */
3484           abandonWorkRequest(tso->block_info.async_result->reqID);
3485 #endif
3486           goto done;
3487         }
3488       }
3489       barf("unblockThread (I/O): TSO not found");
3490     }
3491
3492   case BlockedOnDelay:
3493     {
3494       StgTSO *prev = NULL;
3495       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3496            prev = t, t = t->link) {
3497         if (t == tso) {
3498           if (prev == NULL) {
3499             sleeping_queue = t->link;
3500           } else {
3501             prev->link = t->link;
3502           }
3503           goto done;
3504         }
3505       }
3506       barf("unblockThread (delay): TSO not found");
3507     }
3508 #endif
3509
3510   default:
3511     barf("unblockThread");
3512   }
3513
3514  done:
3515   tso->link = END_TSO_QUEUE;
3516   tso->why_blocked = NotBlocked;
3517   tso->block_info.closure = NULL;
3518   appendToRunQueue(cap,tso);
3519 }
3520 #endif
3521
3522 /* -----------------------------------------------------------------------------
3523  * checkBlackHoles()
3524  *
3525  * Check the blackhole_queue for threads that can be woken up.  We do
3526  * this periodically: before every GC, and whenever the run queue is
3527  * empty.
3528  *
3529  * An elegant solution might be to just wake up all the blocked
3530  * threads with awakenBlockedQueue occasionally: they'll go back to
3531  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3532  * doesn't give us a way to tell whether we've actually managed to
3533  * wake up any threads, so we would be busy-waiting.
3534  *
3535  * -------------------------------------------------------------------------- */
3536
3537 static rtsBool
3538 checkBlackHoles (Capability *cap)
3539 {
3540     StgTSO **prev, *t;
3541     rtsBool any_woke_up = rtsFalse;
3542     StgHalfWord type;
3543
3544     // blackhole_queue is global:
3545     ASSERT_LOCK_HELD(&sched_mutex);
3546
3547     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3548
3549     // ASSUMES: sched_mutex
3550     prev = &blackhole_queue;
3551     t = blackhole_queue;
3552     while (t != END_TSO_QUEUE) {
3553         ASSERT(t->why_blocked == BlockedOnBlackHole);
3554         type = get_itbl(t->block_info.closure)->type;
3555         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3556             IF_DEBUG(sanity,checkTSO(t));
3557             t = unblockOne(cap, t);
3558             // urk, the threads migrate to the current capability
3559             // here, but we'd like to keep them on the original one.
3560             *prev = t;
3561             any_woke_up = rtsTrue;
3562         } else {
3563             prev = &t->link;
3564             t = t->link;
3565         }
3566     }
3567
3568     return any_woke_up;
3569 }
3570
3571 /* -----------------------------------------------------------------------------
3572  * raiseAsync()
3573  *
3574  * The following function implements the magic for raising an
3575  * asynchronous exception in an existing thread.
3576  *
3577  * We first remove the thread from any queue on which it might be
3578  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3579  *
3580  * We strip the stack down to the innermost CATCH_FRAME, building
3581  * thunks in the heap for all the active computations, so they can 
3582  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3583  * an application of the handler to the exception, and push it on
3584  * the top of the stack.
3585  * 
3586  * How exactly do we save all the active computations?  We create an
3587  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3588  * AP_STACKs pushes everything from the corresponding update frame
3589  * upwards onto the stack.  (Actually, it pushes everything up to the
3590  * next update frame plus a pointer to the next AP_STACK object.
3591  * Entering the next AP_STACK object pushes more onto the stack until we
3592  * reach the last AP_STACK object - at which point the stack should look
3593  * exactly as it did when we killed the TSO and we can continue
3594  * execution by entering the closure on top of the stack.
3595  *
3596  * We can also kill a thread entirely - this happens if either (a) the 
3597  * exception passed to raiseAsync is NULL, or (b) there's no
3598  * CATCH_FRAME on the stack.  In either case, we strip the entire
3599  * stack and replace the thread with a zombie.
3600  *
3601  * ToDo: in SMP mode, this function is only safe if either (a) we hold
3602  * all the Capabilities (eg. in GC), or (b) we own the Capability that
3603  * the TSO is currently blocked on or on the run queue of.
3604  *
3605  * -------------------------------------------------------------------------- */
3606  
3607 void
3608 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3609 {
3610     raiseAsync_(cap, tso, exception, rtsFalse);
3611 }
3612
3613 static void
3614 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3615             rtsBool stop_at_atomically)
3616 {
3617     StgRetInfoTable *info;
3618     StgPtr sp;
3619   
3620     // Thread already dead?
3621     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3622         return;
3623     }
3624
3625     IF_DEBUG(scheduler, 
3626              sched_belch("raising exception in thread %ld.", (long)tso->id));
3627     
3628     // Remove it from any blocking queues
3629     unblockThread(cap,tso);
3630
3631     sp = tso->sp;
3632     
3633     // The stack freezing code assumes there's a closure pointer on
3634     // the top of the stack, so we have to arrange that this is the case...
3635     //
3636     if (sp[0] == (W_)&stg_enter_info) {
3637         sp++;
3638     } else {
3639         sp--;
3640         sp[0] = (W_)&stg_dummy_ret_closure;
3641     }
3642
3643     while (1) {
3644         nat i;
3645
3646         // 1. Let the top of the stack be the "current closure"
3647         //
3648         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3649         // CATCH_FRAME.
3650         //
3651         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3652         // current closure applied to the chunk of stack up to (but not
3653         // including) the update frame.  This closure becomes the "current
3654         // closure".  Go back to step 2.
3655         //
3656         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3657         // top of the stack applied to the exception.
3658         // 
3659         // 5. If it's a STOP_FRAME, then kill the thread.
3660         // 
3661         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3662         // transaction
3663        
3664         
3665         StgPtr frame;
3666         
3667         frame = sp + 1;
3668         info = get_ret_itbl((StgClosure *)frame);
3669         
3670         while (info->i.type != UPDATE_FRAME
3671                && (info->i.type != CATCH_FRAME || exception == NULL)
3672                && info->i.type != STOP_FRAME
3673                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3674         {
3675             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3676               // IF we find an ATOMICALLY_FRAME then we abort the
3677               // current transaction and propagate the exception.  In
3678               // this case (unlike ordinary exceptions) we do not care
3679               // whether the transaction is valid or not because its
3680               // possible validity cannot have caused the exception
3681               // and will not be visible after the abort.
3682               IF_DEBUG(stm,
3683                        debugBelch("Found atomically block delivering async exception\n"));
3684               stmAbortTransaction(tso -> trec);
3685               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3686             }
3687             frame += stack_frame_sizeW((StgClosure *)frame);
3688             info = get_ret_itbl((StgClosure *)frame);
3689         }
3690         
3691         switch (info->i.type) {
3692             
3693         case ATOMICALLY_FRAME:
3694             ASSERT(stop_at_atomically);
3695             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3696             stmCondemnTransaction(tso -> trec);
3697 #ifdef REG_R1
3698             tso->sp = frame;
3699 #else
3700             // R1 is not a register: the return convention for IO in
3701             // this case puts the return value on the stack, so we
3702             // need to set up the stack to return to the atomically
3703             // frame properly...
3704             tso->sp = frame - 2;
3705             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3706             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3707 #endif
3708             tso->what_next = ThreadRunGHC;
3709             return;
3710
3711         case CATCH_FRAME:
3712             // If we find a CATCH_FRAME, and we've got an exception to raise,
3713             // then build the THUNK raise(exception), and leave it on
3714             // top of the CATCH_FRAME ready to enter.
3715             //
3716         {
3717 #ifdef PROFILING
3718             StgCatchFrame *cf = (StgCatchFrame *)frame;
3719 #endif
3720             StgThunk *raise;
3721             
3722             // we've got an exception to raise, so let's pass it to the
3723             // handler in this frame.
3724             //
3725             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3726             TICK_ALLOC_SE_THK(1,0);
3727             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3728             raise->payload[0] = exception;
3729             
3730             // throw away the stack from Sp up to the CATCH_FRAME.
3731             //
3732             sp = frame - 1;
3733             
3734             /* Ensure that async excpetions are blocked now, so we don't get
3735              * a surprise exception before we get around to executing the
3736              * handler.
3737              */
3738             if (tso->blocked_exceptions == NULL) {
3739                 tso->blocked_exceptions = END_TSO_QUEUE;
3740             }
3741             
3742             /* Put the newly-built THUNK on top of the stack, ready to execute
3743              * when the thread restarts.
3744              */
3745             sp[0] = (W_)raise;
3746             sp[-1] = (W_)&stg_enter_info;
3747             tso->sp = sp-1;
3748             tso->what_next = ThreadRunGHC;
3749             IF_DEBUG(sanity, checkTSO(tso));
3750             return;
3751         }
3752         
3753         case UPDATE_FRAME:
3754         {
3755             StgAP_STACK * ap;
3756             nat words;
3757             
3758             // First build an AP_STACK consisting of the stack chunk above the
3759             // current update frame, with the top word on the stack as the
3760             // fun field.
3761             //
3762             words = frame - sp - 1;
3763             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3764             
3765             ap->size = words;
3766             ap->fun  = (StgClosure *)sp[0];
3767             sp++;
3768             for(i=0; i < (nat)words; ++i) {
3769                 ap->payload[i] = (StgClosure *)*sp++;
3770             }
3771             
3772             SET_HDR(ap,&stg_AP_STACK_info,
3773                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3774             TICK_ALLOC_UP_THK(words+1,0);
3775             
3776             IF_DEBUG(scheduler,
3777                      debugBelch("sched: Updating ");
3778                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3779                      debugBelch(" with ");
3780                      printObj((StgClosure *)ap);
3781                 );
3782
3783             // Replace the updatee with an indirection - happily
3784             // this will also wake up any threads currently
3785             // waiting on the result.
3786             //
3787             // Warning: if we're in a loop, more than one update frame on
3788             // the stack may point to the same object.  Be careful not to
3789             // overwrite an IND_OLDGEN in this case, because we'll screw
3790             // up the mutable lists.  To be on the safe side, don't
3791             // overwrite any kind of indirection at all.  See also
3792             // threadSqueezeStack in GC.c, where we have to make a similar
3793             // check.
3794             //
3795             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3796                 // revert the black hole
3797                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3798                                (StgClosure *)ap);
3799             }
3800             sp += sizeofW(StgUpdateFrame) - 1;
3801             sp[0] = (W_)ap; // push onto stack
3802             break;
3803         }
3804         
3805         case STOP_FRAME:
3806             // We've stripped the entire stack, the thread is now dead.
3807             tso->what_next = ThreadKilled;
3808             tso->sp = frame + sizeofW(StgStopFrame);
3809             return;
3810             
3811         default:
3812             barf("raiseAsync");
3813         }
3814     }
3815     barf("raiseAsync");
3816 }
3817
3818 /* -----------------------------------------------------------------------------
3819    Deleting threads
3820
3821    This is used for interruption (^C) and forking, and corresponds to
3822    raising an exception but without letting the thread catch the
3823    exception.
3824    -------------------------------------------------------------------------- */
3825
3826 static void 
3827 deleteThread (Capability *cap, StgTSO *tso)
3828 {
3829   if (tso->why_blocked != BlockedOnCCall &&
3830       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3831       raiseAsync(cap,tso,NULL);
3832   }
3833 }
3834
3835 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3836 static void 
3837 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3838 { // for forkProcess only:
3839   // delete thread without giving it a chance to catch the KillThread exception
3840
3841   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3842       return;
3843   }
3844
3845   if (tso->why_blocked != BlockedOnCCall &&
3846       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3847       unblockThread(cap,tso);
3848   }
3849
3850   tso->what_next = ThreadKilled;
3851 }
3852 #endif
3853
3854 /* -----------------------------------------------------------------------------
3855    raiseExceptionHelper
3856    
3857    This function is called by the raise# primitve, just so that we can
3858    move some of the tricky bits of raising an exception from C-- into
3859    C.  Who knows, it might be a useful re-useable thing here too.
3860    -------------------------------------------------------------------------- */
3861
3862 StgWord
3863 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3864 {
3865     Capability *cap = regTableToCapability(reg);
3866     StgThunk *raise_closure = NULL;
3867     StgPtr p, next;
3868     StgRetInfoTable *info;
3869     //
3870     // This closure represents the expression 'raise# E' where E
3871     // is the exception raise.  It is used to overwrite all the
3872     // thunks which are currently under evaluataion.
3873     //
3874
3875     //    
3876     // LDV profiling: stg_raise_info has THUNK as its closure
3877     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3878     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3879     // 1 does not cause any problem unless profiling is performed.
3880     // However, when LDV profiling goes on, we need to linearly scan
3881     // small object pool, where raise_closure is stored, so we should
3882     // use MIN_UPD_SIZE.
3883     //
3884     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3885     //                                 sizeofW(StgClosure)+1);
3886     //
3887
3888     //
3889     // Walk up the stack, looking for the catch frame.  On the way,
3890     // we update any closures pointed to from update frames with the
3891     // raise closure that we just built.
3892     //
3893     p = tso->sp;
3894     while(1) {
3895         info = get_ret_itbl((StgClosure *)p);
3896         next = p + stack_frame_sizeW((StgClosure *)p);
3897         switch (info->i.type) {
3898             
3899         case UPDATE_FRAME:
3900             // Only create raise_closure if we need to.
3901             if (raise_closure == NULL) {
3902                 raise_closure = 
3903                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3904                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3905                 raise_closure->payload[0] = exception;
3906             }
3907             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3908             p = next;
3909             continue;
3910
3911         case ATOMICALLY_FRAME:
3912             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3913             tso->sp = p;
3914             return ATOMICALLY_FRAME;
3915             
3916         case CATCH_FRAME:
3917             tso->sp = p;
3918             return CATCH_FRAME;
3919
3920         case CATCH_STM_FRAME:
3921             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3922             tso->sp = p;
3923             return CATCH_STM_FRAME;
3924             
3925         case STOP_FRAME:
3926             tso->sp = p;
3927             return STOP_FRAME;
3928
3929         case CATCH_RETRY_FRAME:
3930         default:
3931             p = next; 
3932             continue;
3933         }
3934     }
3935 }
3936
3937
3938 /* -----------------------------------------------------------------------------
3939    findRetryFrameHelper
3940
3941    This function is called by the retry# primitive.  It traverses the stack
3942    leaving tso->sp referring to the frame which should handle the retry.  
3943
3944    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3945    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3946
3947    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3948    despite the similar implementation.
3949
3950    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3951    not be created within memory transactions.
3952    -------------------------------------------------------------------------- */
3953
3954 StgWord
3955 findRetryFrameHelper (StgTSO *tso)
3956 {
3957   StgPtr           p, next;
3958   StgRetInfoTable *info;
3959
3960   p = tso -> sp;
3961   while (1) {
3962     info = get_ret_itbl((StgClosure *)p);
3963     next = p + stack_frame_sizeW((StgClosure *)p);
3964     switch (info->i.type) {
3965       
3966     case ATOMICALLY_FRAME:
3967       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3968       tso->sp = p;
3969       return ATOMICALLY_FRAME;
3970       
3971     case CATCH_RETRY_FRAME:
3972       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3973       tso->sp = p;
3974       return CATCH_RETRY_FRAME;
3975       
3976     case CATCH_STM_FRAME:
3977     default:
3978       ASSERT(info->i.type != CATCH_FRAME);
3979       ASSERT(info->i.type != STOP_FRAME);
3980       p = next; 
3981       continue;
3982     }
3983   }
3984 }
3985
3986 /* -----------------------------------------------------------------------------
3987    resurrectThreads is called after garbage collection on the list of
3988    threads found to be garbage.  Each of these threads will be woken
3989    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3990    on an MVar, or NonTermination if the thread was blocked on a Black
3991    Hole.
3992
3993    Locks: assumes we hold *all* the capabilities.
3994    -------------------------------------------------------------------------- */
3995
3996 void
3997 resurrectThreads (StgTSO *threads)
3998 {
3999     StgTSO *tso, *next;
4000     Capability *cap;
4001
4002     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4003         next = tso->global_link;
4004         tso->global_link = all_threads;
4005         all_threads = tso;
4006         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4007         
4008         // Wake up the thread on the Capability it was last on for a
4009         // bound thread, or last_free_capability otherwise.
4010         if (tso->bound) {
4011             cap = tso->bound->cap;
4012         } else {
4013             cap = last_free_capability;
4014         }
4015         
4016         switch (tso->why_blocked) {
4017         case BlockedOnMVar:
4018         case BlockedOnException:
4019             /* Called by GC - sched_mutex lock is currently held. */
4020             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4021             break;
4022         case BlockedOnBlackHole:
4023             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4024             break;
4025         case BlockedOnSTM:
4026             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4027             break;
4028         case NotBlocked:
4029             /* This might happen if the thread was blocked on a black hole
4030              * belonging to a thread that we've just woken up (raiseAsync
4031              * can wake up threads, remember...).
4032              */
4033             continue;
4034         default:
4035             barf("resurrectThreads: thread blocked in a strange way");
4036         }
4037     }
4038 }
4039
4040 /* ----------------------------------------------------------------------------
4041  * Debugging: why is a thread blocked
4042  * [Also provides useful information when debugging threaded programs
4043  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4044    ------------------------------------------------------------------------- */
4045
4046 #if DEBUG
4047 static void
4048 printThreadBlockage(StgTSO *tso)
4049 {
4050   switch (tso->why_blocked) {
4051   case BlockedOnRead:
4052     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4053     break;
4054   case BlockedOnWrite:
4055     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4056     break;
4057 #if defined(mingw32_HOST_OS)
4058     case BlockedOnDoProc:
4059     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4060     break;
4061 #endif
4062   case BlockedOnDelay:
4063     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4064     break;
4065   case BlockedOnMVar:
4066     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4067     break;
4068   case BlockedOnException:
4069     debugBelch("is blocked on delivering an exception to thread %d",
4070             tso->block_info.tso->id);
4071     break;
4072   case BlockedOnBlackHole:
4073     debugBelch("is blocked on a black hole");
4074     break;
4075   case NotBlocked:
4076     debugBelch("is not blocked");
4077     break;
4078 #if defined(PARALLEL_HASKELL)
4079   case BlockedOnGA:
4080     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4081             tso->block_info.closure, info_type(tso->block_info.closure));
4082     break;
4083   case BlockedOnGA_NoSend:
4084     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4085             tso->block_info.closure, info_type(tso->block_info.closure));
4086     break;
4087 #endif
4088   case BlockedOnCCall:
4089     debugBelch("is blocked on an external call");
4090     break;
4091   case BlockedOnCCall_NoUnblockExc:
4092     debugBelch("is blocked on an external call (exceptions were already blocked)");
4093     break;
4094   case BlockedOnSTM:
4095     debugBelch("is blocked on an STM operation");
4096     break;
4097   default:
4098     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4099          tso->why_blocked, tso->id, tso);
4100   }
4101 }
4102
4103 void
4104 printThreadStatus(StgTSO *t)
4105 {
4106     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4107     {
4108       void *label = lookupThreadLabel(t->id);
4109       if (label) debugBelch("[\"%s\"] ",(char *)label);
4110     }
4111     if (t->what_next == ThreadRelocated) {
4112         debugBelch("has been relocated...\n");
4113     } else {
4114         switch (t->what_next) {
4115         case ThreadKilled:
4116             debugBelch("has been killed");
4117             break;
4118         case ThreadComplete:
4119             debugBelch("has completed");
4120             break;
4121         default:
4122             printThreadBlockage(t);
4123         }
4124         debugBelch("\n");
4125     }
4126 }
4127
4128 void
4129 printAllThreads(void)
4130 {
4131   StgTSO *t, *next;
4132   nat i;
4133   Capability *cap;
4134
4135 # if defined(GRAN)
4136   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4137   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4138                        time_string, rtsFalse/*no commas!*/);
4139
4140   debugBelch("all threads at [%s]:\n", time_string);
4141 # elif defined(PARALLEL_HASKELL)
4142   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4143   ullong_format_string(CURRENT_TIME,
4144                        time_string, rtsFalse/*no commas!*/);
4145
4146   debugBelch("all threads at [%s]:\n", time_string);
4147 # else
4148   debugBelch("all threads:\n");
4149 # endif
4150
4151   for (i = 0; i < n_capabilities; i++) {
4152       cap = &capabilities[i];
4153       debugBelch("threads on capability %d:\n", cap->no);
4154       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4155           printThreadStatus(t);
4156       }
4157   }
4158
4159   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4160       if (t->why_blocked != NotBlocked) {
4161           printThreadStatus(t);
4162       }
4163       if (t->what_next == ThreadRelocated) {
4164           next = t->link;
4165       } else {
4166           next = t->global_link;
4167       }
4168   }
4169 }
4170
4171 // useful from gdb
4172 void 
4173 printThreadQueue(StgTSO *t)
4174 {
4175     nat i = 0;
4176     for (; t != END_TSO_QUEUE; t = t->link) {
4177         printThreadStatus(t);
4178         i++;
4179     }
4180     debugBelch("%d threads on queue\n", i);
4181 }
4182
4183 /* 
4184    Print a whole blocking queue attached to node (debugging only).
4185 */
4186 # if defined(PARALLEL_HASKELL)
4187 void 
4188 print_bq (StgClosure *node)
4189 {
4190   StgBlockingQueueElement *bqe;
4191   StgTSO *tso;
4192   rtsBool end;
4193
4194   debugBelch("## BQ of closure %p (%s): ",
4195           node, info_type(node));
4196
4197   /* should cover all closures that may have a blocking queue */
4198   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4199          get_itbl(node)->type == FETCH_ME_BQ ||
4200          get_itbl(node)->type == RBH ||
4201          get_itbl(node)->type == MVAR);
4202     
4203   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4204
4205   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4206 }
4207
4208 /* 
4209    Print a whole blocking queue starting with the element bqe.
4210 */
4211 void 
4212 print_bqe (StgBlockingQueueElement *bqe)
4213 {
4214   rtsBool end;
4215
4216   /* 
4217      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4218   */
4219   for (end = (bqe==END_BQ_QUEUE);
4220        !end; // iterate until bqe points to a CONSTR
4221        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4222        bqe = end ? END_BQ_QUEUE : bqe->link) {
4223     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4224     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4225     /* types of closures that may appear in a blocking queue */
4226     ASSERT(get_itbl(bqe)->type == TSO ||           
4227            get_itbl(bqe)->type == BLOCKED_FETCH || 
4228            get_itbl(bqe)->type == CONSTR); 
4229     /* only BQs of an RBH end with an RBH_Save closure */
4230     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4231
4232     switch (get_itbl(bqe)->type) {
4233     case TSO:
4234       debugBelch(" TSO %u (%x),",
4235               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4236       break;
4237     case BLOCKED_FETCH:
4238       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4239               ((StgBlockedFetch *)bqe)->node, 
4240               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4241               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4242               ((StgBlockedFetch *)bqe)->ga.weight);
4243       break;
4244     case CONSTR:
4245       debugBelch(" %s (IP %p),",
4246               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4247                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4248                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4249                "RBH_Save_?"), get_itbl(bqe));
4250       break;
4251     default:
4252       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4253            info_type((StgClosure *)bqe)); // , node, info_type(node));
4254       break;
4255     }
4256   } /* for */
4257   debugBelch("\n");
4258 }
4259 # elif defined(GRAN)
4260 void 
4261 print_bq (StgClosure *node)
4262 {
4263   StgBlockingQueueElement *bqe;
4264   PEs node_loc, tso_loc;
4265   rtsBool end;
4266
4267   /* should cover all closures that may have a blocking queue */
4268   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4269          get_itbl(node)->type == FETCH_ME_BQ ||
4270          get_itbl(node)->type == RBH);
4271     
4272   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4273   node_loc = where_is(node);
4274
4275   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4276           node, info_type(node), node_loc);
4277
4278   /* 
4279      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4280   */
4281   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4282        !end; // iterate until bqe points to a CONSTR
4283        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4284     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4285     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4286     /* types of closures that may appear in a blocking queue */
4287     ASSERT(get_itbl(bqe)->type == TSO ||           
4288            get_itbl(bqe)->type == CONSTR); 
4289     /* only BQs of an RBH end with an RBH_Save closure */
4290     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4291
4292     tso_loc = where_is((StgClosure *)bqe);
4293     switch (get_itbl(bqe)->type) {
4294     case TSO:
4295       debugBelch(" TSO %d (%p) on [PE %d],",
4296               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4297       break;
4298     case CONSTR:
4299       debugBelch(" %s (IP %p),",
4300               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4301                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4302                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4303                "RBH_Save_?"), get_itbl(bqe));
4304       break;
4305     default:
4306       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4307            info_type((StgClosure *)bqe), node, info_type(node));
4308       break;
4309     }
4310   } /* for */
4311   debugBelch("\n");
4312 }
4313 # endif
4314
4315 #if defined(PARALLEL_HASKELL)
4316 static nat
4317 run_queue_len(void)
4318 {
4319     nat i;
4320     StgTSO *tso;
4321     
4322     for (i=0, tso=run_queue_hd; 
4323          tso != END_TSO_QUEUE;
4324          i++, tso=tso->link) {
4325         /* nothing */
4326     }
4327         
4328     return i;
4329 }
4330 #endif
4331
4332 void
4333 sched_belch(char *s, ...)
4334 {
4335     va_list ap;
4336     va_start(ap,s);
4337 #ifdef THREADED_RTS
4338     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4339 #elif defined(PARALLEL_HASKELL)
4340     debugBelch("== ");
4341 #else
4342     debugBelch("sched: ");
4343 #endif
4344     vdebugBelch(s, ap);
4345     debugBelch("\n");
4346     va_end(ap);
4347 }
4348
4349 #endif /* DEBUG */