[project @ 2005-11-08 10:44:22 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 #ifdef THREADED_RTS
76 #define USED_WHEN_THREADED_RTS
77 #define USED_WHEN_NON_THREADED_RTS STG_UNUSED
78 #else
79 #define USED_WHEN_THREADED_RTS     STG_UNUSED
80 #define USED_WHEN_NON_THREADED_RTS
81 #endif
82
83 #ifdef SMP
84 #define USED_WHEN_SMP
85 #else
86 #define USED_WHEN_SMP STG_UNUSED
87 #endif
88
89 /* -----------------------------------------------------------------------------
90  * Global variables
91  * -------------------------------------------------------------------------- */
92
93 #if defined(GRAN)
94
95 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
96 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
97
98 /* 
99    In GranSim we have a runnable and a blocked queue for each processor.
100    In order to minimise code changes new arrays run_queue_hds/tls
101    are created. run_queue_hd is then a short cut (macro) for
102    run_queue_hds[CurrentProc] (see GranSim.h).
103    -- HWL
104 */
105 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
106 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
107 StgTSO *ccalling_threadss[MAX_PROC];
108 /* We use the same global list of threads (all_threads) in GranSim as in
109    the std RTS (i.e. we are cheating). However, we don't use this list in
110    the GranSim specific code at the moment (so we are only potentially
111    cheating).  */
112
113 #else /* !GRAN */
114
115 #if !defined(THREADED_RTS)
116 // Blocked/sleeping thrads
117 StgTSO *blocked_queue_hd = NULL;
118 StgTSO *blocked_queue_tl = NULL;
119 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
120 #endif
121
122 /* Threads blocked on blackholes.
123  * LOCK: sched_mutex+capability, or all capabilities
124  */
125 StgTSO *blackhole_queue = NULL;
126 #endif
127
128 /* The blackhole_queue should be checked for threads to wake up.  See
129  * Schedule.h for more thorough comment.
130  * LOCK: none (doesn't matter if we miss an update)
131  */
132 rtsBool blackholes_need_checking = rtsFalse;
133
134 /* Linked list of all threads.
135  * Used for detecting garbage collected threads.
136  * LOCK: sched_mutex+capability, or all capabilities
137  */
138 StgTSO *all_threads = NULL;
139
140 /* flag set by signal handler to precipitate a context switch
141  * LOCK: none (just an advisory flag)
142  */
143 int context_switch = 0;
144
145 /* flag that tracks whether we have done any execution in this time slice.
146  * LOCK: currently none, perhaps we should lock (but needs to be
147  * updated in the fast path of the scheduler).
148  */
149 nat recent_activity = ACTIVITY_YES;
150
151 /* if this flag is set as well, give up execution
152  * LOCK: none (changes once, from false->true)
153  */
154 rtsBool interrupted = rtsFalse;
155
156 /* Next thread ID to allocate.
157  * LOCK: sched_mutex
158  */
159 static StgThreadID next_thread_id = 1;
160
161 /* The smallest stack size that makes any sense is:
162  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
163  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
164  *  + 1                       (the closure to enter)
165  *  + 1                       (stg_ap_v_ret)
166  *  + 1                       (spare slot req'd by stg_ap_v_ret)
167  *
168  * A thread with this stack will bomb immediately with a stack
169  * overflow, which will increase its stack size.  
170  */
171 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
172
173 #if defined(GRAN)
174 StgTSO *CurrentTSO;
175 #endif
176
177 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
178  *  exists - earlier gccs apparently didn't.
179  *  -= chak
180  */
181 StgTSO dummy_tso;
182
183 /*
184  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
185  * in an MT setting, needed to signal that a worker thread shouldn't hang around
186  * in the scheduler when it is out of work.
187  */
188 rtsBool shutting_down_scheduler = rtsFalse;
189
190 /*
191  * This mutex protects most of the global scheduler data in
192  * the THREADED_RTS and (inc. SMP) runtime.
193  */
194 #if defined(THREADED_RTS)
195 Mutex sched_mutex;
196 #endif
197
198 #if defined(PARALLEL_HASKELL)
199 StgTSO *LastTSO;
200 rtsTime TimeOfLastYield;
201 rtsBool emitSchedule = rtsTrue;
202 #endif
203
204 /* -----------------------------------------------------------------------------
205  * static function prototypes
206  * -------------------------------------------------------------------------- */
207
208 static Capability *schedule (Capability *initialCapability, Task *task);
209
210 //
211 // These function all encapsulate parts of the scheduler loop, and are
212 // abstracted only to make the structure and control flow of the
213 // scheduler clearer.
214 //
215 static void schedulePreLoop (void);
216 #if defined(SMP)
217 static void schedulePushWork(Capability *cap, Task *task);
218 #endif
219 static void scheduleStartSignalHandlers (Capability *cap);
220 static void scheduleCheckBlockedThreads (Capability *cap);
221 static void scheduleCheckBlackHoles (Capability *cap);
222 static void scheduleDetectDeadlock (Capability *cap, Task *task);
223 #if defined(GRAN)
224 static StgTSO *scheduleProcessEvent(rtsEvent *event);
225 #endif
226 #if defined(PARALLEL_HASKELL)
227 static StgTSO *scheduleSendPendingMessages(void);
228 static void scheduleActivateSpark(void);
229 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
230 #endif
231 #if defined(PAR) || defined(GRAN)
232 static void scheduleGranParReport(void);
233 #endif
234 static void schedulePostRunThread(void);
235 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
236 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
237                                          StgTSO *t);
238 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
239                                     nat prev_what_next );
240 static void scheduleHandleThreadBlocked( StgTSO *t );
241 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
242                                              StgTSO *t );
243 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
244 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
245
246 static void unblockThread(Capability *cap, StgTSO *tso);
247 static rtsBool checkBlackHoles(Capability *cap);
248 static void AllRoots(evac_fn evac);
249
250 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
251
252 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
253                         rtsBool stop_at_atomically);
254
255 static void deleteThread (Capability *cap, StgTSO *tso);
256 static void deleteRunQueue (Capability *cap);
257
258 #ifdef DEBUG
259 static void printThreadBlockage(StgTSO *tso);
260 static void printThreadStatus(StgTSO *tso);
261 void printThreadQueue(StgTSO *tso);
262 #endif
263
264 #if defined(PARALLEL_HASKELL)
265 StgTSO * createSparkThread(rtsSpark spark);
266 StgTSO * activateSpark (rtsSpark spark);  
267 #endif
268
269 #ifdef DEBUG
270 static char *whatNext_strs[] = {
271   "(unknown)",
272   "ThreadRunGHC",
273   "ThreadInterpret",
274   "ThreadKilled",
275   "ThreadRelocated",
276   "ThreadComplete"
277 };
278 #endif
279
280 /* -----------------------------------------------------------------------------
281  * Putting a thread on the run queue: different scheduling policies
282  * -------------------------------------------------------------------------- */
283
284 STATIC_INLINE void
285 addToRunQueue( Capability *cap, StgTSO *t )
286 {
287 #if defined(PARALLEL_HASKELL)
288     if (RtsFlags.ParFlags.doFairScheduling) { 
289         // this does round-robin scheduling; good for concurrency
290         appendToRunQueue(cap,t);
291     } else {
292         // this does unfair scheduling; good for parallelism
293         pushOnRunQueue(cap,t);
294     }
295 #else
296     // this does round-robin scheduling; good for concurrency
297     appendToRunQueue(cap,t);
298 #endif
299 }
300
301 /* ---------------------------------------------------------------------------
302    Main scheduling loop.
303
304    We use round-robin scheduling, each thread returning to the
305    scheduler loop when one of these conditions is detected:
306
307       * out of heap space
308       * timer expires (thread yields)
309       * thread blocks
310       * thread ends
311       * stack overflow
312
313    GRAN version:
314      In a GranSim setup this loop iterates over the global event queue.
315      This revolves around the global event queue, which determines what 
316      to do next. Therefore, it's more complicated than either the 
317      concurrent or the parallel (GUM) setup.
318
319    GUM version:
320      GUM iterates over incoming messages.
321      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
322      and sends out a fish whenever it has nothing to do; in-between
323      doing the actual reductions (shared code below) it processes the
324      incoming messages and deals with delayed operations 
325      (see PendingFetches).
326      This is not the ugliest code you could imagine, but it's bloody close.
327
328    ------------------------------------------------------------------------ */
329
330 static Capability *
331 schedule (Capability *initialCapability, Task *task)
332 {
333   StgTSO *t;
334   Capability *cap;
335   StgThreadReturnCode ret;
336 #if defined(GRAN)
337   rtsEvent *event;
338 #elif defined(PARALLEL_HASKELL)
339   StgTSO *tso;
340   GlobalTaskId pe;
341   rtsBool receivedFinish = rtsFalse;
342 # if defined(DEBUG)
343   nat tp_size, sp_size; // stats only
344 # endif
345 #endif
346   nat prev_what_next;
347   rtsBool ready_to_gc;
348 #if defined(THREADED_RTS)
349   rtsBool first = rtsTrue;
350 #endif
351   
352   cap = initialCapability;
353
354   // Pre-condition: this task owns initialCapability.
355   // The sched_mutex is *NOT* held
356   // NB. on return, we still hold a capability.
357
358   IF_DEBUG(scheduler,
359            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
360                        task, initialCapability);
361       );
362
363   schedulePreLoop();
364
365   // -----------------------------------------------------------
366   // Scheduler loop starts here:
367
368 #if defined(PARALLEL_HASKELL)
369 #define TERMINATION_CONDITION        (!receivedFinish)
370 #elif defined(GRAN)
371 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
372 #else
373 #define TERMINATION_CONDITION        rtsTrue
374 #endif
375
376   while (TERMINATION_CONDITION) {
377
378 #if defined(GRAN)
379       /* Choose the processor with the next event */
380       CurrentProc = event->proc;
381       CurrentTSO = event->tso;
382 #endif
383
384 #if defined(THREADED_RTS)
385       if (first) {
386           // don't yield the first time, we want a chance to run this
387           // thread for a bit, even if there are others banging at the
388           // door.
389           first = rtsFalse;
390           ASSERT_CAPABILITY_INVARIANTS(cap,task);
391       } else {
392           // Yield the capability to higher-priority tasks if necessary.
393           yieldCapability(&cap, task);
394       }
395 #endif
396       
397 #ifdef SMP
398       schedulePushWork(cap,task);
399 #endif          
400
401     // Check whether we have re-entered the RTS from Haskell without
402     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
403     // call).
404     if (cap->in_haskell) {
405           errorBelch("schedule: re-entered unsafely.\n"
406                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
407           stg_exit(EXIT_FAILURE);
408     }
409
410     //
411     // Test for interruption.  If interrupted==rtsTrue, then either
412     // we received a keyboard interrupt (^C), or the scheduler is
413     // trying to shut down all the tasks (shutting_down_scheduler) in
414     // the threaded RTS.
415     //
416     if (interrupted) {
417         deleteRunQueue(cap);
418         if (shutting_down_scheduler) {
419             IF_DEBUG(scheduler, sched_belch("shutting down"));
420             // If we are a worker, just exit.  If we're a bound thread
421             // then we will exit below when we've removed our TSO from
422             // the run queue.
423             if (task->tso == NULL && emptyRunQueue(cap)) {
424                 return cap;
425             }
426         } else {
427             IF_DEBUG(scheduler, sched_belch("interrupted"));
428         }
429     }
430
431 #if defined(not_yet) && defined(SMP)
432     //
433     // Top up the run queue from our spark pool.  We try to make the
434     // number of threads in the run queue equal to the number of
435     // free capabilities.
436     //
437     {
438         StgClosure *spark;
439         if (emptyRunQueue()) {
440             spark = findSpark(rtsFalse);
441             if (spark == NULL) {
442                 break; /* no more sparks in the pool */
443             } else {
444                 createSparkThread(spark);         
445                 IF_DEBUG(scheduler,
446                          sched_belch("==^^ turning spark of closure %p into a thread",
447                                      (StgClosure *)spark));
448             }
449         }
450     }
451 #endif // SMP
452
453     scheduleStartSignalHandlers(cap);
454
455     // Only check the black holes here if we've nothing else to do.
456     // During normal execution, the black hole list only gets checked
457     // at GC time, to avoid repeatedly traversing this possibly long
458     // list each time around the scheduler.
459     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
460
461     scheduleCheckBlockedThreads(cap);
462
463     scheduleDetectDeadlock(cap,task);
464
465     // Normally, the only way we can get here with no threads to
466     // run is if a keyboard interrupt received during 
467     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
468     // Additionally, it is not fatal for the
469     // threaded RTS to reach here with no threads to run.
470     //
471     // win32: might be here due to awaitEvent() being abandoned
472     // as a result of a console event having been delivered.
473     if ( emptyRunQueue(cap) ) {
474 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
475         ASSERT(interrupted);
476 #endif
477         continue; // nothing to do
478     }
479
480 #if defined(PARALLEL_HASKELL)
481     scheduleSendPendingMessages();
482     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
483         continue;
484
485 #if defined(SPARKS)
486     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
487 #endif
488
489     /* If we still have no work we need to send a FISH to get a spark
490        from another PE */
491     if (emptyRunQueue(cap)) {
492         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
493         ASSERT(rtsFalse); // should not happen at the moment
494     }
495     // from here: non-empty run queue.
496     //  TODO: merge above case with this, only one call processMessages() !
497     if (PacketsWaiting()) {  /* process incoming messages, if
498                                 any pending...  only in else
499                                 because getRemoteWork waits for
500                                 messages as well */
501         receivedFinish = processMessages();
502     }
503 #endif
504
505 #if defined(GRAN)
506     scheduleProcessEvent(event);
507 #endif
508
509     // 
510     // Get a thread to run
511     //
512     t = popRunQueue(cap);
513
514 #if defined(GRAN) || defined(PAR)
515     scheduleGranParReport(); // some kind of debuging output
516 #else
517     // Sanity check the thread we're about to run.  This can be
518     // expensive if there is lots of thread switching going on...
519     IF_DEBUG(sanity,checkTSO(t));
520 #endif
521
522 #if defined(THREADED_RTS)
523     // Check whether we can run this thread in the current task.
524     // If not, we have to pass our capability to the right task.
525     {
526         Task *bound = t->bound;
527       
528         if (bound) {
529             if (bound == task) {
530                 IF_DEBUG(scheduler,
531                          sched_belch("### Running thread %d in bound thread",
532                                      t->id));
533                 // yes, the Haskell thread is bound to the current native thread
534             } else {
535                 IF_DEBUG(scheduler,
536                          sched_belch("### thread %d bound to another OS thread",
537                                      t->id));
538                 // no, bound to a different Haskell thread: pass to that thread
539                 pushOnRunQueue(cap,t);
540                 continue;
541             }
542         } else {
543             // The thread we want to run is unbound.
544             if (task->tso) { 
545                 IF_DEBUG(scheduler,
546                          sched_belch("### this OS thread cannot run thread %d", t->id));
547                 // no, the current native thread is bound to a different
548                 // Haskell thread, so pass it to any worker thread
549                 pushOnRunQueue(cap,t);
550                 continue; 
551             }
552         }
553     }
554 #endif
555
556     cap->r.rCurrentTSO = t;
557     
558     /* context switches are initiated by the timer signal, unless
559      * the user specified "context switch as often as possible", with
560      * +RTS -C0
561      */
562     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
563         && !emptyThreadQueues(cap)) {
564         context_switch = 1;
565     }
566          
567 run_thread:
568
569     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
570                               (long)t->id, whatNext_strs[t->what_next]));
571
572 #if defined(PROFILING)
573     startHeapProfTimer();
574 #endif
575
576     // ----------------------------------------------------------------------
577     // Run the current thread 
578
579     prev_what_next = t->what_next;
580
581     errno = t->saved_errno;
582     cap->in_haskell = rtsTrue;
583
584     recent_activity = ACTIVITY_YES;
585
586     switch (prev_what_next) {
587         
588     case ThreadKilled:
589     case ThreadComplete:
590         /* Thread already finished, return to scheduler. */
591         ret = ThreadFinished;
592         break;
593         
594     case ThreadRunGHC:
595     {
596         StgRegTable *r;
597         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
598         cap = regTableToCapability(r);
599         ret = r->rRet;
600         break;
601     }
602     
603     case ThreadInterpret:
604         cap = interpretBCO(cap);
605         ret = cap->r.rRet;
606         break;
607         
608     default:
609         barf("schedule: invalid what_next field");
610     }
611
612     cap->in_haskell = rtsFalse;
613
614     // The TSO might have moved, eg. if it re-entered the RTS and a GC
615     // happened.  So find the new location:
616     t = cap->r.rCurrentTSO;
617
618 #ifdef SMP
619     // If ret is ThreadBlocked, and this Task is bound to the TSO that
620     // blocked, we are in limbo - the TSO is now owned by whatever it
621     // is blocked on, and may in fact already have been woken up,
622     // perhaps even on a different Capability.  It may be the case
623     // that task->cap != cap.  We better yield this Capability
624     // immediately and return to normaility.
625     if (ret == ThreadBlocked) {
626         IF_DEBUG(scheduler,
627                  debugBelch("--<< thread %d (%s) stopped: blocked\n",
628                             t->id, whatNext_strs[t->what_next]));
629         continue;
630     }
631 #endif
632
633     ASSERT_CAPABILITY_INVARIANTS(cap,task);
634
635     // And save the current errno in this thread.
636     t->saved_errno = errno;
637
638     // ----------------------------------------------------------------------
639     
640     // Costs for the scheduler are assigned to CCS_SYSTEM
641 #if defined(PROFILING)
642     stopHeapProfTimer();
643     CCCS = CCS_SYSTEM;
644 #endif
645     
646     // We have run some Haskell code: there might be blackhole-blocked
647     // threads to wake up now.
648     // Lock-free test here should be ok, we're just setting a flag.
649     if ( blackhole_queue != END_TSO_QUEUE ) {
650         blackholes_need_checking = rtsTrue;
651     }
652     
653 #if defined(THREADED_RTS)
654     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
655 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
656     IF_DEBUG(scheduler,debugBelch("sched: "););
657 #endif
658     
659     schedulePostRunThread();
660
661     ready_to_gc = rtsFalse;
662
663     switch (ret) {
664     case HeapOverflow:
665         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
666         break;
667
668     case StackOverflow:
669         scheduleHandleStackOverflow(cap,task,t);
670         break;
671
672     case ThreadYielding:
673         if (scheduleHandleYield(cap, t, prev_what_next)) {
674             // shortcut for switching between compiler/interpreter:
675             goto run_thread; 
676         }
677         break;
678
679     case ThreadBlocked:
680         scheduleHandleThreadBlocked(t);
681         break;
682
683     case ThreadFinished:
684         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
685         ASSERT_CAPABILITY_INVARIANTS(cap,task);
686         break;
687
688     default:
689       barf("schedule: invalid thread return code %d", (int)ret);
690     }
691
692     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
693     if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
694   } /* end of while() */
695
696   IF_PAR_DEBUG(verbose,
697                debugBelch("== Leaving schedule() after having received Finish\n"));
698 }
699
700 /* ----------------------------------------------------------------------------
701  * Setting up the scheduler loop
702  * ------------------------------------------------------------------------- */
703
704 static void
705 schedulePreLoop(void)
706 {
707 #if defined(GRAN) 
708     /* set up first event to get things going */
709     /* ToDo: assign costs for system setup and init MainTSO ! */
710     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
711               ContinueThread, 
712               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
713     
714     IF_DEBUG(gran,
715              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
716                         CurrentTSO);
717              G_TSO(CurrentTSO, 5));
718     
719     if (RtsFlags.GranFlags.Light) {
720         /* Save current time; GranSim Light only */
721         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
722     }      
723 #endif
724 }
725
726 /* -----------------------------------------------------------------------------
727  * schedulePushWork()
728  *
729  * Push work to other Capabilities if we have some.
730  * -------------------------------------------------------------------------- */
731
732 #ifdef SMP
733 static void
734 schedulePushWork(Capability *cap USED_WHEN_SMP, 
735                  Task *task      USED_WHEN_SMP)
736 {
737     Capability *free_caps[n_capabilities], *cap0;
738     nat i, n_free_caps;
739
740     // Check whether we have more threads on our run queue that we
741     // could hand to another Capability.
742     if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) {
743         return;
744     }
745
746     // First grab as many free Capabilities as we can.
747     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
748         cap0 = &capabilities[i];
749         if (cap != cap0 && tryGrabCapability(cap0,task)) {
750             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
751                 // it already has some work, we just grabbed it at 
752                 // the wrong moment.  Or maybe it's deadlocked!
753                 releaseCapability(cap0);
754             } else {
755                 free_caps[n_free_caps++] = cap0;
756             }
757         }
758     }
759
760     // we now have n_free_caps free capabilities stashed in
761     // free_caps[].  Share our run queue equally with them.  This is
762     // probably the simplest thing we could do; improvements we might
763     // want to do include:
764     //
765     //   - giving high priority to moving relatively new threads, on 
766     //     the gournds that they haven't had time to build up a
767     //     working set in the cache on this CPU/Capability.
768     //
769     //   - giving low priority to moving long-lived threads
770
771     if (n_free_caps > 0) {
772         StgTSO *prev, *t, *next;
773         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
774
775         prev = cap->run_queue_hd;
776         t = prev->link;
777         prev->link = END_TSO_QUEUE;
778         i = 0;
779         for (; t != END_TSO_QUEUE; t = next) {
780             next = t->link;
781             t->link = END_TSO_QUEUE;
782             if (t->what_next == ThreadRelocated
783                 || t->bound == task) { // don't move my bound thread
784                 prev->link = t;
785                 prev = t;
786             } else if (i == n_free_caps) {
787                 i = 0;
788                 // keep one for us
789                 prev->link = t;
790                 prev = t;
791             } else {
792                 appendToRunQueue(free_caps[i],t);
793                 if (t->bound) { t->bound->cap = free_caps[i]; }
794                 i++;
795             }
796         }
797         cap->run_queue_tl = prev;
798
799         // release the capabilities
800         for (i = 0; i < n_free_caps; i++) {
801             task->cap = free_caps[i];
802             releaseCapability(free_caps[i]);
803         }
804     }
805     task->cap = cap; // reset to point to our Capability.
806 }
807 #endif
808
809 /* ----------------------------------------------------------------------------
810  * Start any pending signal handlers
811  * ------------------------------------------------------------------------- */
812
813 static void
814 scheduleStartSignalHandlers(Capability *cap)
815 {
816 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
817     if (signals_pending()) { // safe outside the lock
818         startSignalHandlers(cap);
819     }
820 #endif
821 }
822
823 /* ----------------------------------------------------------------------------
824  * Check for blocked threads that can be woken up.
825  * ------------------------------------------------------------------------- */
826
827 static void
828 scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
829 {
830 #if !defined(THREADED_RTS)
831     //
832     // Check whether any waiting threads need to be woken up.  If the
833     // run queue is empty, and there are no other tasks running, we
834     // can wait indefinitely for something to happen.
835     //
836     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
837     {
838         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
839     }
840 #endif
841 }
842
843
844 /* ----------------------------------------------------------------------------
845  * Check for threads blocked on BLACKHOLEs that can be woken up
846  * ------------------------------------------------------------------------- */
847 static void
848 scheduleCheckBlackHoles (Capability *cap)
849 {
850     if ( blackholes_need_checking ) // check without the lock first
851     {
852         ACQUIRE_LOCK(&sched_mutex);
853         if ( blackholes_need_checking ) {
854             checkBlackHoles(cap);
855             blackholes_need_checking = rtsFalse;
856         }
857         RELEASE_LOCK(&sched_mutex);
858     }
859 }
860
861 /* ----------------------------------------------------------------------------
862  * Detect deadlock conditions and attempt to resolve them.
863  * ------------------------------------------------------------------------- */
864
865 static void
866 scheduleDetectDeadlock (Capability *cap, Task *task)
867 {
868
869 #if defined(PARALLEL_HASKELL)
870     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
871     return;
872 #endif
873
874     /* 
875      * Detect deadlock: when we have no threads to run, there are no
876      * threads blocked, waiting for I/O, or sleeping, and all the
877      * other tasks are waiting for work, we must have a deadlock of
878      * some description.
879      */
880     if ( emptyThreadQueues(cap) )
881     {
882 #if defined(THREADED_RTS)
883         /* 
884          * In the threaded RTS, we only check for deadlock if there
885          * has been no activity in a complete timeslice.  This means
886          * we won't eagerly start a full GC just because we don't have
887          * any threads to run currently.
888          */
889         if (recent_activity != ACTIVITY_INACTIVE) return;
890 #endif
891
892         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
893
894         // Garbage collection can release some new threads due to
895         // either (a) finalizers or (b) threads resurrected because
896         // they are unreachable and will therefore be sent an
897         // exception.  Any threads thus released will be immediately
898         // runnable.
899         scheduleDoGC( cap, task, rtsTrue/*force  major GC*/ );
900         recent_activity = ACTIVITY_DONE_GC;
901         
902         if ( !emptyRunQueue(cap) ) return;
903
904 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
905         /* If we have user-installed signal handlers, then wait
906          * for signals to arrive rather then bombing out with a
907          * deadlock.
908          */
909         if ( anyUserHandlers() ) {
910             IF_DEBUG(scheduler, 
911                      sched_belch("still deadlocked, waiting for signals..."));
912
913             awaitUserSignals();
914
915             if (signals_pending()) {
916                 startSignalHandlers(cap);
917             }
918
919             // either we have threads to run, or we were interrupted:
920             ASSERT(!emptyRunQueue(cap) || interrupted);
921         }
922 #endif
923
924 #if !defined(THREADED_RTS)
925         /* Probably a real deadlock.  Send the current main thread the
926          * Deadlock exception.
927          */
928         if (task->tso) {
929             switch (task->tso->why_blocked) {
930             case BlockedOnSTM:
931             case BlockedOnBlackHole:
932             case BlockedOnException:
933             case BlockedOnMVar:
934                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
935                 return;
936             default:
937                 barf("deadlock: main thread blocked in a strange way");
938             }
939         }
940         return;
941 #endif
942     }
943 }
944
945 /* ----------------------------------------------------------------------------
946  * Process an event (GRAN only)
947  * ------------------------------------------------------------------------- */
948
949 #if defined(GRAN)
950 static StgTSO *
951 scheduleProcessEvent(rtsEvent *event)
952 {
953     StgTSO *t;
954
955     if (RtsFlags.GranFlags.Light)
956       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
957
958     /* adjust time based on time-stamp */
959     if (event->time > CurrentTime[CurrentProc] &&
960         event->evttype != ContinueThread)
961       CurrentTime[CurrentProc] = event->time;
962     
963     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
964     if (!RtsFlags.GranFlags.Light)
965       handleIdlePEs();
966
967     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
968
969     /* main event dispatcher in GranSim */
970     switch (event->evttype) {
971       /* Should just be continuing execution */
972     case ContinueThread:
973       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
974       /* ToDo: check assertion
975       ASSERT(run_queue_hd != (StgTSO*)NULL &&
976              run_queue_hd != END_TSO_QUEUE);
977       */
978       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
979       if (!RtsFlags.GranFlags.DoAsyncFetch &&
980           procStatus[CurrentProc]==Fetching) {
981         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
982               CurrentTSO->id, CurrentTSO, CurrentProc);
983         goto next_thread;
984       } 
985       /* Ignore ContinueThreads for completed threads */
986       if (CurrentTSO->what_next == ThreadComplete) {
987         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
988               CurrentTSO->id, CurrentTSO, CurrentProc);
989         goto next_thread;
990       } 
991       /* Ignore ContinueThreads for threads that are being migrated */
992       if (PROCS(CurrentTSO)==Nowhere) { 
993         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
994               CurrentTSO->id, CurrentTSO, CurrentProc);
995         goto next_thread;
996       }
997       /* The thread should be at the beginning of the run queue */
998       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
999         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1000               CurrentTSO->id, CurrentTSO, CurrentProc);
1001         break; // run the thread anyway
1002       }
1003       /*
1004       new_event(proc, proc, CurrentTime[proc],
1005                 FindWork,
1006                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1007       goto next_thread; 
1008       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1009       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1010
1011     case FetchNode:
1012       do_the_fetchnode(event);
1013       goto next_thread;             /* handle next event in event queue  */
1014       
1015     case GlobalBlock:
1016       do_the_globalblock(event);
1017       goto next_thread;             /* handle next event in event queue  */
1018       
1019     case FetchReply:
1020       do_the_fetchreply(event);
1021       goto next_thread;             /* handle next event in event queue  */
1022       
1023     case UnblockThread:   /* Move from the blocked queue to the tail of */
1024       do_the_unblock(event);
1025       goto next_thread;             /* handle next event in event queue  */
1026       
1027     case ResumeThread:  /* Move from the blocked queue to the tail of */
1028       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1029       event->tso->gran.blocktime += 
1030         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1031       do_the_startthread(event);
1032       goto next_thread;             /* handle next event in event queue  */
1033       
1034     case StartThread:
1035       do_the_startthread(event);
1036       goto next_thread;             /* handle next event in event queue  */
1037       
1038     case MoveThread:
1039       do_the_movethread(event);
1040       goto next_thread;             /* handle next event in event queue  */
1041       
1042     case MoveSpark:
1043       do_the_movespark(event);
1044       goto next_thread;             /* handle next event in event queue  */
1045       
1046     case FindWork:
1047       do_the_findwork(event);
1048       goto next_thread;             /* handle next event in event queue  */
1049       
1050     default:
1051       barf("Illegal event type %u\n", event->evttype);
1052     }  /* switch */
1053     
1054     /* This point was scheduler_loop in the old RTS */
1055
1056     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1057
1058     TimeOfLastEvent = CurrentTime[CurrentProc];
1059     TimeOfNextEvent = get_time_of_next_event();
1060     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1061     // CurrentTSO = ThreadQueueHd;
1062
1063     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1064                          TimeOfNextEvent));
1065
1066     if (RtsFlags.GranFlags.Light) 
1067       GranSimLight_leave_system(event, &ActiveTSO); 
1068
1069     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1070
1071     IF_DEBUG(gran, 
1072              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1073
1074     /* in a GranSim setup the TSO stays on the run queue */
1075     t = CurrentTSO;
1076     /* Take a thread from the run queue. */
1077     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1078
1079     IF_DEBUG(gran, 
1080              debugBelch("GRAN: About to run current thread, which is\n");
1081              G_TSO(t,5));
1082
1083     context_switch = 0; // turned on via GranYield, checking events and time slice
1084
1085     IF_DEBUG(gran, 
1086              DumpGranEvent(GR_SCHEDULE, t));
1087
1088     procStatus[CurrentProc] = Busy;
1089 }
1090 #endif // GRAN
1091
1092 /* ----------------------------------------------------------------------------
1093  * Send pending messages (PARALLEL_HASKELL only)
1094  * ------------------------------------------------------------------------- */
1095
1096 #if defined(PARALLEL_HASKELL)
1097 static StgTSO *
1098 scheduleSendPendingMessages(void)
1099 {
1100     StgSparkPool *pool;
1101     rtsSpark spark;
1102     StgTSO *t;
1103
1104 # if defined(PAR) // global Mem.Mgmt., omit for now
1105     if (PendingFetches != END_BF_QUEUE) {
1106         processFetches();
1107     }
1108 # endif
1109     
1110     if (RtsFlags.ParFlags.BufferTime) {
1111         // if we use message buffering, we must send away all message
1112         // packets which have become too old...
1113         sendOldBuffers(); 
1114     }
1115 }
1116 #endif
1117
1118 /* ----------------------------------------------------------------------------
1119  * Activate spark threads (PARALLEL_HASKELL only)
1120  * ------------------------------------------------------------------------- */
1121
1122 #if defined(PARALLEL_HASKELL)
1123 static void
1124 scheduleActivateSpark(void)
1125 {
1126 #if defined(SPARKS)
1127   ASSERT(emptyRunQueue());
1128 /* We get here if the run queue is empty and want some work.
1129    We try to turn a spark into a thread, and add it to the run queue,
1130    from where it will be picked up in the next iteration of the scheduler
1131    loop.
1132 */
1133
1134       /* :-[  no local threads => look out for local sparks */
1135       /* the spark pool for the current PE */
1136       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1137       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1138           pool->hd < pool->tl) {
1139         /* 
1140          * ToDo: add GC code check that we really have enough heap afterwards!!
1141          * Old comment:
1142          * If we're here (no runnable threads) and we have pending
1143          * sparks, we must have a space problem.  Get enough space
1144          * to turn one of those pending sparks into a
1145          * thread... 
1146          */
1147
1148         spark = findSpark(rtsFalse);            /* get a spark */
1149         if (spark != (rtsSpark) NULL) {
1150           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1151           IF_PAR_DEBUG(fish, // schedule,
1152                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1153                              tso->id, tso, advisory_thread_count));
1154
1155           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1156             IF_PAR_DEBUG(fish, // schedule,
1157                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1158                             spark));
1159             return rtsFalse; /* failed to generate a thread */
1160           }                  /* otherwise fall through & pick-up new tso */
1161         } else {
1162           IF_PAR_DEBUG(fish, // schedule,
1163                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1164                              spark_queue_len(pool)));
1165           return rtsFalse;  /* failed to generate a thread */
1166         }
1167         return rtsTrue;  /* success in generating a thread */
1168   } else { /* no more threads permitted or pool empty */
1169     return rtsFalse;  /* failed to generateThread */
1170   }
1171 #else
1172   tso = NULL; // avoid compiler warning only
1173   return rtsFalse;  /* dummy in non-PAR setup */
1174 #endif // SPARKS
1175 }
1176 #endif // PARALLEL_HASKELL
1177
1178 /* ----------------------------------------------------------------------------
1179  * Get work from a remote node (PARALLEL_HASKELL only)
1180  * ------------------------------------------------------------------------- */
1181     
1182 #if defined(PARALLEL_HASKELL)
1183 static rtsBool
1184 scheduleGetRemoteWork(rtsBool *receivedFinish)
1185 {
1186   ASSERT(emptyRunQueue());
1187
1188   if (RtsFlags.ParFlags.BufferTime) {
1189         IF_PAR_DEBUG(verbose, 
1190                 debugBelch("...send all pending data,"));
1191         {
1192           nat i;
1193           for (i=1; i<=nPEs; i++)
1194             sendImmediately(i); // send all messages away immediately
1195         }
1196   }
1197 # ifndef SPARKS
1198         //++EDEN++ idle() , i.e. send all buffers, wait for work
1199         // suppress fishing in EDEN... just look for incoming messages
1200         // (blocking receive)
1201   IF_PAR_DEBUG(verbose, 
1202                debugBelch("...wait for incoming messages...\n"));
1203   *receivedFinish = processMessages(); // blocking receive...
1204
1205         // and reenter scheduling loop after having received something
1206         // (return rtsFalse below)
1207
1208 # else /* activate SPARKS machinery */
1209 /* We get here, if we have no work, tried to activate a local spark, but still
1210    have no work. We try to get a remote spark, by sending a FISH message.
1211    Thread migration should be added here, and triggered when a sequence of 
1212    fishes returns without work. */
1213         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1214
1215       /* =8-[  no local sparks => look for work on other PEs */
1216         /*
1217          * We really have absolutely no work.  Send out a fish
1218          * (there may be some out there already), and wait for
1219          * something to arrive.  We clearly can't run any threads
1220          * until a SCHEDULE or RESUME arrives, and so that's what
1221          * we're hoping to see.  (Of course, we still have to
1222          * respond to other types of messages.)
1223          */
1224         rtsTime now = msTime() /*CURRENT_TIME*/;
1225         IF_PAR_DEBUG(verbose, 
1226                      debugBelch("--  now=%ld\n", now));
1227         IF_PAR_DEBUG(fish, // verbose,
1228              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1229                  (last_fish_arrived_at!=0 &&
1230                   last_fish_arrived_at+delay > now)) {
1231                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1232                      now, last_fish_arrived_at+delay, 
1233                      last_fish_arrived_at,
1234                      delay);
1235              });
1236   
1237         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1238             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1239           if (last_fish_arrived_at==0 ||
1240               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1241             /* outstandingFishes is set in sendFish, processFish;
1242                avoid flooding system with fishes via delay */
1243     next_fish_to_send_at = 0;  
1244   } else {
1245     /* ToDo: this should be done in the main scheduling loop to avoid the
1246              busy wait here; not so bad if fish delay is very small  */
1247     int iq = 0; // DEBUGGING -- HWL
1248     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1249     /* send a fish when ready, but process messages that arrive in the meantime */
1250     do {
1251       if (PacketsWaiting()) {
1252         iq++; // DEBUGGING
1253         *receivedFinish = processMessages();
1254       }
1255       now = msTime();
1256     } while (!*receivedFinish || now<next_fish_to_send_at);
1257     // JB: This means the fish could become obsolete, if we receive
1258     // work. Better check for work again? 
1259     // last line: while (!receivedFinish || !haveWork || now<...)
1260     // next line: if (receivedFinish || haveWork )
1261
1262     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1263       return rtsFalse;  // NB: this will leave scheduler loop
1264                         // immediately after return!
1265                           
1266     IF_PAR_DEBUG(fish, // verbose,
1267                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1268
1269   }
1270
1271     // JB: IMHO, this should all be hidden inside sendFish(...)
1272     /* pe = choosePE(); 
1273        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1274                 NEW_FISH_HUNGER);
1275
1276     // Global statistics: count no. of fishes
1277     if (RtsFlags.ParFlags.ParStats.Global &&
1278          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1279            globalParStats.tot_fish_mess++;
1280            }
1281     */ 
1282
1283   /* delayed fishes must have been sent by now! */
1284   next_fish_to_send_at = 0;  
1285   }
1286       
1287   *receivedFinish = processMessages();
1288 # endif /* SPARKS */
1289
1290  return rtsFalse;
1291  /* NB: this function always returns rtsFalse, meaning the scheduler
1292     loop continues with the next iteration; 
1293     rationale: 
1294       return code means success in finding work; we enter this function
1295       if there is no local work, thus have to send a fish which takes
1296       time until it arrives with work; in the meantime we should process
1297       messages in the main loop;
1298  */
1299 }
1300 #endif // PARALLEL_HASKELL
1301
1302 /* ----------------------------------------------------------------------------
1303  * PAR/GRAN: Report stats & debugging info(?)
1304  * ------------------------------------------------------------------------- */
1305
1306 #if defined(PAR) || defined(GRAN)
1307 static void
1308 scheduleGranParReport(void)
1309 {
1310   ASSERT(run_queue_hd != END_TSO_QUEUE);
1311
1312   /* Take a thread from the run queue, if we have work */
1313   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1314
1315     /* If this TSO has got its outport closed in the meantime, 
1316      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1317      * It has to be marked as TH_DEAD for this purpose.
1318      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1319
1320 JB: TODO: investigate wether state change field could be nuked
1321      entirely and replaced by the normal tso state (whatnext
1322      field). All we want to do is to kill tsos from outside.
1323      */
1324
1325     /* ToDo: write something to the log-file
1326     if (RTSflags.ParFlags.granSimStats && !sameThread)
1327         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1328
1329     CurrentTSO = t;
1330     */
1331     /* the spark pool for the current PE */
1332     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1333
1334     IF_DEBUG(scheduler, 
1335              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1336                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1337
1338     IF_PAR_DEBUG(fish,
1339              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1340                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1341
1342     if (RtsFlags.ParFlags.ParStats.Full && 
1343         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1344         (emitSchedule || // forced emit
1345          (t && LastTSO && t->id != LastTSO->id))) {
1346       /* 
1347          we are running a different TSO, so write a schedule event to log file
1348          NB: If we use fair scheduling we also have to write  a deschedule 
1349              event for LastTSO; with unfair scheduling we know that the
1350              previous tso has blocked whenever we switch to another tso, so
1351              we don't need it in GUM for now
1352       */
1353       IF_PAR_DEBUG(fish, // schedule,
1354                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1355
1356       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1357                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1358       emitSchedule = rtsFalse;
1359     }
1360 }     
1361 #endif
1362
1363 /* ----------------------------------------------------------------------------
1364  * After running a thread...
1365  * ------------------------------------------------------------------------- */
1366
1367 static void
1368 schedulePostRunThread(void)
1369 {
1370 #if defined(PAR)
1371     /* HACK 675: if the last thread didn't yield, make sure to print a 
1372        SCHEDULE event to the log file when StgRunning the next thread, even
1373        if it is the same one as before */
1374     LastTSO = t; 
1375     TimeOfLastYield = CURRENT_TIME;
1376 #endif
1377
1378   /* some statistics gathering in the parallel case */
1379
1380 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1381   switch (ret) {
1382     case HeapOverflow:
1383 # if defined(GRAN)
1384       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1385       globalGranStats.tot_heapover++;
1386 # elif defined(PAR)
1387       globalParStats.tot_heapover++;
1388 # endif
1389       break;
1390
1391      case StackOverflow:
1392 # if defined(GRAN)
1393       IF_DEBUG(gran, 
1394                DumpGranEvent(GR_DESCHEDULE, t));
1395       globalGranStats.tot_stackover++;
1396 # elif defined(PAR)
1397       // IF_DEBUG(par, 
1398       // DumpGranEvent(GR_DESCHEDULE, t);
1399       globalParStats.tot_stackover++;
1400 # endif
1401       break;
1402
1403     case ThreadYielding:
1404 # if defined(GRAN)
1405       IF_DEBUG(gran, 
1406                DumpGranEvent(GR_DESCHEDULE, t));
1407       globalGranStats.tot_yields++;
1408 # elif defined(PAR)
1409       // IF_DEBUG(par, 
1410       // DumpGranEvent(GR_DESCHEDULE, t);
1411       globalParStats.tot_yields++;
1412 # endif
1413       break; 
1414
1415     case ThreadBlocked:
1416 # if defined(GRAN)
1417       IF_DEBUG(scheduler,
1418                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1419                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1420                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1421                if (t->block_info.closure!=(StgClosure*)NULL)
1422                  print_bq(t->block_info.closure);
1423                debugBelch("\n"));
1424
1425       // ??? needed; should emit block before
1426       IF_DEBUG(gran, 
1427                DumpGranEvent(GR_DESCHEDULE, t)); 
1428       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1429       /*
1430         ngoq Dogh!
1431       ASSERT(procStatus[CurrentProc]==Busy || 
1432               ((procStatus[CurrentProc]==Fetching) && 
1433               (t->block_info.closure!=(StgClosure*)NULL)));
1434       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1435           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1436             procStatus[CurrentProc]==Fetching)) 
1437         procStatus[CurrentProc] = Idle;
1438       */
1439 # elif defined(PAR)
1440 //++PAR++  blockThread() writes the event (change?)
1441 # endif
1442     break;
1443
1444   case ThreadFinished:
1445     break;
1446
1447   default:
1448     barf("parGlobalStats: unknown return code");
1449     break;
1450     }
1451 #endif
1452 }
1453
1454 /* -----------------------------------------------------------------------------
1455  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1456  * -------------------------------------------------------------------------- */
1457
1458 static rtsBool
1459 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1460 {
1461     // did the task ask for a large block?
1462     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1463         // if so, get one and push it on the front of the nursery.
1464         bdescr *bd;
1465         lnat blocks;
1466         
1467         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1468         
1469         IF_DEBUG(scheduler,
1470                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1471                             (long)t->id, whatNext_strs[t->what_next], blocks));
1472         
1473         // don't do this if the nursery is (nearly) full, we'll GC first.
1474         if (cap->r.rCurrentNursery->link != NULL ||
1475             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1476                                                // if the nursery has only one block.
1477             
1478             ACQUIRE_SM_LOCK
1479             bd = allocGroup( blocks );
1480             RELEASE_SM_LOCK
1481             cap->r.rNursery->n_blocks += blocks;
1482             
1483             // link the new group into the list
1484             bd->link = cap->r.rCurrentNursery;
1485             bd->u.back = cap->r.rCurrentNursery->u.back;
1486             if (cap->r.rCurrentNursery->u.back != NULL) {
1487                 cap->r.rCurrentNursery->u.back->link = bd;
1488             } else {
1489 #if !defined(SMP)
1490                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1491                        g0s0 == cap->r.rNursery);
1492 #endif
1493                 cap->r.rNursery->blocks = bd;
1494             }             
1495             cap->r.rCurrentNursery->u.back = bd;
1496             
1497             // initialise it as a nursery block.  We initialise the
1498             // step, gen_no, and flags field of *every* sub-block in
1499             // this large block, because this is easier than making
1500             // sure that we always find the block head of a large
1501             // block whenever we call Bdescr() (eg. evacuate() and
1502             // isAlive() in the GC would both have to do this, at
1503             // least).
1504             { 
1505                 bdescr *x;
1506                 for (x = bd; x < bd + blocks; x++) {
1507                     x->step = cap->r.rNursery;
1508                     x->gen_no = 0;
1509                     x->flags = 0;
1510                 }
1511             }
1512             
1513             // This assert can be a killer if the app is doing lots
1514             // of large block allocations.
1515             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1516             
1517             // now update the nursery to point to the new block
1518             cap->r.rCurrentNursery = bd;
1519             
1520             // we might be unlucky and have another thread get on the
1521             // run queue before us and steal the large block, but in that
1522             // case the thread will just end up requesting another large
1523             // block.
1524             pushOnRunQueue(cap,t);
1525             return rtsFalse;  /* not actually GC'ing */
1526         }
1527     }
1528     
1529     IF_DEBUG(scheduler,
1530              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1531                         (long)t->id, whatNext_strs[t->what_next]));
1532 #if defined(GRAN)
1533     ASSERT(!is_on_queue(t,CurrentProc));
1534 #elif defined(PARALLEL_HASKELL)
1535     /* Currently we emit a DESCHEDULE event before GC in GUM.
1536        ToDo: either add separate event to distinguish SYSTEM time from rest
1537        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1538     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1539         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1540                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1541         emitSchedule = rtsTrue;
1542     }
1543 #endif
1544       
1545     pushOnRunQueue(cap,t);
1546     return rtsTrue;
1547     /* actual GC is done at the end of the while loop in schedule() */
1548 }
1549
1550 /* -----------------------------------------------------------------------------
1551  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1552  * -------------------------------------------------------------------------- */
1553
1554 static void
1555 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1556 {
1557     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1558                                   (long)t->id, whatNext_strs[t->what_next]));
1559     /* just adjust the stack for this thread, then pop it back
1560      * on the run queue.
1561      */
1562     { 
1563         /* enlarge the stack */
1564         StgTSO *new_t = threadStackOverflow(cap, t);
1565         
1566         /* The TSO attached to this Task may have moved, so update the
1567          * pointer to it.
1568          */
1569         if (task->tso == t) {
1570             task->tso = new_t;
1571         }
1572         pushOnRunQueue(cap,new_t);
1573     }
1574 }
1575
1576 /* -----------------------------------------------------------------------------
1577  * Handle a thread that returned to the scheduler with ThreadYielding
1578  * -------------------------------------------------------------------------- */
1579
1580 static rtsBool
1581 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1582 {
1583     // Reset the context switch flag.  We don't do this just before
1584     // running the thread, because that would mean we would lose ticks
1585     // during GC, which can lead to unfair scheduling (a thread hogs
1586     // the CPU because the tick always arrives during GC).  This way
1587     // penalises threads that do a lot of allocation, but that seems
1588     // better than the alternative.
1589     context_switch = 0;
1590     
1591     /* put the thread back on the run queue.  Then, if we're ready to
1592      * GC, check whether this is the last task to stop.  If so, wake
1593      * up the GC thread.  getThread will block during a GC until the
1594      * GC is finished.
1595      */
1596     IF_DEBUG(scheduler,
1597              if (t->what_next != prev_what_next) {
1598                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1599                             (long)t->id, whatNext_strs[t->what_next]);
1600              } else {
1601                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1602                             (long)t->id, whatNext_strs[t->what_next]);
1603              }
1604         );
1605     
1606     IF_DEBUG(sanity,
1607              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1608              checkTSO(t));
1609     ASSERT(t->link == END_TSO_QUEUE);
1610     
1611     // Shortcut if we're just switching evaluators: don't bother
1612     // doing stack squeezing (which can be expensive), just run the
1613     // thread.
1614     if (t->what_next != prev_what_next) {
1615         return rtsTrue;
1616     }
1617     
1618 #if defined(GRAN)
1619     ASSERT(!is_on_queue(t,CurrentProc));
1620       
1621     IF_DEBUG(sanity,
1622              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1623              checkThreadQsSanity(rtsTrue));
1624
1625 #endif
1626
1627     addToRunQueue(cap,t);
1628
1629 #if defined(GRAN)
1630     /* add a ContinueThread event to actually process the thread */
1631     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1632               ContinueThread,
1633               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1634     IF_GRAN_DEBUG(bq, 
1635                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1636                   G_EVENTQ(0);
1637                   G_CURR_THREADQ(0));
1638 #endif
1639     return rtsFalse;
1640 }
1641
1642 /* -----------------------------------------------------------------------------
1643  * Handle a thread that returned to the scheduler with ThreadBlocked
1644  * -------------------------------------------------------------------------- */
1645
1646 static void
1647 scheduleHandleThreadBlocked( StgTSO *t
1648 #if !defined(GRAN) && !defined(DEBUG)
1649     STG_UNUSED
1650 #endif
1651     )
1652 {
1653 #if defined(GRAN)
1654     IF_DEBUG(scheduler,
1655              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1656                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1657              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1658     
1659     // ??? needed; should emit block before
1660     IF_DEBUG(gran, 
1661              DumpGranEvent(GR_DESCHEDULE, t)); 
1662     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1663     /*
1664       ngoq Dogh!
1665       ASSERT(procStatus[CurrentProc]==Busy || 
1666       ((procStatus[CurrentProc]==Fetching) && 
1667       (t->block_info.closure!=(StgClosure*)NULL)));
1668       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1669       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1670       procStatus[CurrentProc]==Fetching)) 
1671       procStatus[CurrentProc] = Idle;
1672     */
1673 #elif defined(PAR)
1674     IF_DEBUG(scheduler,
1675              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1676                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1677     IF_PAR_DEBUG(bq,
1678                  
1679                  if (t->block_info.closure!=(StgClosure*)NULL) 
1680                  print_bq(t->block_info.closure));
1681     
1682     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1683     blockThread(t);
1684     
1685     /* whatever we schedule next, we must log that schedule */
1686     emitSchedule = rtsTrue;
1687     
1688 #else /* !GRAN */
1689
1690       // We don't need to do anything.  The thread is blocked, and it
1691       // has tidied up its stack and placed itself on whatever queue
1692       // it needs to be on.
1693
1694 #if !defined(SMP)
1695     ASSERT(t->why_blocked != NotBlocked);
1696              // This might not be true under SMP: we don't have
1697              // exclusive access to this TSO, so someone might have
1698              // woken it up by now.  This actually happens: try
1699              // conc023 +RTS -N2.
1700 #endif
1701
1702     IF_DEBUG(scheduler,
1703              debugBelch("--<< thread %d (%s) stopped: ", 
1704                         t->id, whatNext_strs[t->what_next]);
1705              printThreadBlockage(t);
1706              debugBelch("\n"));
1707     
1708     /* Only for dumping event to log file 
1709        ToDo: do I need this in GranSim, too?
1710        blockThread(t);
1711     */
1712 #endif
1713 }
1714
1715 /* -----------------------------------------------------------------------------
1716  * Handle a thread that returned to the scheduler with ThreadFinished
1717  * -------------------------------------------------------------------------- */
1718
1719 static rtsBool
1720 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1721 {
1722     /* Need to check whether this was a main thread, and if so,
1723      * return with the return value.
1724      *
1725      * We also end up here if the thread kills itself with an
1726      * uncaught exception, see Exception.cmm.
1727      */
1728     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1729                                   t->id, whatNext_strs[t->what_next]));
1730
1731 #if defined(GRAN)
1732       endThread(t, CurrentProc); // clean-up the thread
1733 #elif defined(PARALLEL_HASKELL)
1734       /* For now all are advisory -- HWL */
1735       //if(t->priority==AdvisoryPriority) ??
1736       advisory_thread_count--; // JB: Caution with this counter, buggy!
1737       
1738 # if defined(DIST)
1739       if(t->dist.priority==RevalPriority)
1740         FinishReval(t);
1741 # endif
1742     
1743 # if defined(EDENOLD)
1744       // the thread could still have an outport... (BUG)
1745       if (t->eden.outport != -1) {
1746       // delete the outport for the tso which has finished...
1747         IF_PAR_DEBUG(eden_ports,
1748                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1749                               t->eden.outport, t->id));
1750         deleteOPT(t);
1751       }
1752       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1753       if (t->eden.epid != -1) {
1754         IF_PAR_DEBUG(eden_ports,
1755                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1756                            t->id, t->eden.epid));
1757         removeTSOfromProcess(t);
1758       }
1759 # endif 
1760
1761 # if defined(PAR)
1762       if (RtsFlags.ParFlags.ParStats.Full &&
1763           !RtsFlags.ParFlags.ParStats.Suppressed) 
1764         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1765
1766       //  t->par only contains statistics: left out for now...
1767       IF_PAR_DEBUG(fish,
1768                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1769                               t->id,t,t->par.sparkname));
1770 # endif
1771 #endif // PARALLEL_HASKELL
1772
1773       //
1774       // Check whether the thread that just completed was a bound
1775       // thread, and if so return with the result.  
1776       //
1777       // There is an assumption here that all thread completion goes
1778       // through this point; we need to make sure that if a thread
1779       // ends up in the ThreadKilled state, that it stays on the run
1780       // queue so it can be dealt with here.
1781       //
1782
1783       if (t->bound) {
1784
1785           if (t->bound != task) {
1786 #if !defined(THREADED_RTS)
1787               // Must be a bound thread that is not the topmost one.  Leave
1788               // it on the run queue until the stack has unwound to the
1789               // point where we can deal with this.  Leaving it on the run
1790               // queue also ensures that the garbage collector knows about
1791               // this thread and its return value (it gets dropped from the
1792               // all_threads list so there's no other way to find it).
1793               appendToRunQueue(cap,t);
1794               return rtsFalse;
1795 #else
1796               // this cannot happen in the threaded RTS, because a
1797               // bound thread can only be run by the appropriate Task.
1798               barf("finished bound thread that isn't mine");
1799 #endif
1800           }
1801
1802           ASSERT(task->tso == t);
1803
1804           if (t->what_next == ThreadComplete) {
1805               if (task->ret) {
1806                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1807                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1808               }
1809               task->stat = Success;
1810           } else {
1811               if (task->ret) {
1812                   *(task->ret) = NULL;
1813               }
1814               if (interrupted) {
1815                   task->stat = Interrupted;
1816               } else {
1817                   task->stat = Killed;
1818               }
1819           }
1820 #ifdef DEBUG
1821           removeThreadLabel((StgWord)task->tso->id);
1822 #endif
1823           return rtsTrue; // tells schedule() to return
1824       }
1825
1826       return rtsFalse;
1827 }
1828
1829 /* -----------------------------------------------------------------------------
1830  * Perform a heap census, if PROFILING
1831  * -------------------------------------------------------------------------- */
1832
1833 static rtsBool
1834 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1835 {
1836 #if defined(PROFILING)
1837     // When we have +RTS -i0 and we're heap profiling, do a census at
1838     // every GC.  This lets us get repeatable runs for debugging.
1839     if (performHeapProfile ||
1840         (RtsFlags.ProfFlags.profileInterval==0 &&
1841          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1842         GarbageCollect(GetRoots, rtsTrue);
1843         heapCensus();
1844         performHeapProfile = rtsFalse;
1845         return rtsTrue;  // true <=> we already GC'd
1846     }
1847 #endif
1848     return rtsFalse;
1849 }
1850
1851 /* -----------------------------------------------------------------------------
1852  * Perform a garbage collection if necessary
1853  * -------------------------------------------------------------------------- */
1854
1855 static void
1856 scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
1857 {
1858     StgTSO *t;
1859 #ifdef SMP
1860     static volatile StgWord waiting_for_gc;
1861     rtsBool was_waiting;
1862     nat i;
1863 #endif
1864
1865 #ifdef SMP
1866     // In order to GC, there must be no threads running Haskell code.
1867     // Therefore, the GC thread needs to hold *all* the capabilities,
1868     // and release them after the GC has completed.  
1869     //
1870     // This seems to be the simplest way: previous attempts involved
1871     // making all the threads with capabilities give up their
1872     // capabilities and sleep except for the *last* one, which
1873     // actually did the GC.  But it's quite hard to arrange for all
1874     // the other tasks to sleep and stay asleep.
1875     //
1876         
1877     was_waiting = cas(&waiting_for_gc, 0, 1);
1878     if (was_waiting) {
1879         do {
1880             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1881             yieldCapability(&cap,task);
1882         } while (waiting_for_gc);
1883         return;
1884     }
1885
1886     for (i=0; i < n_capabilities; i++) {
1887         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1888         if (cap != &capabilities[i]) {
1889             Capability *pcap = &capabilities[i];
1890             // we better hope this task doesn't get migrated to
1891             // another Capability while we're waiting for this one.
1892             // It won't, because load balancing happens while we have
1893             // all the Capabilities, but even so it's a slightly
1894             // unsavoury invariant.
1895             task->cap = pcap;
1896             context_switch = 1;
1897             waitForReturnCapability(&pcap, task);
1898             if (pcap != &capabilities[i]) {
1899                 barf("scheduleDoGC: got the wrong capability");
1900             }
1901         }
1902     }
1903
1904     waiting_for_gc = rtsFalse;
1905 #endif
1906
1907     /* Kick any transactions which are invalid back to their
1908      * atomically frames.  When next scheduled they will try to
1909      * commit, this commit will fail and they will retry.
1910      */
1911     { 
1912         StgTSO *next;
1913
1914         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1915             if (t->what_next == ThreadRelocated) {
1916                 next = t->link;
1917             } else {
1918                 next = t->global_link;
1919                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1920                     if (!stmValidateNestOfTransactions (t -> trec)) {
1921                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1922                         
1923                         // strip the stack back to the
1924                         // ATOMICALLY_FRAME, aborting the (nested)
1925                         // transaction, and saving the stack of any
1926                         // partially-evaluated thunks on the heap.
1927                         raiseAsync_(cap, t, NULL, rtsTrue);
1928                         
1929 #ifdef REG_R1
1930                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1931 #endif
1932                     }
1933                 }
1934             }
1935         }
1936     }
1937     
1938     // so this happens periodically:
1939     scheduleCheckBlackHoles(cap);
1940     
1941     IF_DEBUG(scheduler, printAllThreads());
1942
1943     /* everybody back, start the GC.
1944      * Could do it in this thread, or signal a condition var
1945      * to do it in another thread.  Either way, we need to
1946      * broadcast on gc_pending_cond afterward.
1947      */
1948 #if defined(THREADED_RTS)
1949     IF_DEBUG(scheduler,sched_belch("doing GC"));
1950 #endif
1951     GarbageCollect(GetRoots, force_major);
1952     
1953 #if defined(SMP)
1954     // release our stash of capabilities.
1955     for (i = 0; i < n_capabilities; i++) {
1956         if (cap != &capabilities[i]) {
1957             task->cap = &capabilities[i];
1958             releaseCapability(&capabilities[i]);
1959         }
1960     }
1961     task->cap = cap;
1962 #endif
1963
1964 #if defined(GRAN)
1965     /* add a ContinueThread event to continue execution of current thread */
1966     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1967               ContinueThread,
1968               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1969     IF_GRAN_DEBUG(bq, 
1970                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1971                   G_EVENTQ(0);
1972                   G_CURR_THREADQ(0));
1973 #endif /* GRAN */
1974 }
1975
1976 /* ---------------------------------------------------------------------------
1977  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1978  * used by Control.Concurrent for error checking.
1979  * ------------------------------------------------------------------------- */
1980  
1981 StgBool
1982 rtsSupportsBoundThreads(void)
1983 {
1984 #if defined(THREADED_RTS)
1985   return rtsTrue;
1986 #else
1987   return rtsFalse;
1988 #endif
1989 }
1990
1991 /* ---------------------------------------------------------------------------
1992  * isThreadBound(tso): check whether tso is bound to an OS thread.
1993  * ------------------------------------------------------------------------- */
1994  
1995 StgBool
1996 isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
1997 {
1998 #if defined(THREADED_RTS)
1999   return (tso->bound != NULL);
2000 #endif
2001   return rtsFalse;
2002 }
2003
2004 /* ---------------------------------------------------------------------------
2005  * Singleton fork(). Do not copy any running threads.
2006  * ------------------------------------------------------------------------- */
2007
2008 #if !defined(mingw32_HOST_OS) && !defined(SMP)
2009 #define FORKPROCESS_PRIMOP_SUPPORTED
2010 #endif
2011
2012 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2013 static void 
2014 deleteThreadImmediately(Capability *cap, StgTSO *tso);
2015 #endif
2016 StgInt
2017 forkProcess(HsStablePtr *entry
2018 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2019             STG_UNUSED
2020 #endif
2021            )
2022 {
2023 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2024     Task *task;
2025     pid_t pid;
2026     StgTSO* t,*next;
2027     Capability *cap;
2028     
2029     IF_DEBUG(scheduler,sched_belch("forking!"));
2030     
2031     // ToDo: for SMP, we should probably acquire *all* the capabilities
2032     cap = rts_lock();
2033     
2034     pid = fork();
2035     
2036     if (pid) { // parent
2037         
2038         // just return the pid
2039         rts_unlock(cap);
2040         return pid;
2041         
2042     } else { // child
2043         
2044         // delete all threads
2045         cap->run_queue_hd = END_TSO_QUEUE;
2046         cap->run_queue_tl = END_TSO_QUEUE;
2047         
2048         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2049             next = t->link;
2050             
2051             // don't allow threads to catch the ThreadKilled exception
2052             deleteThreadImmediately(cap,t);
2053         }
2054         
2055         // wipe the task list
2056         ACQUIRE_LOCK(&sched_mutex);
2057         for (task = all_tasks; task != NULL; task=task->all_link) {
2058             if (task != cap->running_task) discardTask(task);
2059         }
2060         RELEASE_LOCK(&sched_mutex);
2061
2062 #if defined(THREADED_RTS)
2063         // wipe our spare workers list.
2064         cap->spare_workers = NULL;
2065 #endif
2066
2067         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2068         rts_checkSchedStatus("forkProcess",cap);
2069         
2070         rts_unlock(cap);
2071         hs_exit();                      // clean up and exit
2072         stg_exit(EXIT_SUCCESS);
2073     }
2074 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2075     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2076     return -1;
2077 #endif
2078 }
2079
2080 /* ---------------------------------------------------------------------------
2081  * Delete the threads on the run queue of the current capability.
2082  * ------------------------------------------------------------------------- */
2083    
2084 static void
2085 deleteRunQueue (Capability *cap)
2086 {
2087     StgTSO *t, *next;
2088     for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2089         ASSERT(t->what_next != ThreadRelocated);
2090         next = t->link;
2091         deleteThread(cap, t);
2092     }
2093 }
2094
2095 /* startThread and  insertThread are now in GranSim.c -- HWL */
2096
2097
2098 /* -----------------------------------------------------------------------------
2099    Managing the suspended_ccalling_tasks list.
2100    Locks required: sched_mutex
2101    -------------------------------------------------------------------------- */
2102
2103 STATIC_INLINE void
2104 suspendTask (Capability *cap, Task *task)
2105 {
2106     ASSERT(task->next == NULL && task->prev == NULL);
2107     task->next = cap->suspended_ccalling_tasks;
2108     task->prev = NULL;
2109     if (cap->suspended_ccalling_tasks) {
2110         cap->suspended_ccalling_tasks->prev = task;
2111     }
2112     cap->suspended_ccalling_tasks = task;
2113 }
2114
2115 STATIC_INLINE void
2116 recoverSuspendedTask (Capability *cap, Task *task)
2117 {
2118     if (task->prev) {
2119         task->prev->next = task->next;
2120     } else {
2121         ASSERT(cap->suspended_ccalling_tasks == task);
2122         cap->suspended_ccalling_tasks = task->next;
2123     }
2124     if (task->next) {
2125         task->next->prev = task->prev;
2126     }
2127     task->next = task->prev = NULL;
2128 }
2129
2130 /* ---------------------------------------------------------------------------
2131  * Suspending & resuming Haskell threads.
2132  * 
2133  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2134  * its capability before calling the C function.  This allows another
2135  * task to pick up the capability and carry on running Haskell
2136  * threads.  It also means that if the C call blocks, it won't lock
2137  * the whole system.
2138  *
2139  * The Haskell thread making the C call is put to sleep for the
2140  * duration of the call, on the susepended_ccalling_threads queue.  We
2141  * give out a token to the task, which it can use to resume the thread
2142  * on return from the C function.
2143  * ------------------------------------------------------------------------- */
2144    
2145 void *
2146 suspendThread (StgRegTable *reg)
2147 {
2148   Capability *cap;
2149   int saved_errno = errno;
2150   StgTSO *tso;
2151   Task *task;
2152
2153   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2154    */
2155   cap = regTableToCapability(reg);
2156
2157   task = cap->running_task;
2158   tso = cap->r.rCurrentTSO;
2159
2160   IF_DEBUG(scheduler,
2161            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2162
2163   // XXX this might not be necessary --SDM
2164   tso->what_next = ThreadRunGHC;
2165
2166   threadPaused(tso);
2167
2168   if(tso->blocked_exceptions == NULL)  {
2169       tso->why_blocked = BlockedOnCCall;
2170       tso->blocked_exceptions = END_TSO_QUEUE;
2171   } else {
2172       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2173   }
2174
2175   // Hand back capability
2176   task->suspended_tso = tso;
2177
2178   ACQUIRE_LOCK(&cap->lock);
2179
2180   suspendTask(cap,task);
2181   cap->in_haskell = rtsFalse;
2182   releaseCapability_(cap);
2183   
2184   RELEASE_LOCK(&cap->lock);
2185
2186 #if defined(THREADED_RTS)
2187   /* Preparing to leave the RTS, so ensure there's a native thread/task
2188      waiting to take over.
2189   */
2190   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2191 #endif
2192
2193   errno = saved_errno;
2194   return task;
2195 }
2196
2197 StgRegTable *
2198 resumeThread (void *task_)
2199 {
2200     StgTSO *tso;
2201     Capability *cap;
2202     int saved_errno = errno;
2203     Task *task = task_;
2204
2205     cap = task->cap;
2206     // Wait for permission to re-enter the RTS with the result.
2207     waitForReturnCapability(&cap,task);
2208     // we might be on a different capability now... but if so, our
2209     // entry on the suspended_ccalling_tasks list will also have been
2210     // migrated.
2211
2212     // Remove the thread from the suspended list
2213     recoverSuspendedTask(cap,task);
2214
2215     tso = task->suspended_tso;
2216     task->suspended_tso = NULL;
2217     tso->link = END_TSO_QUEUE;
2218     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2219     
2220     if (tso->why_blocked == BlockedOnCCall) {
2221         awakenBlockedQueue(cap,tso->blocked_exceptions);
2222         tso->blocked_exceptions = NULL;
2223     }
2224     
2225     /* Reset blocking status */
2226     tso->why_blocked  = NotBlocked;
2227     
2228     cap->r.rCurrentTSO = tso;
2229     cap->in_haskell = rtsTrue;
2230     errno = saved_errno;
2231
2232     return &cap->r;
2233 }
2234
2235 /* ---------------------------------------------------------------------------
2236  * Comparing Thread ids.
2237  *
2238  * This is used from STG land in the implementation of the
2239  * instances of Eq/Ord for ThreadIds.
2240  * ------------------------------------------------------------------------ */
2241
2242 int
2243 cmp_thread(StgPtr tso1, StgPtr tso2) 
2244
2245   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2246   StgThreadID id2 = ((StgTSO *)tso2)->id;
2247  
2248   if (id1 < id2) return (-1);
2249   if (id1 > id2) return 1;
2250   return 0;
2251 }
2252
2253 /* ---------------------------------------------------------------------------
2254  * Fetching the ThreadID from an StgTSO.
2255  *
2256  * This is used in the implementation of Show for ThreadIds.
2257  * ------------------------------------------------------------------------ */
2258 int
2259 rts_getThreadId(StgPtr tso) 
2260 {
2261   return ((StgTSO *)tso)->id;
2262 }
2263
2264 #ifdef DEBUG
2265 void
2266 labelThread(StgPtr tso, char *label)
2267 {
2268   int len;
2269   void *buf;
2270
2271   /* Caveat: Once set, you can only set the thread name to "" */
2272   len = strlen(label)+1;
2273   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2274   strncpy(buf,label,len);
2275   /* Update will free the old memory for us */
2276   updateThreadLabel(((StgTSO *)tso)->id,buf);
2277 }
2278 #endif /* DEBUG */
2279
2280 /* ---------------------------------------------------------------------------
2281    Create a new thread.
2282
2283    The new thread starts with the given stack size.  Before the
2284    scheduler can run, however, this thread needs to have a closure
2285    (and possibly some arguments) pushed on its stack.  See
2286    pushClosure() in Schedule.h.
2287
2288    createGenThread() and createIOThread() (in SchedAPI.h) are
2289    convenient packaged versions of this function.
2290
2291    currently pri (priority) is only used in a GRAN setup -- HWL
2292    ------------------------------------------------------------------------ */
2293 #if defined(GRAN)
2294 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2295 StgTSO *
2296 createThread(nat size, StgInt pri)
2297 #else
2298 StgTSO *
2299 createThread(Capability *cap, nat size)
2300 #endif
2301 {
2302     StgTSO *tso;
2303     nat stack_size;
2304
2305     /* sched_mutex is *not* required */
2306
2307     /* First check whether we should create a thread at all */
2308 #if defined(PARALLEL_HASKELL)
2309     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2310     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2311         threadsIgnored++;
2312         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2313                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2314         return END_TSO_QUEUE;
2315     }
2316     threadsCreated++;
2317 #endif
2318
2319 #if defined(GRAN)
2320     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2321 #endif
2322
2323     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2324
2325     /* catch ridiculously small stack sizes */
2326     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2327         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2328     }
2329
2330     stack_size = size - TSO_STRUCT_SIZEW;
2331     
2332     tso = (StgTSO *)allocateLocal(cap, size);
2333     TICK_ALLOC_TSO(stack_size, 0);
2334
2335     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2336 #if defined(GRAN)
2337     SET_GRAN_HDR(tso, ThisPE);
2338 #endif
2339
2340     // Always start with the compiled code evaluator
2341     tso->what_next = ThreadRunGHC;
2342
2343     tso->why_blocked  = NotBlocked;
2344     tso->blocked_exceptions = NULL;
2345     
2346     tso->saved_errno = 0;
2347     tso->bound = NULL;
2348     
2349     tso->stack_size     = stack_size;
2350     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2351                           - TSO_STRUCT_SIZEW;
2352     tso->sp             = (P_)&(tso->stack) + stack_size;
2353
2354     tso->trec = NO_TREC;
2355     
2356 #ifdef PROFILING
2357     tso->prof.CCCS = CCS_MAIN;
2358 #endif
2359     
2360   /* put a stop frame on the stack */
2361     tso->sp -= sizeofW(StgStopFrame);
2362     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2363     tso->link = END_TSO_QUEUE;
2364     
2365   // ToDo: check this
2366 #if defined(GRAN)
2367     /* uses more flexible routine in GranSim */
2368     insertThread(tso, CurrentProc);
2369 #else
2370     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2371      * from its creation
2372      */
2373 #endif
2374     
2375 #if defined(GRAN) 
2376     if (RtsFlags.GranFlags.GranSimStats.Full) 
2377         DumpGranEvent(GR_START,tso);
2378 #elif defined(PARALLEL_HASKELL)
2379     if (RtsFlags.ParFlags.ParStats.Full) 
2380         DumpGranEvent(GR_STARTQ,tso);
2381     /* HACk to avoid SCHEDULE 
2382        LastTSO = tso; */
2383 #endif
2384     
2385     /* Link the new thread on the global thread list.
2386      */
2387     ACQUIRE_LOCK(&sched_mutex);
2388     tso->id = next_thread_id++;  // while we have the mutex
2389     tso->global_link = all_threads;
2390     all_threads = tso;
2391     RELEASE_LOCK(&sched_mutex);
2392     
2393 #if defined(DIST)
2394     tso->dist.priority = MandatoryPriority; //by default that is...
2395 #endif
2396     
2397 #if defined(GRAN)
2398     tso->gran.pri = pri;
2399 # if defined(DEBUG)
2400     tso->gran.magic = TSO_MAGIC; // debugging only
2401 # endif
2402     tso->gran.sparkname   = 0;
2403     tso->gran.startedat   = CURRENT_TIME; 
2404     tso->gran.exported    = 0;
2405     tso->gran.basicblocks = 0;
2406     tso->gran.allocs      = 0;
2407     tso->gran.exectime    = 0;
2408     tso->gran.fetchtime   = 0;
2409     tso->gran.fetchcount  = 0;
2410     tso->gran.blocktime   = 0;
2411     tso->gran.blockcount  = 0;
2412     tso->gran.blockedat   = 0;
2413     tso->gran.globalsparks = 0;
2414     tso->gran.localsparks  = 0;
2415     if (RtsFlags.GranFlags.Light)
2416         tso->gran.clock  = Now; /* local clock */
2417     else
2418         tso->gran.clock  = 0;
2419     
2420     IF_DEBUG(gran,printTSO(tso));
2421 #elif defined(PARALLEL_HASKELL)
2422 # if defined(DEBUG)
2423     tso->par.magic = TSO_MAGIC; // debugging only
2424 # endif
2425     tso->par.sparkname   = 0;
2426     tso->par.startedat   = CURRENT_TIME; 
2427     tso->par.exported    = 0;
2428     tso->par.basicblocks = 0;
2429     tso->par.allocs      = 0;
2430     tso->par.exectime    = 0;
2431     tso->par.fetchtime   = 0;
2432     tso->par.fetchcount  = 0;
2433     tso->par.blocktime   = 0;
2434     tso->par.blockcount  = 0;
2435     tso->par.blockedat   = 0;
2436     tso->par.globalsparks = 0;
2437     tso->par.localsparks  = 0;
2438 #endif
2439     
2440 #if defined(GRAN)
2441     globalGranStats.tot_threads_created++;
2442     globalGranStats.threads_created_on_PE[CurrentProc]++;
2443     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2444     globalGranStats.tot_sq_probes++;
2445 #elif defined(PARALLEL_HASKELL)
2446     // collect parallel global statistics (currently done together with GC stats)
2447     if (RtsFlags.ParFlags.ParStats.Global &&
2448         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2449         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2450         globalParStats.tot_threads_created++;
2451     }
2452 #endif 
2453     
2454 #if defined(GRAN)
2455     IF_GRAN_DEBUG(pri,
2456                   sched_belch("==__ schedule: Created TSO %d (%p);",
2457                               CurrentProc, tso, tso->id));
2458 #elif defined(PARALLEL_HASKELL)
2459     IF_PAR_DEBUG(verbose,
2460                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2461                              (long)tso->id, tso, advisory_thread_count));
2462 #else
2463     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2464                                    (long)tso->id, (long)tso->stack_size));
2465 #endif    
2466     return tso;
2467 }
2468
2469 #if defined(PAR)
2470 /* RFP:
2471    all parallel thread creation calls should fall through the following routine.
2472 */
2473 StgTSO *
2474 createThreadFromSpark(rtsSpark spark) 
2475 { StgTSO *tso;
2476   ASSERT(spark != (rtsSpark)NULL);
2477 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2478   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2479   { threadsIgnored++;
2480     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2481           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2482     return END_TSO_QUEUE;
2483   }
2484   else
2485   { threadsCreated++;
2486     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2487     if (tso==END_TSO_QUEUE)     
2488       barf("createSparkThread: Cannot create TSO");
2489 #if defined(DIST)
2490     tso->priority = AdvisoryPriority;
2491 #endif
2492     pushClosure(tso,spark);
2493     addToRunQueue(tso);
2494     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2495   }
2496   return tso;
2497 }
2498 #endif
2499
2500 /*
2501   Turn a spark into a thread.
2502   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2503 */
2504 #if 0
2505 StgTSO *
2506 activateSpark (rtsSpark spark) 
2507 {
2508   StgTSO *tso;
2509
2510   tso = createSparkThread(spark);
2511   if (RtsFlags.ParFlags.ParStats.Full) {   
2512     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2513       IF_PAR_DEBUG(verbose,
2514                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2515                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2516   }
2517   // ToDo: fwd info on local/global spark to thread -- HWL
2518   // tso->gran.exported =  spark->exported;
2519   // tso->gran.locked =   !spark->global;
2520   // tso->gran.sparkname = spark->name;
2521
2522   return tso;
2523 }
2524 #endif
2525
2526 /* ---------------------------------------------------------------------------
2527  * scheduleThread()
2528  *
2529  * scheduleThread puts a thread on the end  of the runnable queue.
2530  * This will usually be done immediately after a thread is created.
2531  * The caller of scheduleThread must create the thread using e.g.
2532  * createThread and push an appropriate closure
2533  * on this thread's stack before the scheduler is invoked.
2534  * ------------------------------------------------------------------------ */
2535
2536 void
2537 scheduleThread(Capability *cap, StgTSO *tso)
2538 {
2539     // The thread goes at the *end* of the run-queue, to avoid possible
2540     // starvation of any threads already on the queue.
2541     appendToRunQueue(cap,tso);
2542 }
2543
2544 Capability *
2545 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2546 {
2547     Task *task;
2548
2549     // We already created/initialised the Task
2550     task = cap->running_task;
2551
2552     // This TSO is now a bound thread; make the Task and TSO
2553     // point to each other.
2554     tso->bound = task;
2555
2556     task->tso = tso;
2557     task->ret = ret;
2558     task->stat = NoStatus;
2559
2560     appendToRunQueue(cap,tso);
2561
2562     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2563
2564 #if defined(GRAN)
2565     /* GranSim specific init */
2566     CurrentTSO = m->tso;                // the TSO to run
2567     procStatus[MainProc] = Busy;        // status of main PE
2568     CurrentProc = MainProc;             // PE to run it on
2569 #endif
2570
2571     cap = schedule(cap,task);
2572
2573     ASSERT(task->stat != NoStatus);
2574     ASSERT_CAPABILITY_INVARIANTS(cap,task);
2575
2576     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2577     return cap;
2578 }
2579
2580 /* ----------------------------------------------------------------------------
2581  * Starting Tasks
2582  * ------------------------------------------------------------------------- */
2583
2584 #if defined(THREADED_RTS)
2585 void
2586 workerStart(Task *task)
2587 {
2588     Capability *cap;
2589
2590     // See startWorkerTask().
2591     ACQUIRE_LOCK(&task->lock);
2592     cap = task->cap;
2593     RELEASE_LOCK(&task->lock);
2594
2595     // set the thread-local pointer to the Task:
2596     taskEnter(task);
2597
2598     // schedule() runs without a lock.
2599     cap = schedule(cap,task);
2600
2601     // On exit from schedule(), we have a Capability.
2602     releaseCapability(cap);
2603     taskStop(task);
2604 }
2605 #endif
2606
2607 /* ---------------------------------------------------------------------------
2608  * initScheduler()
2609  *
2610  * Initialise the scheduler.  This resets all the queues - if the
2611  * queues contained any threads, they'll be garbage collected at the
2612  * next pass.
2613  *
2614  * ------------------------------------------------------------------------ */
2615
2616 void 
2617 initScheduler(void)
2618 {
2619 #if defined(GRAN)
2620   nat i;
2621   for (i=0; i<=MAX_PROC; i++) {
2622     run_queue_hds[i]      = END_TSO_QUEUE;
2623     run_queue_tls[i]      = END_TSO_QUEUE;
2624     blocked_queue_hds[i]  = END_TSO_QUEUE;
2625     blocked_queue_tls[i]  = END_TSO_QUEUE;
2626     ccalling_threadss[i]  = END_TSO_QUEUE;
2627     blackhole_queue[i]    = END_TSO_QUEUE;
2628     sleeping_queue        = END_TSO_QUEUE;
2629   }
2630 #elif !defined(THREADED_RTS)
2631   blocked_queue_hd  = END_TSO_QUEUE;
2632   blocked_queue_tl  = END_TSO_QUEUE;
2633   sleeping_queue    = END_TSO_QUEUE;
2634 #endif
2635
2636   blackhole_queue   = END_TSO_QUEUE;
2637   all_threads       = END_TSO_QUEUE;
2638
2639   context_switch = 0;
2640   interrupted    = 0;
2641
2642   RtsFlags.ConcFlags.ctxtSwitchTicks =
2643       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2644       
2645 #if defined(THREADED_RTS)
2646   /* Initialise the mutex and condition variables used by
2647    * the scheduler. */
2648   initMutex(&sched_mutex);
2649 #endif
2650   
2651   ACQUIRE_LOCK(&sched_mutex);
2652
2653   /* A capability holds the state a native thread needs in
2654    * order to execute STG code. At least one capability is
2655    * floating around (only SMP builds have more than one).
2656    */
2657   initCapabilities();
2658
2659   initTaskManager();
2660
2661 #if defined(SMP)
2662   /*
2663    * Eagerly start one worker to run each Capability, except for
2664    * Capability 0.  The idea is that we're probably going to start a
2665    * bound thread on Capability 0 pretty soon, so we don't want a
2666    * worker task hogging it.
2667    */
2668   { 
2669       nat i;
2670       Capability *cap;
2671       for (i = 1; i < n_capabilities; i++) {
2672           cap = &capabilities[i];
2673           ACQUIRE_LOCK(&cap->lock);
2674           startWorkerTask(cap, workerStart);
2675           RELEASE_LOCK(&cap->lock);
2676       }
2677   }
2678 #endif
2679
2680 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2681   initSparkPools();
2682 #endif
2683
2684   RELEASE_LOCK(&sched_mutex);
2685 }
2686
2687 void
2688 exitScheduler( void )
2689 {
2690     interrupted = rtsTrue;
2691     shutting_down_scheduler = rtsTrue;
2692
2693 #if defined(THREADED_RTS)
2694     { 
2695         Task *task;
2696         nat i;
2697         
2698         ACQUIRE_LOCK(&sched_mutex);
2699         task = newBoundTask();
2700         RELEASE_LOCK(&sched_mutex);
2701
2702         for (i = 0; i < n_capabilities; i++) {
2703             shutdownCapability(&capabilities[i], task);
2704         }
2705         boundTaskExiting(task);
2706         stopTaskManager();
2707     }
2708 #endif
2709 }
2710
2711 /* ---------------------------------------------------------------------------
2712    Where are the roots that we know about?
2713
2714         - all the threads on the runnable queue
2715         - all the threads on the blocked queue
2716         - all the threads on the sleeping queue
2717         - all the thread currently executing a _ccall_GC
2718         - all the "main threads"
2719      
2720    ------------------------------------------------------------------------ */
2721
2722 /* This has to be protected either by the scheduler monitor, or by the
2723         garbage collection monitor (probably the latter).
2724         KH @ 25/10/99
2725 */
2726
2727 void
2728 GetRoots( evac_fn evac )
2729 {
2730     nat i;
2731     Capability *cap;
2732     Task *task;
2733
2734 #if defined(GRAN)
2735     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2736         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2737             evac((StgClosure **)&run_queue_hds[i]);
2738         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2739             evac((StgClosure **)&run_queue_tls[i]);
2740         
2741         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2742             evac((StgClosure **)&blocked_queue_hds[i]);
2743         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2744             evac((StgClosure **)&blocked_queue_tls[i]);
2745         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2746             evac((StgClosure **)&ccalling_threads[i]);
2747     }
2748
2749     markEventQueue();
2750
2751 #else /* !GRAN */
2752
2753     for (i = 0; i < n_capabilities; i++) {
2754         cap = &capabilities[i];
2755         evac((StgClosure **)&cap->run_queue_hd);
2756         evac((StgClosure **)&cap->run_queue_tl);
2757         
2758         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2759              task=task->next) {
2760             evac((StgClosure **)&task->suspended_tso);
2761         }
2762     }
2763     
2764 #if !defined(THREADED_RTS)
2765     evac((StgClosure **)&blocked_queue_hd);
2766     evac((StgClosure **)&blocked_queue_tl);
2767     evac((StgClosure **)&sleeping_queue);
2768 #endif 
2769 #endif
2770
2771     evac((StgClosure **)&blackhole_queue);
2772
2773 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2774     markSparkQueue(evac);
2775 #endif
2776     
2777 #if defined(RTS_USER_SIGNALS)
2778     // mark the signal handlers (signals should be already blocked)
2779     markSignalHandlers(evac);
2780 #endif
2781 }
2782
2783 /* -----------------------------------------------------------------------------
2784    performGC
2785
2786    This is the interface to the garbage collector from Haskell land.
2787    We provide this so that external C code can allocate and garbage
2788    collect when called from Haskell via _ccall_GC.
2789
2790    It might be useful to provide an interface whereby the programmer
2791    can specify more roots (ToDo).
2792    
2793    This needs to be protected by the GC condition variable above.  KH.
2794    -------------------------------------------------------------------------- */
2795
2796 static void (*extra_roots)(evac_fn);
2797
2798 void
2799 performGC(void)
2800 {
2801 #ifdef THREADED_RTS
2802     // ToDo: we have to grab all the capabilities here.
2803     errorBelch("performGC not supported in threaded RTS (yet)");
2804     stg_exit(EXIT_FAILURE);
2805 #endif
2806     /* Obligated to hold this lock upon entry */
2807     GarbageCollect(GetRoots,rtsFalse);
2808 }
2809
2810 void
2811 performMajorGC(void)
2812 {
2813 #ifdef THREADED_RTS
2814     errorBelch("performMayjorGC not supported in threaded RTS (yet)");
2815     stg_exit(EXIT_FAILURE);
2816 #endif
2817     GarbageCollect(GetRoots,rtsTrue);
2818 }
2819
2820 static void
2821 AllRoots(evac_fn evac)
2822 {
2823     GetRoots(evac);             // the scheduler's roots
2824     extra_roots(evac);          // the user's roots
2825 }
2826
2827 void
2828 performGCWithRoots(void (*get_roots)(evac_fn))
2829 {
2830 #ifdef THREADED_RTS
2831     errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2832     stg_exit(EXIT_FAILURE);
2833 #endif
2834     extra_roots = get_roots;
2835     GarbageCollect(AllRoots,rtsFalse);
2836 }
2837
2838 /* -----------------------------------------------------------------------------
2839    Stack overflow
2840
2841    If the thread has reached its maximum stack size, then raise the
2842    StackOverflow exception in the offending thread.  Otherwise
2843    relocate the TSO into a larger chunk of memory and adjust its stack
2844    size appropriately.
2845    -------------------------------------------------------------------------- */
2846
2847 static StgTSO *
2848 threadStackOverflow(Capability *cap, StgTSO *tso)
2849 {
2850   nat new_stack_size, stack_words;
2851   lnat new_tso_size;
2852   StgPtr new_sp;
2853   StgTSO *dest;
2854
2855   IF_DEBUG(sanity,checkTSO(tso));
2856   if (tso->stack_size >= tso->max_stack_size) {
2857
2858     IF_DEBUG(gc,
2859              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2860                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2861              /* If we're debugging, just print out the top of the stack */
2862              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2863                                               tso->sp+64)));
2864
2865     /* Send this thread the StackOverflow exception */
2866     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2867     return tso;
2868   }
2869
2870   /* Try to double the current stack size.  If that takes us over the
2871    * maximum stack size for this thread, then use the maximum instead.
2872    * Finally round up so the TSO ends up as a whole number of blocks.
2873    */
2874   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2875   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2876                                        TSO_STRUCT_SIZE)/sizeof(W_);
2877   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2878   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2879
2880   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2881
2882   dest = (StgTSO *)allocate(new_tso_size);
2883   TICK_ALLOC_TSO(new_stack_size,0);
2884
2885   /* copy the TSO block and the old stack into the new area */
2886   memcpy(dest,tso,TSO_STRUCT_SIZE);
2887   stack_words = tso->stack + tso->stack_size - tso->sp;
2888   new_sp = (P_)dest + new_tso_size - stack_words;
2889   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2890
2891   /* relocate the stack pointers... */
2892   dest->sp         = new_sp;
2893   dest->stack_size = new_stack_size;
2894         
2895   /* Mark the old TSO as relocated.  We have to check for relocated
2896    * TSOs in the garbage collector and any primops that deal with TSOs.
2897    *
2898    * It's important to set the sp value to just beyond the end
2899    * of the stack, so we don't attempt to scavenge any part of the
2900    * dead TSO's stack.
2901    */
2902   tso->what_next = ThreadRelocated;
2903   tso->link = dest;
2904   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2905   tso->why_blocked = NotBlocked;
2906
2907   IF_PAR_DEBUG(verbose,
2908                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2909                      tso->id, tso, tso->stack_size);
2910                /* If we're debugging, just print out the top of the stack */
2911                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2912                                                 tso->sp+64)));
2913   
2914   IF_DEBUG(sanity,checkTSO(tso));
2915 #if 0
2916   IF_DEBUG(scheduler,printTSO(dest));
2917 #endif
2918
2919   return dest;
2920 }
2921
2922 /* ---------------------------------------------------------------------------
2923    Wake up a queue that was blocked on some resource.
2924    ------------------------------------------------------------------------ */
2925
2926 #if defined(GRAN)
2927 STATIC_INLINE void
2928 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2929 {
2930 }
2931 #elif defined(PARALLEL_HASKELL)
2932 STATIC_INLINE void
2933 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2934 {
2935   /* write RESUME events to log file and
2936      update blocked and fetch time (depending on type of the orig closure) */
2937   if (RtsFlags.ParFlags.ParStats.Full) {
2938     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2939                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2940                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2941     if (emptyRunQueue())
2942       emitSchedule = rtsTrue;
2943
2944     switch (get_itbl(node)->type) {
2945         case FETCH_ME_BQ:
2946           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2947           break;
2948         case RBH:
2949         case FETCH_ME:
2950         case BLACKHOLE_BQ:
2951           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2952           break;
2953 #ifdef DIST
2954         case MVAR:
2955           break;
2956 #endif    
2957         default:
2958           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2959         }
2960       }
2961 }
2962 #endif
2963
2964 #if defined(GRAN)
2965 StgBlockingQueueElement *
2966 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2967 {
2968     StgTSO *tso;
2969     PEs node_loc, tso_loc;
2970
2971     node_loc = where_is(node); // should be lifted out of loop
2972     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2973     tso_loc = where_is((StgClosure *)tso);
2974     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2975       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2976       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2977       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2978       // insertThread(tso, node_loc);
2979       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2980                 ResumeThread,
2981                 tso, node, (rtsSpark*)NULL);
2982       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2983       // len_local++;
2984       // len++;
2985     } else { // TSO is remote (actually should be FMBQ)
2986       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2987                                   RtsFlags.GranFlags.Costs.gunblocktime +
2988                                   RtsFlags.GranFlags.Costs.latency;
2989       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2990                 UnblockThread,
2991                 tso, node, (rtsSpark*)NULL);
2992       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2993       // len++;
2994     }
2995     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2996     IF_GRAN_DEBUG(bq,
2997                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2998                           (node_loc==tso_loc ? "Local" : "Global"), 
2999                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3000     tso->block_info.closure = NULL;
3001     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3002                              tso->id, tso));
3003 }
3004 #elif defined(PARALLEL_HASKELL)
3005 StgBlockingQueueElement *
3006 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3007 {
3008     StgBlockingQueueElement *next;
3009
3010     switch (get_itbl(bqe)->type) {
3011     case TSO:
3012       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3013       /* if it's a TSO just push it onto the run_queue */
3014       next = bqe->link;
3015       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3016       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3017       threadRunnable();
3018       unblockCount(bqe, node);
3019       /* reset blocking status after dumping event */
3020       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3021       break;
3022
3023     case BLOCKED_FETCH:
3024       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3025       next = bqe->link;
3026       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3027       PendingFetches = (StgBlockedFetch *)bqe;
3028       break;
3029
3030 # if defined(DEBUG)
3031       /* can ignore this case in a non-debugging setup; 
3032          see comments on RBHSave closures above */
3033     case CONSTR:
3034       /* check that the closure is an RBHSave closure */
3035       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3036              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3037              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3038       break;
3039
3040     default:
3041       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3042            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3043            (StgClosure *)bqe);
3044 # endif
3045     }
3046   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3047   return next;
3048 }
3049 #endif
3050
3051 StgTSO *
3052 unblockOne(Capability *cap, StgTSO *tso)
3053 {
3054   StgTSO *next;
3055
3056   ASSERT(get_itbl(tso)->type == TSO);
3057   ASSERT(tso->why_blocked != NotBlocked);
3058   tso->why_blocked = NotBlocked;
3059   next = tso->link;
3060   tso->link = END_TSO_QUEUE;
3061
3062   // We might have just migrated this TSO to our Capability:
3063   if (tso->bound) {
3064       tso->bound->cap = cap;
3065   }
3066
3067   appendToRunQueue(cap,tso);
3068
3069   // we're holding a newly woken thread, make sure we context switch
3070   // quickly so we can migrate it if necessary.
3071   context_switch = 1;
3072   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3073   return next;
3074 }
3075
3076
3077 #if defined(GRAN)
3078 void 
3079 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3080 {
3081   StgBlockingQueueElement *bqe;
3082   PEs node_loc;
3083   nat len = 0; 
3084
3085   IF_GRAN_DEBUG(bq, 
3086                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3087                       node, CurrentProc, CurrentTime[CurrentProc], 
3088                       CurrentTSO->id, CurrentTSO));
3089
3090   node_loc = where_is(node);
3091
3092   ASSERT(q == END_BQ_QUEUE ||
3093          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3094          get_itbl(q)->type == CONSTR); // closure (type constructor)
3095   ASSERT(is_unique(node));
3096
3097   /* FAKE FETCH: magically copy the node to the tso's proc;
3098      no Fetch necessary because in reality the node should not have been 
3099      moved to the other PE in the first place
3100   */
3101   if (CurrentProc!=node_loc) {
3102     IF_GRAN_DEBUG(bq, 
3103                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3104                         node, node_loc, CurrentProc, CurrentTSO->id, 
3105                         // CurrentTSO, where_is(CurrentTSO),
3106                         node->header.gran.procs));
3107     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3108     IF_GRAN_DEBUG(bq, 
3109                   debugBelch("## new bitmask of node %p is %#x\n",
3110                         node, node->header.gran.procs));
3111     if (RtsFlags.GranFlags.GranSimStats.Global) {
3112       globalGranStats.tot_fake_fetches++;
3113     }
3114   }
3115
3116   bqe = q;
3117   // ToDo: check: ASSERT(CurrentProc==node_loc);
3118   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3119     //next = bqe->link;
3120     /* 
3121        bqe points to the current element in the queue
3122        next points to the next element in the queue
3123     */
3124     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3125     //tso_loc = where_is(tso);
3126     len++;
3127     bqe = unblockOne(bqe, node);
3128   }
3129
3130   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3131      the closure to make room for the anchor of the BQ */
3132   if (bqe!=END_BQ_QUEUE) {
3133     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3134     /*
3135     ASSERT((info_ptr==&RBH_Save_0_info) ||
3136            (info_ptr==&RBH_Save_1_info) ||
3137            (info_ptr==&RBH_Save_2_info));
3138     */
3139     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3140     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3141     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3142
3143     IF_GRAN_DEBUG(bq,
3144                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3145                         node, info_type(node)));
3146   }
3147
3148   /* statistics gathering */
3149   if (RtsFlags.GranFlags.GranSimStats.Global) {
3150     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3151     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3152     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3153     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3154   }
3155   IF_GRAN_DEBUG(bq,
3156                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3157                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3158 }
3159 #elif defined(PARALLEL_HASKELL)
3160 void 
3161 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3162 {
3163   StgBlockingQueueElement *bqe;
3164
3165   IF_PAR_DEBUG(verbose, 
3166                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3167                      node, mytid));
3168 #ifdef DIST  
3169   //RFP
3170   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3171     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3172     return;
3173   }
3174 #endif
3175   
3176   ASSERT(q == END_BQ_QUEUE ||
3177          get_itbl(q)->type == TSO ||           
3178          get_itbl(q)->type == BLOCKED_FETCH || 
3179          get_itbl(q)->type == CONSTR); 
3180
3181   bqe = q;
3182   while (get_itbl(bqe)->type==TSO || 
3183          get_itbl(bqe)->type==BLOCKED_FETCH) {
3184     bqe = unblockOne(bqe, node);
3185   }
3186 }
3187
3188 #else   /* !GRAN && !PARALLEL_HASKELL */
3189
3190 void
3191 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3192 {
3193     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3194                              // Exception.cmm
3195     while (tso != END_TSO_QUEUE) {
3196         tso = unblockOne(cap,tso);
3197     }
3198 }
3199 #endif
3200
3201 /* ---------------------------------------------------------------------------
3202    Interrupt execution
3203    - usually called inside a signal handler so it mustn't do anything fancy.   
3204    ------------------------------------------------------------------------ */
3205
3206 void
3207 interruptStgRts(void)
3208 {
3209     interrupted    = 1;
3210     context_switch = 1;
3211 #if defined(THREADED_RTS)
3212     prodAllCapabilities();
3213 #endif
3214 }
3215
3216 /* -----------------------------------------------------------------------------
3217    Unblock a thread
3218
3219    This is for use when we raise an exception in another thread, which
3220    may be blocked.
3221    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3222    -------------------------------------------------------------------------- */
3223
3224 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3225 /*
3226   NB: only the type of the blocking queue is different in GranSim and GUM
3227       the operations on the queue-elements are the same
3228       long live polymorphism!
3229
3230   Locks: sched_mutex is held upon entry and exit.
3231
3232 */
3233 static void
3234 unblockThread(Capability *cap, StgTSO *tso)
3235 {
3236   StgBlockingQueueElement *t, **last;
3237
3238   switch (tso->why_blocked) {
3239
3240   case NotBlocked:
3241     return;  /* not blocked */
3242
3243   case BlockedOnSTM:
3244     // Be careful: nothing to do here!  We tell the scheduler that the thread
3245     // is runnable and we leave it to the stack-walking code to abort the 
3246     // transaction while unwinding the stack.  We should perhaps have a debugging
3247     // test to make sure that this really happens and that the 'zombie' transaction
3248     // does not get committed.
3249     goto done;
3250
3251   case BlockedOnMVar:
3252     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3253     {
3254       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3255       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3256
3257       last = (StgBlockingQueueElement **)&mvar->head;
3258       for (t = (StgBlockingQueueElement *)mvar->head; 
3259            t != END_BQ_QUEUE; 
3260            last = &t->link, last_tso = t, t = t->link) {
3261         if (t == (StgBlockingQueueElement *)tso) {
3262           *last = (StgBlockingQueueElement *)tso->link;
3263           if (mvar->tail == tso) {
3264             mvar->tail = (StgTSO *)last_tso;
3265           }
3266           goto done;
3267         }
3268       }
3269       barf("unblockThread (MVAR): TSO not found");
3270     }
3271
3272   case BlockedOnBlackHole:
3273     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3274     {
3275       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3276
3277       last = &bq->blocking_queue;
3278       for (t = bq->blocking_queue; 
3279            t != END_BQ_QUEUE; 
3280            last = &t->link, t = t->link) {
3281         if (t == (StgBlockingQueueElement *)tso) {
3282           *last = (StgBlockingQueueElement *)tso->link;
3283           goto done;
3284         }
3285       }
3286       barf("unblockThread (BLACKHOLE): TSO not found");
3287     }
3288
3289   case BlockedOnException:
3290     {
3291       StgTSO *target  = tso->block_info.tso;
3292
3293       ASSERT(get_itbl(target)->type == TSO);
3294
3295       if (target->what_next == ThreadRelocated) {
3296           target = target->link;
3297           ASSERT(get_itbl(target)->type == TSO);
3298       }
3299
3300       ASSERT(target->blocked_exceptions != NULL);
3301
3302       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3303       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3304            t != END_BQ_QUEUE; 
3305            last = &t->link, t = t->link) {
3306         ASSERT(get_itbl(t)->type == TSO);
3307         if (t == (StgBlockingQueueElement *)tso) {
3308           *last = (StgBlockingQueueElement *)tso->link;
3309           goto done;
3310         }
3311       }
3312       barf("unblockThread (Exception): TSO not found");
3313     }
3314
3315   case BlockedOnRead:
3316   case BlockedOnWrite:
3317 #if defined(mingw32_HOST_OS)
3318   case BlockedOnDoProc:
3319 #endif
3320     {
3321       /* take TSO off blocked_queue */
3322       StgBlockingQueueElement *prev = NULL;
3323       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3324            prev = t, t = t->link) {
3325         if (t == (StgBlockingQueueElement *)tso) {
3326           if (prev == NULL) {
3327             blocked_queue_hd = (StgTSO *)t->link;
3328             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3329               blocked_queue_tl = END_TSO_QUEUE;
3330             }
3331           } else {
3332             prev->link = t->link;
3333             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3334               blocked_queue_tl = (StgTSO *)prev;
3335             }
3336           }
3337 #if defined(mingw32_HOST_OS)
3338           /* (Cooperatively) signal that the worker thread should abort
3339            * the request.
3340            */
3341           abandonWorkRequest(tso->block_info.async_result->reqID);
3342 #endif
3343           goto done;
3344         }
3345       }
3346       barf("unblockThread (I/O): TSO not found");
3347     }
3348
3349   case BlockedOnDelay:
3350     {
3351       /* take TSO off sleeping_queue */
3352       StgBlockingQueueElement *prev = NULL;
3353       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3354            prev = t, t = t->link) {
3355         if (t == (StgBlockingQueueElement *)tso) {
3356           if (prev == NULL) {
3357             sleeping_queue = (StgTSO *)t->link;
3358           } else {
3359             prev->link = t->link;
3360           }
3361           goto done;
3362         }
3363       }
3364       barf("unblockThread (delay): TSO not found");
3365     }
3366
3367   default:
3368     barf("unblockThread");
3369   }
3370
3371  done:
3372   tso->link = END_TSO_QUEUE;
3373   tso->why_blocked = NotBlocked;
3374   tso->block_info.closure = NULL;
3375   pushOnRunQueue(cap,tso);
3376 }
3377 #else
3378 static void
3379 unblockThread(Capability *cap, StgTSO *tso)
3380 {
3381   StgTSO *t, **last;
3382   
3383   /* To avoid locking unnecessarily. */
3384   if (tso->why_blocked == NotBlocked) {
3385     return;
3386   }
3387
3388   switch (tso->why_blocked) {
3389
3390   case BlockedOnSTM:
3391     // Be careful: nothing to do here!  We tell the scheduler that the thread
3392     // is runnable and we leave it to the stack-walking code to abort the 
3393     // transaction while unwinding the stack.  We should perhaps have a debugging
3394     // test to make sure that this really happens and that the 'zombie' transaction
3395     // does not get committed.
3396     goto done;
3397
3398   case BlockedOnMVar:
3399     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3400     {
3401       StgTSO *last_tso = END_TSO_QUEUE;
3402       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3403
3404       last = &mvar->head;
3405       for (t = mvar->head; t != END_TSO_QUEUE; 
3406            last = &t->link, last_tso = t, t = t->link) {
3407         if (t == tso) {
3408           *last = tso->link;
3409           if (mvar->tail == tso) {
3410             mvar->tail = last_tso;
3411           }
3412           goto done;
3413         }
3414       }
3415       barf("unblockThread (MVAR): TSO not found");
3416     }
3417
3418   case BlockedOnBlackHole:
3419     {
3420       last = &blackhole_queue;
3421       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3422            last = &t->link, t = t->link) {
3423         if (t == tso) {
3424           *last = tso->link;
3425           goto done;
3426         }
3427       }
3428       barf("unblockThread (BLACKHOLE): TSO not found");
3429     }
3430
3431   case BlockedOnException:
3432     {
3433       StgTSO *target  = tso->block_info.tso;
3434
3435       ASSERT(get_itbl(target)->type == TSO);
3436
3437       while (target->what_next == ThreadRelocated) {
3438           target = target->link;
3439           ASSERT(get_itbl(target)->type == TSO);
3440       }
3441       
3442       ASSERT(target->blocked_exceptions != NULL);
3443
3444       last = &target->blocked_exceptions;
3445       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3446            last = &t->link, t = t->link) {
3447         ASSERT(get_itbl(t)->type == TSO);
3448         if (t == tso) {
3449           *last = tso->link;
3450           goto done;
3451         }
3452       }
3453       barf("unblockThread (Exception): TSO not found");
3454     }
3455
3456 #if !defined(THREADED_RTS)
3457   case BlockedOnRead:
3458   case BlockedOnWrite:
3459 #if defined(mingw32_HOST_OS)
3460   case BlockedOnDoProc:
3461 #endif
3462     {
3463       StgTSO *prev = NULL;
3464       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3465            prev = t, t = t->link) {
3466         if (t == tso) {
3467           if (prev == NULL) {
3468             blocked_queue_hd = t->link;
3469             if (blocked_queue_tl == t) {
3470               blocked_queue_tl = END_TSO_QUEUE;
3471             }
3472           } else {
3473             prev->link = t->link;
3474             if (blocked_queue_tl == t) {
3475               blocked_queue_tl = prev;
3476             }
3477           }
3478 #if defined(mingw32_HOST_OS)
3479           /* (Cooperatively) signal that the worker thread should abort
3480            * the request.
3481            */
3482           abandonWorkRequest(tso->block_info.async_result->reqID);
3483 #endif
3484           goto done;
3485         }
3486       }
3487       barf("unblockThread (I/O): TSO not found");
3488     }
3489
3490   case BlockedOnDelay:
3491     {
3492       StgTSO *prev = NULL;
3493       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3494            prev = t, t = t->link) {
3495         if (t == tso) {
3496           if (prev == NULL) {
3497             sleeping_queue = t->link;
3498           } else {
3499             prev->link = t->link;
3500           }
3501           goto done;
3502         }
3503       }
3504       barf("unblockThread (delay): TSO not found");
3505     }
3506 #endif
3507
3508   default:
3509     barf("unblockThread");
3510   }
3511
3512  done:
3513   tso->link = END_TSO_QUEUE;
3514   tso->why_blocked = NotBlocked;
3515   tso->block_info.closure = NULL;
3516   appendToRunQueue(cap,tso);
3517 }
3518 #endif
3519
3520 /* -----------------------------------------------------------------------------
3521  * checkBlackHoles()
3522  *
3523  * Check the blackhole_queue for threads that can be woken up.  We do
3524  * this periodically: before every GC, and whenever the run queue is
3525  * empty.
3526  *
3527  * An elegant solution might be to just wake up all the blocked
3528  * threads with awakenBlockedQueue occasionally: they'll go back to
3529  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3530  * doesn't give us a way to tell whether we've actually managed to
3531  * wake up any threads, so we would be busy-waiting.
3532  *
3533  * -------------------------------------------------------------------------- */
3534
3535 static rtsBool
3536 checkBlackHoles (Capability *cap)
3537 {
3538     StgTSO **prev, *t;
3539     rtsBool any_woke_up = rtsFalse;
3540     StgHalfWord type;
3541
3542     // blackhole_queue is global:
3543     ASSERT_LOCK_HELD(&sched_mutex);
3544
3545     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3546
3547     // ASSUMES: sched_mutex
3548     prev = &blackhole_queue;
3549     t = blackhole_queue;
3550     while (t != END_TSO_QUEUE) {
3551         ASSERT(t->why_blocked == BlockedOnBlackHole);
3552         type = get_itbl(t->block_info.closure)->type;
3553         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3554             IF_DEBUG(sanity,checkTSO(t));
3555             t = unblockOne(cap, t);
3556             // urk, the threads migrate to the current capability
3557             // here, but we'd like to keep them on the original one.
3558             *prev = t;
3559             any_woke_up = rtsTrue;
3560         } else {
3561             prev = &t->link;
3562             t = t->link;
3563         }
3564     }
3565
3566     return any_woke_up;
3567 }
3568
3569 /* -----------------------------------------------------------------------------
3570  * raiseAsync()
3571  *
3572  * The following function implements the magic for raising an
3573  * asynchronous exception in an existing thread.
3574  *
3575  * We first remove the thread from any queue on which it might be
3576  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3577  *
3578  * We strip the stack down to the innermost CATCH_FRAME, building
3579  * thunks in the heap for all the active computations, so they can 
3580  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3581  * an application of the handler to the exception, and push it on
3582  * the top of the stack.
3583  * 
3584  * How exactly do we save all the active computations?  We create an
3585  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3586  * AP_STACKs pushes everything from the corresponding update frame
3587  * upwards onto the stack.  (Actually, it pushes everything up to the
3588  * next update frame plus a pointer to the next AP_STACK object.
3589  * Entering the next AP_STACK object pushes more onto the stack until we
3590  * reach the last AP_STACK object - at which point the stack should look
3591  * exactly as it did when we killed the TSO and we can continue
3592  * execution by entering the closure on top of the stack.
3593  *
3594  * We can also kill a thread entirely - this happens if either (a) the 
3595  * exception passed to raiseAsync is NULL, or (b) there's no
3596  * CATCH_FRAME on the stack.  In either case, we strip the entire
3597  * stack and replace the thread with a zombie.
3598  *
3599  * ToDo: in SMP mode, this function is only safe if either (a) we hold
3600  * all the Capabilities (eg. in GC), or (b) we own the Capability that
3601  * the TSO is currently blocked on or on the run queue of.
3602  *
3603  * -------------------------------------------------------------------------- */
3604  
3605 void
3606 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3607 {
3608     raiseAsync_(cap, tso, exception, rtsFalse);
3609 }
3610
3611 static void
3612 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3613             rtsBool stop_at_atomically)
3614 {
3615     StgRetInfoTable *info;
3616     StgPtr sp;
3617   
3618     // Thread already dead?
3619     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3620         return;
3621     }
3622
3623     IF_DEBUG(scheduler, 
3624              sched_belch("raising exception in thread %ld.", (long)tso->id));
3625     
3626     // Remove it from any blocking queues
3627     unblockThread(cap,tso);
3628
3629     sp = tso->sp;
3630     
3631     // The stack freezing code assumes there's a closure pointer on
3632     // the top of the stack, so we have to arrange that this is the case...
3633     //
3634     if (sp[0] == (W_)&stg_enter_info) {
3635         sp++;
3636     } else {
3637         sp--;
3638         sp[0] = (W_)&stg_dummy_ret_closure;
3639     }
3640
3641     while (1) {
3642         nat i;
3643
3644         // 1. Let the top of the stack be the "current closure"
3645         //
3646         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3647         // CATCH_FRAME.
3648         //
3649         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3650         // current closure applied to the chunk of stack up to (but not
3651         // including) the update frame.  This closure becomes the "current
3652         // closure".  Go back to step 2.
3653         //
3654         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3655         // top of the stack applied to the exception.
3656         // 
3657         // 5. If it's a STOP_FRAME, then kill the thread.
3658         // 
3659         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3660         // transaction
3661        
3662         
3663         StgPtr frame;
3664         
3665         frame = sp + 1;
3666         info = get_ret_itbl((StgClosure *)frame);
3667         
3668         while (info->i.type != UPDATE_FRAME
3669                && (info->i.type != CATCH_FRAME || exception == NULL)
3670                && info->i.type != STOP_FRAME
3671                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3672         {
3673             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3674               // IF we find an ATOMICALLY_FRAME then we abort the
3675               // current transaction and propagate the exception.  In
3676               // this case (unlike ordinary exceptions) we do not care
3677               // whether the transaction is valid or not because its
3678               // possible validity cannot have caused the exception
3679               // and will not be visible after the abort.
3680               IF_DEBUG(stm,
3681                        debugBelch("Found atomically block delivering async exception\n"));
3682               stmAbortTransaction(tso -> trec);
3683               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3684             }
3685             frame += stack_frame_sizeW((StgClosure *)frame);
3686             info = get_ret_itbl((StgClosure *)frame);
3687         }
3688         
3689         switch (info->i.type) {
3690             
3691         case ATOMICALLY_FRAME:
3692             ASSERT(stop_at_atomically);
3693             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3694             stmCondemnTransaction(tso -> trec);
3695 #ifdef REG_R1
3696             tso->sp = frame;
3697 #else
3698             // R1 is not a register: the return convention for IO in
3699             // this case puts the return value on the stack, so we
3700             // need to set up the stack to return to the atomically
3701             // frame properly...
3702             tso->sp = frame - 2;
3703             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3704             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3705 #endif
3706             tso->what_next = ThreadRunGHC;
3707             return;
3708
3709         case CATCH_FRAME:
3710             // If we find a CATCH_FRAME, and we've got an exception to raise,
3711             // then build the THUNK raise(exception), and leave it on
3712             // top of the CATCH_FRAME ready to enter.
3713             //
3714         {
3715 #ifdef PROFILING
3716             StgCatchFrame *cf = (StgCatchFrame *)frame;
3717 #endif
3718             StgThunk *raise;
3719             
3720             // we've got an exception to raise, so let's pass it to the
3721             // handler in this frame.
3722             //
3723             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3724             TICK_ALLOC_SE_THK(1,0);
3725             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3726             raise->payload[0] = exception;
3727             
3728             // throw away the stack from Sp up to the CATCH_FRAME.
3729             //
3730             sp = frame - 1;
3731             
3732             /* Ensure that async excpetions are blocked now, so we don't get
3733              * a surprise exception before we get around to executing the
3734              * handler.
3735              */
3736             if (tso->blocked_exceptions == NULL) {
3737                 tso->blocked_exceptions = END_TSO_QUEUE;
3738             }
3739             
3740             /* Put the newly-built THUNK on top of the stack, ready to execute
3741              * when the thread restarts.
3742              */
3743             sp[0] = (W_)raise;
3744             sp[-1] = (W_)&stg_enter_info;
3745             tso->sp = sp-1;
3746             tso->what_next = ThreadRunGHC;
3747             IF_DEBUG(sanity, checkTSO(tso));
3748             return;
3749         }
3750         
3751         case UPDATE_FRAME:
3752         {
3753             StgAP_STACK * ap;
3754             nat words;
3755             
3756             // First build an AP_STACK consisting of the stack chunk above the
3757             // current update frame, with the top word on the stack as the
3758             // fun field.
3759             //
3760             words = frame - sp - 1;
3761             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3762             
3763             ap->size = words;
3764             ap->fun  = (StgClosure *)sp[0];
3765             sp++;
3766             for(i=0; i < (nat)words; ++i) {
3767                 ap->payload[i] = (StgClosure *)*sp++;
3768             }
3769             
3770             SET_HDR(ap,&stg_AP_STACK_info,
3771                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3772             TICK_ALLOC_UP_THK(words+1,0);
3773             
3774             IF_DEBUG(scheduler,
3775                      debugBelch("sched: Updating ");
3776                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3777                      debugBelch(" with ");
3778                      printObj((StgClosure *)ap);
3779                 );
3780
3781             // Replace the updatee with an indirection - happily
3782             // this will also wake up any threads currently
3783             // waiting on the result.
3784             //
3785             // Warning: if we're in a loop, more than one update frame on
3786             // the stack may point to the same object.  Be careful not to
3787             // overwrite an IND_OLDGEN in this case, because we'll screw
3788             // up the mutable lists.  To be on the safe side, don't
3789             // overwrite any kind of indirection at all.  See also
3790             // threadSqueezeStack in GC.c, where we have to make a similar
3791             // check.
3792             //
3793             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3794                 // revert the black hole
3795                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3796                                (StgClosure *)ap);
3797             }
3798             sp += sizeofW(StgUpdateFrame) - 1;
3799             sp[0] = (W_)ap; // push onto stack
3800             break;
3801         }
3802         
3803         case STOP_FRAME:
3804             // We've stripped the entire stack, the thread is now dead.
3805             tso->what_next = ThreadKilled;
3806             tso->sp = frame + sizeofW(StgStopFrame);
3807             return;
3808             
3809         default:
3810             barf("raiseAsync");
3811         }
3812     }
3813     barf("raiseAsync");
3814 }
3815
3816 /* -----------------------------------------------------------------------------
3817    Deleting threads
3818
3819    This is used for interruption (^C) and forking, and corresponds to
3820    raising an exception but without letting the thread catch the
3821    exception.
3822    -------------------------------------------------------------------------- */
3823
3824 static void 
3825 deleteThread (Capability *cap, StgTSO *tso)
3826 {
3827   if (tso->why_blocked != BlockedOnCCall &&
3828       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3829       raiseAsync(cap,tso,NULL);
3830   }
3831 }
3832
3833 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3834 static void 
3835 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3836 { // for forkProcess only:
3837   // delete thread without giving it a chance to catch the KillThread exception
3838
3839   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3840       return;
3841   }
3842
3843   if (tso->why_blocked != BlockedOnCCall &&
3844       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3845       unblockThread(cap,tso);
3846   }
3847
3848   tso->what_next = ThreadKilled;
3849 }
3850 #endif
3851
3852 /* -----------------------------------------------------------------------------
3853    raiseExceptionHelper
3854    
3855    This function is called by the raise# primitve, just so that we can
3856    move some of the tricky bits of raising an exception from C-- into
3857    C.  Who knows, it might be a useful re-useable thing here too.
3858    -------------------------------------------------------------------------- */
3859
3860 StgWord
3861 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3862 {
3863     Capability *cap = regTableToCapability(reg);
3864     StgThunk *raise_closure = NULL;
3865     StgPtr p, next;
3866     StgRetInfoTable *info;
3867     //
3868     // This closure represents the expression 'raise# E' where E
3869     // is the exception raise.  It is used to overwrite all the
3870     // thunks which are currently under evaluataion.
3871     //
3872
3873     //    
3874     // LDV profiling: stg_raise_info has THUNK as its closure
3875     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3876     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3877     // 1 does not cause any problem unless profiling is performed.
3878     // However, when LDV profiling goes on, we need to linearly scan
3879     // small object pool, where raise_closure is stored, so we should
3880     // use MIN_UPD_SIZE.
3881     //
3882     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3883     //                                 sizeofW(StgClosure)+1);
3884     //
3885
3886     //
3887     // Walk up the stack, looking for the catch frame.  On the way,
3888     // we update any closures pointed to from update frames with the
3889     // raise closure that we just built.
3890     //
3891     p = tso->sp;
3892     while(1) {
3893         info = get_ret_itbl((StgClosure *)p);
3894         next = p + stack_frame_sizeW((StgClosure *)p);
3895         switch (info->i.type) {
3896             
3897         case UPDATE_FRAME:
3898             // Only create raise_closure if we need to.
3899             if (raise_closure == NULL) {
3900                 raise_closure = 
3901                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3902                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3903                 raise_closure->payload[0] = exception;
3904             }
3905             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3906             p = next;
3907             continue;
3908
3909         case ATOMICALLY_FRAME:
3910             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3911             tso->sp = p;
3912             return ATOMICALLY_FRAME;
3913             
3914         case CATCH_FRAME:
3915             tso->sp = p;
3916             return CATCH_FRAME;
3917
3918         case CATCH_STM_FRAME:
3919             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3920             tso->sp = p;
3921             return CATCH_STM_FRAME;
3922             
3923         case STOP_FRAME:
3924             tso->sp = p;
3925             return STOP_FRAME;
3926
3927         case CATCH_RETRY_FRAME:
3928         default:
3929             p = next; 
3930             continue;
3931         }
3932     }
3933 }
3934
3935
3936 /* -----------------------------------------------------------------------------
3937    findRetryFrameHelper
3938
3939    This function is called by the retry# primitive.  It traverses the stack
3940    leaving tso->sp referring to the frame which should handle the retry.  
3941
3942    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3943    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3944
3945    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3946    despite the similar implementation.
3947
3948    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3949    not be created within memory transactions.
3950    -------------------------------------------------------------------------- */
3951
3952 StgWord
3953 findRetryFrameHelper (StgTSO *tso)
3954 {
3955   StgPtr           p, next;
3956   StgRetInfoTable *info;
3957
3958   p = tso -> sp;
3959   while (1) {
3960     info = get_ret_itbl((StgClosure *)p);
3961     next = p + stack_frame_sizeW((StgClosure *)p);
3962     switch (info->i.type) {
3963       
3964     case ATOMICALLY_FRAME:
3965       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3966       tso->sp = p;
3967       return ATOMICALLY_FRAME;
3968       
3969     case CATCH_RETRY_FRAME:
3970       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3971       tso->sp = p;
3972       return CATCH_RETRY_FRAME;
3973       
3974     case CATCH_STM_FRAME:
3975     default:
3976       ASSERT(info->i.type != CATCH_FRAME);
3977       ASSERT(info->i.type != STOP_FRAME);
3978       p = next; 
3979       continue;
3980     }
3981   }
3982 }
3983
3984 /* -----------------------------------------------------------------------------
3985    resurrectThreads is called after garbage collection on the list of
3986    threads found to be garbage.  Each of these threads will be woken
3987    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3988    on an MVar, or NonTermination if the thread was blocked on a Black
3989    Hole.
3990
3991    Locks: assumes we hold *all* the capabilities.
3992    -------------------------------------------------------------------------- */
3993
3994 void
3995 resurrectThreads (StgTSO *threads)
3996 {
3997     StgTSO *tso, *next;
3998     Capability *cap;
3999
4000     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4001         next = tso->global_link;
4002         tso->global_link = all_threads;
4003         all_threads = tso;
4004         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4005         
4006         // Wake up the thread on the Capability it was last on for a
4007         // bound thread, or last_free_capability otherwise.
4008         if (tso->bound) {
4009             cap = tso->bound->cap;
4010         } else {
4011             cap = last_free_capability;
4012         }
4013         
4014         switch (tso->why_blocked) {
4015         case BlockedOnMVar:
4016         case BlockedOnException:
4017             /* Called by GC - sched_mutex lock is currently held. */
4018             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4019             break;
4020         case BlockedOnBlackHole:
4021             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4022             break;
4023         case BlockedOnSTM:
4024             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4025             break;
4026         case NotBlocked:
4027             /* This might happen if the thread was blocked on a black hole
4028              * belonging to a thread that we've just woken up (raiseAsync
4029              * can wake up threads, remember...).
4030              */
4031             continue;
4032         default:
4033             barf("resurrectThreads: thread blocked in a strange way");
4034         }
4035     }
4036 }
4037
4038 /* ----------------------------------------------------------------------------
4039  * Debugging: why is a thread blocked
4040  * [Also provides useful information when debugging threaded programs
4041  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4042    ------------------------------------------------------------------------- */
4043
4044 #if DEBUG
4045 static void
4046 printThreadBlockage(StgTSO *tso)
4047 {
4048   switch (tso->why_blocked) {
4049   case BlockedOnRead:
4050     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4051     break;
4052   case BlockedOnWrite:
4053     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4054     break;
4055 #if defined(mingw32_HOST_OS)
4056     case BlockedOnDoProc:
4057     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4058     break;
4059 #endif
4060   case BlockedOnDelay:
4061     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4062     break;
4063   case BlockedOnMVar:
4064     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4065     break;
4066   case BlockedOnException:
4067     debugBelch("is blocked on delivering an exception to thread %d",
4068             tso->block_info.tso->id);
4069     break;
4070   case BlockedOnBlackHole:
4071     debugBelch("is blocked on a black hole");
4072     break;
4073   case NotBlocked:
4074     debugBelch("is not blocked");
4075     break;
4076 #if defined(PARALLEL_HASKELL)
4077   case BlockedOnGA:
4078     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4079             tso->block_info.closure, info_type(tso->block_info.closure));
4080     break;
4081   case BlockedOnGA_NoSend:
4082     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4083             tso->block_info.closure, info_type(tso->block_info.closure));
4084     break;
4085 #endif
4086   case BlockedOnCCall:
4087     debugBelch("is blocked on an external call");
4088     break;
4089   case BlockedOnCCall_NoUnblockExc:
4090     debugBelch("is blocked on an external call (exceptions were already blocked)");
4091     break;
4092   case BlockedOnSTM:
4093     debugBelch("is blocked on an STM operation");
4094     break;
4095   default:
4096     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4097          tso->why_blocked, tso->id, tso);
4098   }
4099 }
4100
4101 void
4102 printThreadStatus(StgTSO *t)
4103 {
4104     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4105     {
4106       void *label = lookupThreadLabel(t->id);
4107       if (label) debugBelch("[\"%s\"] ",(char *)label);
4108     }
4109     if (t->what_next == ThreadRelocated) {
4110         debugBelch("has been relocated...\n");
4111     } else {
4112         switch (t->what_next) {
4113         case ThreadKilled:
4114             debugBelch("has been killed");
4115             break;
4116         case ThreadComplete:
4117             debugBelch("has completed");
4118             break;
4119         default:
4120             printThreadBlockage(t);
4121         }
4122         debugBelch("\n");
4123     }
4124 }
4125
4126 void
4127 printAllThreads(void)
4128 {
4129   StgTSO *t, *next;
4130   nat i;
4131   Capability *cap;
4132
4133 # if defined(GRAN)
4134   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4135   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4136                        time_string, rtsFalse/*no commas!*/);
4137
4138   debugBelch("all threads at [%s]:\n", time_string);
4139 # elif defined(PARALLEL_HASKELL)
4140   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4141   ullong_format_string(CURRENT_TIME,
4142                        time_string, rtsFalse/*no commas!*/);
4143
4144   debugBelch("all threads at [%s]:\n", time_string);
4145 # else
4146   debugBelch("all threads:\n");
4147 # endif
4148
4149   for (i = 0; i < n_capabilities; i++) {
4150       cap = &capabilities[i];
4151       debugBelch("threads on capability %d:\n", cap->no);
4152       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4153           printThreadStatus(t);
4154       }
4155   }
4156
4157   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4158       if (t->why_blocked != NotBlocked) {
4159           printThreadStatus(t);
4160       }
4161       if (t->what_next == ThreadRelocated) {
4162           next = t->link;
4163       } else {
4164           next = t->global_link;
4165       }
4166   }
4167 }
4168
4169 // useful from gdb
4170 void 
4171 printThreadQueue(StgTSO *t)
4172 {
4173     nat i = 0;
4174     for (; t != END_TSO_QUEUE; t = t->link) {
4175         printThreadStatus(t);
4176         i++;
4177     }
4178     debugBelch("%d threads on queue\n", i);
4179 }
4180
4181 /* 
4182    Print a whole blocking queue attached to node (debugging only).
4183 */
4184 # if defined(PARALLEL_HASKELL)
4185 void 
4186 print_bq (StgClosure *node)
4187 {
4188   StgBlockingQueueElement *bqe;
4189   StgTSO *tso;
4190   rtsBool end;
4191
4192   debugBelch("## BQ of closure %p (%s): ",
4193           node, info_type(node));
4194
4195   /* should cover all closures that may have a blocking queue */
4196   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4197          get_itbl(node)->type == FETCH_ME_BQ ||
4198          get_itbl(node)->type == RBH ||
4199          get_itbl(node)->type == MVAR);
4200     
4201   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4202
4203   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4204 }
4205
4206 /* 
4207    Print a whole blocking queue starting with the element bqe.
4208 */
4209 void 
4210 print_bqe (StgBlockingQueueElement *bqe)
4211 {
4212   rtsBool end;
4213
4214   /* 
4215      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4216   */
4217   for (end = (bqe==END_BQ_QUEUE);
4218        !end; // iterate until bqe points to a CONSTR
4219        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4220        bqe = end ? END_BQ_QUEUE : bqe->link) {
4221     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4222     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4223     /* types of closures that may appear in a blocking queue */
4224     ASSERT(get_itbl(bqe)->type == TSO ||           
4225            get_itbl(bqe)->type == BLOCKED_FETCH || 
4226            get_itbl(bqe)->type == CONSTR); 
4227     /* only BQs of an RBH end with an RBH_Save closure */
4228     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4229
4230     switch (get_itbl(bqe)->type) {
4231     case TSO:
4232       debugBelch(" TSO %u (%x),",
4233               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4234       break;
4235     case BLOCKED_FETCH:
4236       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4237               ((StgBlockedFetch *)bqe)->node, 
4238               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4239               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4240               ((StgBlockedFetch *)bqe)->ga.weight);
4241       break;
4242     case CONSTR:
4243       debugBelch(" %s (IP %p),",
4244               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4245                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4246                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4247                "RBH_Save_?"), get_itbl(bqe));
4248       break;
4249     default:
4250       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4251            info_type((StgClosure *)bqe)); // , node, info_type(node));
4252       break;
4253     }
4254   } /* for */
4255   debugBelch("\n");
4256 }
4257 # elif defined(GRAN)
4258 void 
4259 print_bq (StgClosure *node)
4260 {
4261   StgBlockingQueueElement *bqe;
4262   PEs node_loc, tso_loc;
4263   rtsBool end;
4264
4265   /* should cover all closures that may have a blocking queue */
4266   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4267          get_itbl(node)->type == FETCH_ME_BQ ||
4268          get_itbl(node)->type == RBH);
4269     
4270   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4271   node_loc = where_is(node);
4272
4273   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4274           node, info_type(node), node_loc);
4275
4276   /* 
4277      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4278   */
4279   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4280        !end; // iterate until bqe points to a CONSTR
4281        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4282     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4283     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4284     /* types of closures that may appear in a blocking queue */
4285     ASSERT(get_itbl(bqe)->type == TSO ||           
4286            get_itbl(bqe)->type == CONSTR); 
4287     /* only BQs of an RBH end with an RBH_Save closure */
4288     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4289
4290     tso_loc = where_is((StgClosure *)bqe);
4291     switch (get_itbl(bqe)->type) {
4292     case TSO:
4293       debugBelch(" TSO %d (%p) on [PE %d],",
4294               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4295       break;
4296     case CONSTR:
4297       debugBelch(" %s (IP %p),",
4298               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4299                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4300                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4301                "RBH_Save_?"), get_itbl(bqe));
4302       break;
4303     default:
4304       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4305            info_type((StgClosure *)bqe), node, info_type(node));
4306       break;
4307     }
4308   } /* for */
4309   debugBelch("\n");
4310 }
4311 # endif
4312
4313 #if defined(PARALLEL_HASKELL)
4314 static nat
4315 run_queue_len(void)
4316 {
4317     nat i;
4318     StgTSO *tso;
4319     
4320     for (i=0, tso=run_queue_hd; 
4321          tso != END_TSO_QUEUE;
4322          i++, tso=tso->link) {
4323         /* nothing */
4324     }
4325         
4326     return i;
4327 }
4328 #endif
4329
4330 void
4331 sched_belch(char *s, ...)
4332 {
4333     va_list ap;
4334     va_start(ap,s);
4335 #ifdef THREADED_RTS
4336     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4337 #elif defined(PARALLEL_HASKELL)
4338     debugBelch("== ");
4339 #else
4340     debugBelch("sched: ");
4341 #endif
4342     vdebugBelch(s, ap);
4343     debugBelch("\n");
4344     va_end(ap);
4345 }
4346
4347 #endif /* DEBUG */